diff --git a/docs/release-notes/release-notes-0.8.0.md b/docs/release-notes/release-notes-0.8.0.md index dc552c2e4..cc20c0960 100644 --- a/docs/release-notes/release-notes-0.8.0.md +++ b/docs/release-notes/release-notes-0.8.0.md @@ -38,6 +38,10 @@ `AnchorVirtualPsbts`. A new configuration is available to control the sweeping via the flag `wallet.sweep-orphan-utxos`. +- [RFQ buy/sell accepts are now written to the database](https://github.com/lightninglabs/taproot-assets/pull/1863) + `rfq_policies` table whenever a policy is agreed, giving us an audit trail + and keeping quotes alive across restarts. + ## RPC Updates - [PR#1841](https://github.com/lightninglabs/taproot-assets/pull/1841): Remove diff --git a/itest/rfq_test.go b/itest/rfq_test.go index ff7ecc3eb..0471ab4ec 100644 --- a/itest/rfq_test.go +++ b/itest/rfq_test.go @@ -228,6 +228,19 @@ func testRfqAssetBuyHtlcIntercept(t *harnessTest) { expectedAssetID := mintedAssetId require.Equal(t.t, expectedAssetID, actualAssetID) + // Restart Bob's tapd to ensure the accepted quote policy survives a + // restart and is restored. + require.NoError(t.t, ts.BobTapd.stop(false)) + require.NoError(t.t, ts.BobTapd.start(false)) + + // Carol should still see the accepted quote after Bob's restart. + acceptedQuotes, err = ts.CarolTapd.QueryPeerAcceptedQuotes( + ctxt, &rfqrpc.QueryPeerAcceptedQuotesRequest{}, + ) + require.NoError(t.t, err) + require.Len(t.t, acceptedQuotes.BuyQuotes, 1) + acceptedQuote = acceptedQuotes.BuyQuotes[0] + // Carol will now use the accepted quote (received from Bob) to create // a lightning invoice which will be given to and settled by Alice. // diff --git a/rfq/manager.go b/rfq/manager.go index 66dba3005..5054c846c 100644 --- a/rfq/manager.go +++ b/rfq/manager.go @@ -118,6 +118,9 @@ type ManagerCfg struct { // helps us communicate custom feature bits with our peer. AuxChanNegotiator *tapfeatures.AuxChannelNegotiator + // PolicyStore provides persistence for agreed RFQ policies. + PolicyStore PolicyStore + // AcceptPriceDeviationPpm is the price deviation in // parts per million that is accepted by the RFQ negotiator. // @@ -179,34 +182,6 @@ type Manager struct { // events. acceptHtlcEvents chan *AcceptHtlcEvent - // peerAcceptedBuyQuotes holds buy quotes for assets that our node has - // requested and that have been accepted by peer nodes. These quotes are - // exclusively used by our node for the acquisition of assets, as they - // represent agreed-upon terms for purchase transactions with our peers. - peerAcceptedBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept] - - // peerAcceptedSellQuotes holds sell quotes for assets that our node has - // requested and that have been accepted by peer nodes. These quotes are - // exclusively used by our node for the sale of assets, as they - // represent agreed-upon terms for sale transactions with our peers. - peerAcceptedSellQuotes lnutils.SyncMap[ - SerialisedScid, rfqmsg.SellAccept, - ] - - // localAcceptedBuyQuotes holds buy quotes for assets that our node has - // accepted and that have been requested by peer nodes. These quotes are - // exclusively used by our node for the acquisition of assets, as they - // represent agreed-upon terms for purchase transactions with our peers. - localAcceptedBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept] - - // localAcceptedSellQuotes holds sell quotes for assets that our node - // has accepted and that have been requested by peer nodes. These quotes - // are exclusively used by our node for the sale of assets, as they - // represent agreed-upon terms for sale transactions with our peers. - localAcceptedSellQuotes lnutils.SyncMap[ - SerialisedScid, rfqmsg.SellAccept, - ] - // groupKeyLookupCache is a map that helps us quickly perform an // in-memory look up of the group an asset belongs to. Since this // information is static and generated during minting, it is not @@ -234,10 +209,6 @@ func NewManager(cfg ManagerCfg) (*Manager, error) { outgoingMessages: make(chan rfqmsg.OutgoingMsg), acceptHtlcEvents: make(chan *AcceptHtlcEvent), - peerAcceptedBuyQuotes: lnutils.SyncMap[ - SerialisedScid, rfqmsg.BuyAccept]{}, - peerAcceptedSellQuotes: lnutils.SyncMap[ - SerialisedScid, rfqmsg.SellAccept]{}, subscribers: lnutils.SyncMap[ uint64, *fn.EventReceiver[fn.Event]]{}, @@ -264,13 +235,14 @@ func (m *Manager) startSubsystems(ctx context.Context) error { SpecifierChecker: m.AssetMatchesSpecifier, NoOpHTLCs: m.cfg.NoOpHTLCs, AuxChanNegotiator: m.cfg.AuxChanNegotiator, + PolicyStore: m.cfg.PolicyStore, }) if err != nil { return fmt.Errorf("error initializing RFQ order handler: %w", err) } - if err := m.orderHandler.Start(); err != nil { + if err := m.orderHandler.Start(ctx); err != nil { return fmt.Errorf("unable to start RFQ order handler: %w", err) } @@ -435,7 +407,7 @@ func (m *Manager) handleIncomingMessage(incomingMsg rfqmsg.IncomingMsg) error { // quote so that it can be used to send a payment by our // lightning node. scid := msg.ShortChannelId() - m.peerAcceptedBuyQuotes.Store(scid, msg) + m.orderHandler.peerAcceptedBuyQuotes.Store(scid, msg) // Since we're going to buy assets from our peer, we // need to make sure we can identify the incoming asset @@ -488,7 +460,7 @@ func (m *Manager) handleIncomingMessage(incomingMsg rfqmsg.IncomingMsg) error { // quote so that it can be used to send a payment by our // lightning node. scid := msg.ShortChannelId() - m.peerAcceptedSellQuotes.Store(scid, msg) + m.orderHandler.peerAcceptedSellQuotes.Store(scid, msg) // Notify subscribers of the incoming peer accepted // asset sell quote. @@ -522,16 +494,16 @@ func (m *Manager) handleOutgoingMessage(outgoingMsg rfqmsg.OutgoingMsg) error { // we inform our peer of our decision, we inform the order // handler that we are willing to sell the asset subject to a // sale policy. - m.orderHandler.RegisterAssetSalePolicy(*msg) - - // We want to store that we accepted the buy quote, in case we - // need to look it up for a direct peer payment. - m.localAcceptedBuyQuotes.Store(msg.ShortChannelId(), *msg) + err := m.orderHandler.RegisterAssetSalePolicy(*msg) + if err != nil { + return fmt.Errorf("error registering asset sale "+ + "policy: %w", err) + } // Since our peer is going to buy assets from us, we need to // make sure we can identify the forwarded asset payment by the // outgoing SCID alias within the onion packet. - err := m.addScidAlias( + err = m.addScidAlias( uint64(msg.ShortChannelId()), msg.Request.AssetSpecifier, msg.Peer, ) @@ -545,11 +517,11 @@ func (m *Manager) handleOutgoingMessage(outgoingMsg rfqmsg.OutgoingMsg) error { // we inform our peer of our decision, we inform the order // handler that we are willing to buy the asset subject to a // purchase policy. - m.orderHandler.RegisterAssetPurchasePolicy(*msg) - - // We want to store that we accepted the sell quote, in case we - // need to look it up for a direct peer payment. - m.localAcceptedSellQuotes.Store(msg.ShortChannelId(), *msg) + err := m.orderHandler.RegisterAssetPurchasePolicy(*msg) + if err != nil { + return fmt.Errorf("error registering asset purchase "+ + "policy: %w", err) + } } // Send the outgoing message to the peer. @@ -842,10 +814,11 @@ func (m *Manager) PeerAcceptedBuyQuotes() BuyAcceptMap { // Returning the map directly is not thread safe. We will therefore // create a copy. buyQuotesCopy := make(map[SerialisedScid]rfqmsg.BuyAccept) - m.peerAcceptedBuyQuotes.ForEach( + m.orderHandler.peerAcceptedBuyQuotes.ForEach( func(scid SerialisedScid, accept rfqmsg.BuyAccept) error { if time.Now().After(accept.AssetRate.Expiry) { - m.peerAcceptedBuyQuotes.Delete(scid) + //nolint:lll + m.orderHandler.peerAcceptedBuyQuotes.Delete(scid) return nil } @@ -864,10 +837,11 @@ func (m *Manager) PeerAcceptedSellQuotes() SellAcceptMap { // Returning the map directly is not thread safe. We will therefore // create a copy. sellQuotesCopy := make(map[SerialisedScid]rfqmsg.SellAccept) - m.peerAcceptedSellQuotes.ForEach( + m.orderHandler.peerAcceptedSellQuotes.ForEach( func(scid SerialisedScid, accept rfqmsg.SellAccept) error { if time.Now().After(accept.AssetRate.Expiry) { - m.peerAcceptedSellQuotes.Delete(scid) + //nolint:lll + m.orderHandler.peerAcceptedSellQuotes.Delete(scid) return nil } @@ -886,10 +860,11 @@ func (m *Manager) LocalAcceptedBuyQuotes() BuyAcceptMap { // Returning the map directly is not thread safe. We will therefore // create a copy. buyQuotesCopy := make(map[SerialisedScid]rfqmsg.BuyAccept) - m.localAcceptedBuyQuotes.ForEach( + m.orderHandler.localAcceptedBuyQuotes.ForEach( func(scid SerialisedScid, accept rfqmsg.BuyAccept) error { if time.Now().After(accept.AssetRate.Expiry) { - m.localAcceptedBuyQuotes.Delete(scid) + //nolint:lll + m.orderHandler.localAcceptedBuyQuotes.Delete(scid) return nil } @@ -908,10 +883,11 @@ func (m *Manager) LocalAcceptedSellQuotes() SellAcceptMap { // Returning the map directly is not thread safe. We will therefore // create a copy. sellQuotesCopy := make(map[SerialisedScid]rfqmsg.SellAccept) - m.localAcceptedSellQuotes.ForEach( + m.orderHandler.localAcceptedSellQuotes.ForEach( func(scid SerialisedScid, accept rfqmsg.SellAccept) error { if time.Now().After(accept.AssetRate.Expiry) { - m.localAcceptedSellQuotes.Delete(scid) + //nolint:lll + m.orderHandler.localAcceptedSellQuotes.Delete(scid) return nil } diff --git a/rfq/manager_test.go b/rfq/manager_test.go index f6b95b066..0006cce1e 100644 --- a/rfq/manager_test.go +++ b/rfq/manager_test.go @@ -11,6 +11,7 @@ import ( "github.com/lightninglabs/taproot-assets/address" "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/proof" + "github.com/lightninglabs/taproot-assets/rfqmsg" tpchmsg "github.com/lightninglabs/taproot-assets/tapchannelmsg" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/routing/route" @@ -59,6 +60,26 @@ var ( peer2 = route.Vertex{77} ) +type mockPolicyStore struct{} + +func (mockPolicyStore) StoreSalePolicy(context.Context, + rfqmsg.BuyAccept) error { + + return nil +} + +func (mockPolicyStore) StorePurchasePolicy(context.Context, + rfqmsg.SellAccept) error { + + return nil +} + +func (mockPolicyStore) FetchAcceptedQuotes(context.Context) ( + []rfqmsg.BuyAccept, []rfqmsg.SellAccept, error) { + + return nil, nil, nil +} + // GroupLookupMock mocks the GroupLookup interface that is required by the // rfq manager to check asset IDs against asset specifiers. type GroupLookupMock struct{} @@ -141,6 +162,7 @@ func assertComputeChannelAssetBalance(t *testing.T, mockGroupLookup := &GroupLookupMock{} cfg := ManagerCfg{ GroupLookup: mockGroupLookup, + PolicyStore: mockPolicyStore{}, } manager, err := NewManager(cfg) require.NoError(t, err) diff --git a/rfq/order.go b/rfq/order.go index 270a0f735..f499d75e9 100644 --- a/rfq/order.go +++ b/rfq/order.go @@ -88,6 +88,9 @@ type Policy interface { GenerateInterceptorResponse( lndclient.InterceptedHtlc) (*lndclient.InterceptedHtlcResponse, error) + + // QuoteID returns the RFQ ID if it exists, otherwise false. + QuoteID() (rfqmsg.ID, bool) } // AssetSalePolicy is a struct that holds the terms which determine whether an @@ -325,6 +328,11 @@ func (c *AssetSalePolicy) GenerateInterceptorResponse( }, nil } +// QuoteID returns the RFQ identifier that originated this policy. +func (c *AssetSalePolicy) QuoteID() (rfqmsg.ID, bool) { + return c.AcceptedQuoteId, true +} + // Ensure that AssetSalePolicy implements the Policy interface. var _ Policy = (*AssetSalePolicy)(nil) @@ -537,6 +545,11 @@ func (c *AssetPurchasePolicy) GenerateInterceptorResponse( }, nil } +// QuoteID returns the RFQ identifier that originated this policy. +func (c *AssetPurchasePolicy) QuoteID() (rfqmsg.ID, bool) { + return c.AcceptedQuoteId, true +} + // Ensure that AssetPurchasePolicy implements the Policy interface. var _ Policy = (*AssetPurchasePolicy)(nil) @@ -674,6 +687,13 @@ func (a *AssetForwardPolicy) GenerateInterceptorResponse( }, nil } +// QuoteID returns the RFQ identifier that originated this policy. +// +// Forward policies do not map to a single quote, hence we return false. +func (a *AssetForwardPolicy) QuoteID() (rfqmsg.ID, bool) { + return rfqmsg.ID{}, false +} + // Ensure that AssetForwardPolicy implements the Policy interface. var _ Policy = (*AssetForwardPolicy)(nil) @@ -707,6 +727,9 @@ type OrderHandlerCfg struct { // that is encapsulated in the init and reestablish peer messages. This // helps us communicate custom feature bits with our peer. AuxChanNegotiator *tapfeatures.AuxChannelNegotiator + + // PolicyStore persists agreed RFQ policies. + PolicyStore PolicyStore } // OrderHandler orchestrates management of accepted quote bundles. It monitors @@ -719,10 +742,41 @@ type OrderHandler struct { // cfg holds the configuration parameters for the RFQ order handler. cfg OrderHandlerCfg + // policyStore provides persistence for agreed policies. + policyStore PolicyStore + // policies is a map of serialised short channel IDs (SCIDs) to // associated asset transaction policies. policies lnutils.SyncMap[SerialisedScid, Policy] + // peerAcceptedBuyQuotes holds buy quotes for assets that our node has + // requested and that have been accepted by peer nodes. These quotes are + // exclusively used by our node for the acquisition of assets, as they + // represent agreed-upon terms for purchase transactions with our peers. + peerAcceptedBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept] + + // peerAcceptedSellQuotes holds sell quotes for assets that our node has + // requested and that have been accepted by peer nodes. These quotes are + // exclusively used by our node for the sale of assets, as they + // represent agreed-upon terms for sale transactions with our peers. + peerAcceptedSellQuotes lnutils.SyncMap[ + SerialisedScid, rfqmsg.SellAccept, + ] + + // localAcceptedBuyQuotes holds buy quotes for assets that our node has + // accepted and that have been requested by peer nodes. These quotes are + // exclusively used by our node for the acquisition of assets, as they + // represent agreed-upon terms for purchase transactions with our peers. + localAcceptedBuyQuotes lnutils.SyncMap[SerialisedScid, rfqmsg.BuyAccept] + + // localAcceptedSellQuotes holds sell quotes for assets that our node + // has accepted and that have been requested by peer nodes. These quotes + // are exclusively used by our node for the sale of assets, as they + // represent agreed-upon terms for sale transactions with our peers. + localAcceptedSellQuotes lnutils.SyncMap[ + SerialisedScid, rfqmsg.SellAccept, + ] + // htlcToPolicy maps an HTLC circuit key to the policy that applies to // it. We need this map because for failed HTLCs we don't have the RFQ // data available, so we need to cache this info. @@ -736,8 +790,9 @@ type OrderHandler struct { // NewOrderHandler creates a new struct instance. func NewOrderHandler(cfg OrderHandlerCfg) (*OrderHandler, error) { return &OrderHandler{ - cfg: cfg, - policies: lnutils.SyncMap[SerialisedScid, Policy]{}, + cfg: cfg, + policyStore: cfg.PolicyStore, + policies: lnutils.SyncMap[SerialisedScid, Policy]{}, ContextGuard: &fn.ContextGuard{ DefaultTimeout: DefaultTimeout, Quit: make(chan struct{}), @@ -849,7 +904,6 @@ func (h *OrderHandler) mainEventLoop() { log.Debug("Cleaning up any stale policy from the " + "order handler") h.cleanupStalePolicies() - case <-h.Quit: log.Debug("Received quit signal. Stopping negotiator " + "event loop") @@ -915,9 +969,17 @@ func (h *OrderHandler) subscribeHtlcs(ctx context.Context) error { } // Start starts the service. -func (h *OrderHandler) Start() error { +func (h *OrderHandler) Start(ctx context.Context) error { var startErr error + h.startOnce.Do(func() { + startErr = h.restorePersistedPolicies(ctx) + if startErr != nil { + log.Errorf("error restoring persisted RFQ "+ + "policies: %w", startErr) + return + } + // Start the main event loop in a separate goroutine. h.Wg.Add(1) go func() { @@ -957,7 +1019,9 @@ func (h *OrderHandler) Start() error { // RegisterAssetSalePolicy generates and registers an asset sale policy with the // order handler. This function takes an outgoing buy accept message as an // argument. -func (h *OrderHandler) RegisterAssetSalePolicy(buyAccept rfqmsg.BuyAccept) { +func (h *OrderHandler) RegisterAssetSalePolicy( + buyAccept rfqmsg.BuyAccept) error { + log.Debugf("Order handler is registering an asset sale policy given a "+ "buy accept message: %s", buyAccept.String()) @@ -965,20 +1029,96 @@ func (h *OrderHandler) RegisterAssetSalePolicy(buyAccept rfqmsg.BuyAccept) { buyAccept, h.cfg.NoOpHTLCs, h.cfg.AuxChanNegotiator, ) + err := h.storeSalePolicy(buyAccept) + if err != nil { + return fmt.Errorf("unable to persist asset sale policy "+ + "(id=%x): %w", + buyAccept.ID[:], err) + } + h.policies.Store(policy.AcceptedQuoteId.Scid(), policy) + + // We want to store that we accepted the buy quote, in case we + // need to look it up for a direct peer payment. + h.localAcceptedBuyQuotes.Store(buyAccept.ShortChannelId(), buyAccept) + + return nil } // RegisterAssetPurchasePolicy generates and registers an asset buy policy with the // order handler. This function takes an incoming sell accept message as an // argument. func (h *OrderHandler) RegisterAssetPurchasePolicy( - sellAccept rfqmsg.SellAccept) { + sellAccept rfqmsg.SellAccept) error { log.Debugf("Order handler is registering an asset buy policy given a "+ "sell accept message: %s", sellAccept.String()) policy := NewAssetPurchasePolicy(sellAccept) + + err := h.storePurchasePolicy(sellAccept) + if err != nil { + return fmt.Errorf("unable to persist asset purchase policy "+ + "(id=%x): %w", sellAccept.ID[:], err) + } + h.policies.Store(policy.scid, policy) + + // We want to store that we accepted the sell quote, in case we + // need to look it up for a direct peer payment. + h.localAcceptedSellQuotes.Store(sellAccept.ShortChannelId(), sellAccept) + + return nil +} + +// storeSalePolicy stores a sale policy in the policy store. +func (h *OrderHandler) storeSalePolicy(buyAccept rfqmsg.BuyAccept) error { + ctx, cancel := h.WithCtxQuit() + defer cancel() + + return h.policyStore.StoreSalePolicy(ctx, buyAccept) +} + +// storePurchasePolicy stores a purchase policy in the policy store. +func (h *OrderHandler) storePurchasePolicy( + sellAccept rfqmsg.SellAccept) error { + + ctx, cancel := h.WithCtxQuit() + defer cancel() + + return h.policyStore.StorePurchasePolicy(ctx, sellAccept) +} + +// restorePersistedPolicies restores persisted policies from the policy store. +func (h *OrderHandler) restorePersistedPolicies(ctx context.Context) error { + buyAccepts, sellAccepts, err := h.cfg.PolicyStore.FetchAcceptedQuotes( + ctx, + ) + if err != nil { + return fmt.Errorf("error fetching persisted policies: %w", err) + } + + for _, accept := range buyAccepts { + policy := NewAssetSalePolicy( + accept, h.cfg.NoOpHTLCs, h.cfg.AuxChanNegotiator, + ) + h.policies.Store(policy.AcceptedQuoteId.Scid(), policy) + } + + for _, accept := range sellAccepts { + policy := NewAssetPurchasePolicy(accept) + h.policies.Store(policy.scid, policy) + } + + for _, accept := range buyAccepts { + h.localAcceptedBuyQuotes.Store(accept.ShortChannelId(), accept) + } + + for _, accept := range sellAccepts { + h.localAcceptedSellQuotes.Store(accept.ShortChannelId(), accept) + } + + return nil } // fetchPolicy fetches a policy which is relevant to a given HTLC. If a policy @@ -1049,12 +1189,10 @@ func (h *OrderHandler) fetchPolicy(htlc lndclient.InterceptedHtlc) (Policy, outgoingPolicy := outPolicy if incomingPolicy.HasExpired() { - scid := incomingPolicy.Scid() - h.policies.Delete(SerialisedScid(scid)) + h.policies.Delete(inScid) } if outgoingPolicy.HasExpired() { - scid := outgoingPolicy.Scid() - h.policies.Delete(SerialisedScid(scid)) + h.policies.Delete(SerialisedScid(outgoingPolicy.Scid())) } // If either the incoming or outgoing policy has expired, we diff --git a/rfq/policy_store.go b/rfq/policy_store.go new file mode 100644 index 000000000..3ab85d9a4 --- /dev/null +++ b/rfq/policy_store.go @@ -0,0 +1,20 @@ +package rfq + +import ( + "context" + + "github.com/lightninglabs/taproot-assets/rfqmsg" +) + +// PolicyStore abstracts persistence of RFQ policies. +type PolicyStore interface { + // StoreSalePolicy stores an asset sale policy. + StoreSalePolicy(ctx context.Context, accept rfqmsg.BuyAccept) error + + // StorePurchasePolicy stores an asset purchase policy. + StorePurchasePolicy(ctx context.Context, accept rfqmsg.SellAccept) error + + // FetchAcceptedQuotes fetches all accepted buy and sell quotes. + FetchAcceptedQuotes(ctx context.Context) ([]rfqmsg.BuyAccept, + []rfqmsg.SellAccept, error) +} diff --git a/tapcfg/server.go b/tapcfg/server.go index abdfc29a4..6e092c20b 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -114,6 +114,13 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, lndservices.WithPsbtMaxFeeRatio(cfg.Wallet.PsbtMaxFeeRatio), ) + rfqPolicyDB := tapdb.NewTransactionExecutor( + db, func(tx *sql.Tx) tapdb.RfqPolicyStore { + return db.WithTx(tx) + }, + ) + policyStore := tapdb.NewPersistedPolicyStore(rfqPolicyDB) + // Create a block header cache with default configuration. headerCache, err := lndservices.NewBlockHeaderCache( lndservices.DefaultBlockHeaderCacheConfig(), @@ -510,6 +517,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, SendPriceHint: rfqCfg.SendPriceHint, SendPeerId: rfqCfg.PriceOracleSendPeerId, NoOpHTLCs: cfg.Channel.NoopHTLCs, + PolicyStore: policyStore, ErrChan: mainErrChan, }) if err != nil { diff --git a/tapdb/migrations.go b/tapdb/migrations.go index 4ff83484d..ab9d54edb 100644 --- a/tapdb/migrations.go +++ b/tapdb/migrations.go @@ -24,7 +24,7 @@ const ( // daemon. // // NOTE: This MUST be updated when a new migration is added. - LatestMigrationVersion = 48 + LatestMigrationVersion = 49 ) // DatabaseBackend is an interface that contains all methods our different diff --git a/tapdb/rfq_policies.go b/tapdb/rfq_policies.go new file mode 100644 index 000000000..6e13562e0 --- /dev/null +++ b/tapdb/rfq_policies.go @@ -0,0 +1,533 @@ +package tapdb + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/btcsuite/btcd/btcec/v2" + "github.com/lightninglabs/taproot-assets/asset" + "github.com/lightninglabs/taproot-assets/fn" + "github.com/lightninglabs/taproot-assets/rfqmath" + "github.com/lightninglabs/taproot-assets/rfqmsg" + "github.com/lightninglabs/taproot-assets/tapdb/sqlc" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" +) + +// RfqPolicyType denotes the type of a persisted RFQ policy. +type RfqPolicyType string + +const ( + // RfqPolicyTypeAssetSale identifies an asset sale policy. + RfqPolicyTypeAssetSale RfqPolicyType = "asset_sale" + + // RfqPolicyTypeAssetPurchase identifies an asset purchase policy. + RfqPolicyTypeAssetPurchase RfqPolicyType = "asset_purchase" +) + +// String converts the policy type to its string representation. +func (t RfqPolicyType) String() string { + return string(t) +} + +// rfqPolicy is the database model for an RFQ policy. +type rfqPolicy struct { + PolicyType RfqPolicyType + Scid uint64 + RfqID [32]byte + Peer [33]byte + AssetID *[32]byte + AssetGroupKey *[33]byte + RateCoefficient []byte + RateScale uint8 + ExpiryUnix uint64 + MaxOutAssetAmt *uint64 + PaymentMaxMsat *int64 + RequestAssetMaxAmt *uint64 + RequestPaymentMaxMsat *int64 + PriceOracleMetadata string + RequestVersion *uint32 + AgreedAt time.Time +} + +// RfqPolicyStore is the database interface for RFQ policies. +type RfqPolicyStore interface { + InsertRfqPolicy(context.Context, + sqlc.InsertRfqPolicyParams) (int64, error) + + FetchActiveRfqPolicies(context.Context) ([]sqlc.RfqPolicy, error) +} + +// BatchedRfqPolicyStore supports batched database operations. +type BatchedRfqPolicyStore interface { + RfqPolicyStore + BatchedTx[RfqPolicyStore] +} + +// PersistedPolicyStore offers helpers to persist and load RFQ policies. +type PersistedPolicyStore struct { + db BatchedRfqPolicyStore +} + +// NewPersistedPolicyStore creates a new policy persistence helper. +func NewPersistedPolicyStore(db BatchedRfqPolicyStore) *PersistedPolicyStore { + return &PersistedPolicyStore{ + db: db, + } +} + +// StoreSalePolicy persists a buy-accept policy. +func (s *PersistedPolicyStore) StoreSalePolicy(ctx context.Context, + accept rfqmsg.BuyAccept) error { + + assetID, groupKey := specifierPointers(accept.Request.AssetSpecifier) + rateBytes := coefficientBytes(accept.AssetRate.Rate) + expiry := accept.AssetRate.Expiry.UTC() + + record := rfqPolicy{ + PolicyType: RfqPolicyTypeAssetSale, + Scid: uint64(accept.ShortChannelId()), + RfqID: rfqIDArray(accept.ID), + Peer: serializePeer(accept.Peer), + AssetID: assetID, + AssetGroupKey: groupKey, + RateCoefficient: rateBytes, + RateScale: accept.AssetRate.Rate.Scale, + ExpiryUnix: uint64(expiry.Unix()), + MaxOutAssetAmt: ptrUint64(accept.Request.AssetMaxAmt), + RequestAssetMaxAmt: ptrUint64(accept.Request.AssetMaxAmt), + PriceOracleMetadata: accept.Request.PriceOracleMetadata, + RequestVersion: ptrUint32(uint32(accept.Request.Version)), + AgreedAt: time.Now().UTC(), + } + + return s.storePolicy(ctx, record) +} + +// StorePurchasePolicy persists a sell-accept policy. +func (s *PersistedPolicyStore) StorePurchasePolicy(ctx context.Context, + acpt rfqmsg.SellAccept) error { + + assetID, groupKey := specifierPointers(acpt.Request.AssetSpecifier) + rateBytes := coefficientBytes(acpt.AssetRate.Rate) + expiry := acpt.AssetRate.Expiry.UTC() + paymentMax := int64(acpt.Request.PaymentMaxAmt) + + record := rfqPolicy{ + PolicyType: RfqPolicyTypeAssetPurchase, + Scid: uint64(acpt.ShortChannelId()), + RfqID: rfqIDArray(acpt.ID), + Peer: serializePeer(acpt.Peer), + AssetID: assetID, + AssetGroupKey: groupKey, + RateCoefficient: rateBytes, + RateScale: acpt.AssetRate.Rate.Scale, + ExpiryUnix: uint64(expiry.Unix()), + PaymentMaxMsat: ptrInt64(paymentMax), + RequestPaymentMaxMsat: ptrInt64(paymentMax), + PriceOracleMetadata: acpt.Request.PriceOracleMetadata, + RequestVersion: ptrUint32(uint32(acpt.Request.Version)), + AgreedAt: time.Now().UTC(), + } + + return s.storePolicy(ctx, record) +} + +func (s *PersistedPolicyStore) storePolicy(ctx context.Context, + policy rfqPolicy) error { + + writeOpts := WriteTxOption() + return s.db.ExecTx(ctx, writeOpts, func(q RfqPolicyStore) error { + _, err := q.InsertRfqPolicy(ctx, newInsertParams(policy)) + if err != nil { + return fmt.Errorf("error inserting RFQ policy: %w", err) + } + + return nil + }) +} + +// FetchAcceptedQuotes retrieves all non-expired policies from the database and +// returns them as buy and sell accepts. +func (s *PersistedPolicyStore) FetchAcceptedQuotes(ctx context.Context) ( + []rfqmsg.BuyAccept, []rfqmsg.SellAccept, error) { + + readOpts := ReadTxOption() + var ( + buyAccepts []rfqmsg.BuyAccept + sellAccepts []rfqmsg.SellAccept + ) + now := time.Now().UTC() + + err := s.db.ExecTx(ctx, readOpts, func(q RfqPolicyStore) error { + rows, err := q.FetchActiveRfqPolicies(ctx) + if err != nil { + return fmt.Errorf("error fetching policies: %w", err) + } + + for _, row := range rows { + policy := policyFromRow(row) + + // Skip expired entries. + expiry := time.Unix(int64(policy.ExpiryUnix), 0).UTC() + if now.After(expiry) { + continue + } + + switch policy.PolicyType { + case RfqPolicyTypeAssetSale: + accept, err := buyAcceptFromStored(policy) + if err != nil { + return fmt.Errorf("error restoring "+ + "sale policy: %w", err) + } + buyAccepts = append(buyAccepts, accept) + + case RfqPolicyTypeAssetPurchase: + accept, err := sellAcceptFromStored(policy) + if err != nil { + return fmt.Errorf("error restoring "+ + "purchase policy: %w", err) + } + sellAccepts = append(sellAccepts, accept) + + default: + // This should never happen by assertion. + return fmt.Errorf("unknown policy type: %s", + policy.PolicyType) + } + } + + return nil + }) + if err != nil { + return nil, nil, err + } + + return buyAccepts, sellAccepts, nil +} + +func newInsertParams(policy rfqPolicy) sqlc.InsertRfqPolicyParams { + params := sqlc.InsertRfqPolicyParams{ + PolicyType: policy.PolicyType.String(), + Scid: int64(policy.Scid), + RfqID: policy.RfqID[:], + Peer: policy.Peer[:], + RateCoefficient: append([]byte(nil), policy.RateCoefficient...), + RateScale: int32(policy.RateScale), + Expiry: int64(policy.ExpiryUnix), + AgreedAt: policy.AgreedAt.Unix(), + } + + if policy.AssetID != nil { + params.AssetID = policy.AssetID[:] + } + + if policy.AssetGroupKey != nil { + params.AssetGroupKey = policy.AssetGroupKey[:] + } + + if policy.MaxOutAssetAmt != nil { + params.MaxOutAssetAmt = sql.NullInt64{ + Int64: int64(*policy.MaxOutAssetAmt), + Valid: true, + } + params.RequestAssetMaxAmt = sql.NullInt64{ + Int64: int64(*policy.MaxOutAssetAmt), + Valid: true, + } + } + + if policy.PaymentMaxMsat != nil { + params.PaymentMaxMsat = sql.NullInt64{ + Int64: *policy.PaymentMaxMsat, + Valid: true, + } + params.RequestPaymentMaxMsat = sql.NullInt64{ + Int64: *policy.PaymentMaxMsat, + Valid: true, + } + } + + if policy.RequestAssetMaxAmt != nil { + params.RequestAssetMaxAmt = sql.NullInt64{ + Int64: int64(*policy.RequestAssetMaxAmt), + Valid: true, + } + } + + if policy.RequestPaymentMaxMsat != nil { + params.RequestPaymentMaxMsat = sql.NullInt64{ + Int64: *policy.RequestPaymentMaxMsat, + Valid: true, + } + } + + if policy.PriceOracleMetadata != "" { + params.PriceOracleMetadata = sql.NullString{ + String: policy.PriceOracleMetadata, + Valid: true, + } + } + + if policy.RequestVersion != nil { + params.RequestVersion = sql.NullInt32{ + Int32: int32(*policy.RequestVersion), + Valid: true, + } + } + + return params +} + +func policyFromRow(row sqlc.RfqPolicy) rfqPolicy { + var ( + rfqID [32]byte + peer [33]byte + ) + copy(rfqID[:], row.RfqID) + copy(peer[:], row.Peer) + + var assetIDPtr *[32]byte + if len(row.AssetID) > 0 { + var id [32]byte + copy(id[:], row.AssetID) + assetIDPtr = &id + } + + var groupKeyPtr *[33]byte + if len(row.AssetGroupKey) > 0 { + var key [33]byte + copy(key[:], row.AssetGroupKey) + groupKeyPtr = &key + } + + policy := rfqPolicy{ + PolicyType: RfqPolicyType(row.PolicyType), + Scid: uint64(row.Scid), + RfqID: rfqID, + Peer: peer, + AssetID: assetIDPtr, + AssetGroupKey: groupKeyPtr, + RateCoefficient: append([]byte(nil), row.RateCoefficient...), + RateScale: uint8(row.RateScale), + ExpiryUnix: uint64(row.Expiry), + PriceOracleMetadata: func() string { + if row.PriceOracleMetadata.Valid { + return row.PriceOracleMetadata.String + } + return "" + }(), + RequestVersion: func() *uint32 { + if row.RequestVersion.Valid { + val := uint32(row.RequestVersion.Int32) + return &val + } + return nil + }(), + AgreedAt: time.Unix(row.AgreedAt, 0).UTC(), + } + + if row.MaxOutAssetAmt.Valid { + amt := uint64(row.MaxOutAssetAmt.Int64) + policy.MaxOutAssetAmt = &amt + } + + if row.PaymentMaxMsat.Valid { + val := row.PaymentMaxMsat.Int64 + policy.PaymentMaxMsat = &val + } + + if row.RequestAssetMaxAmt.Valid { + amt := uint64(row.RequestAssetMaxAmt.Int64) + policy.RequestAssetMaxAmt = &amt + } + + if row.RequestPaymentMaxMsat.Valid { + val := row.RequestPaymentMaxMsat.Int64 + policy.RequestPaymentMaxMsat = &val + } + + return policy +} + +func specifierPointers(spec asset.Specifier) (*[32]byte, *[33]byte) { + var assetIDPtr *[32]byte + if id := spec.UnwrapIdToPtr(); id != nil { + assetID := new([32]byte) + copy(assetID[:], id[:]) + assetIDPtr = assetID + } + + var groupKeyPtr *[33]byte + if key := spec.UnwrapGroupKeyToPtr(); key != nil { + groupKey := new([33]byte) + copy(groupKey[:], key.SerializeCompressed()) + groupKeyPtr = groupKey + } + + return assetIDPtr, groupKeyPtr +} + +func coefficientBytes(rate rfqmath.BigIntFixedPoint) []byte { + coeff := rate.Coefficient.Bytes() + if len(coeff) == 0 { + return []byte{0} + } + + return append([]byte(nil), coeff...) +} + +func serializePeer(peer route.Vertex) [33]byte { + var peerBytes [33]byte + copy(peerBytes[:], peer[:]) + return peerBytes +} + +func rfqIDArray(id rfqmsg.ID) [32]byte { + var idBytes [32]byte + copy(idBytes[:], id[:]) + return idBytes +} + +func ptrUint64(v uint64) *uint64 { + val := v + return &val +} + +func ptrInt64(v int64) *int64 { + val := v + return &val +} + +func ptrUint32(v uint32) *uint32 { + val := v + return &val +} + +func buyAcceptFromStored(row rfqPolicy) (rfqmsg.BuyAccept, error) { + spec, err := assetSpecifierFromStored(row) + if err != nil { + return rfqmsg.BuyAccept{}, err + } + + rate := rateFromStored(row) + + vertex := vertexFromBytes(row.Peer) + id := rfqIDFromBytes(row.RfqID) + + version := rfqmsg.V1 + if row.RequestVersion != nil { + version = rfqmsg.WireMsgDataVersion(*row.RequestVersion) + } + + assetMax := row.RequestAssetMaxAmt + if assetMax == nil { + assetMax = row.MaxOutAssetAmt + } + + expiry := time.Unix(int64(row.ExpiryUnix), 0).UTC() + + request := rfqmsg.BuyRequest{ + Peer: vertex, + Version: version, + ID: id, + AssetSpecifier: spec, + AssetMaxAmt: *assetMax, + AssetRateHint: fn.None[rfqmsg.AssetRate](), + PriceOracleMetadata: row.PriceOracleMetadata, + } + + return rfqmsg.BuyAccept{ + Peer: vertex, + Request: request, + Version: rfqmsg.V1, + ID: id, + AssetRate: rfqmsg.NewAssetRate(rate, expiry), + }, nil +} + +func sellAcceptFromStored(row rfqPolicy) (rfqmsg.SellAccept, error) { + spec, err := assetSpecifierFromStored(row) + if err != nil { + return rfqmsg.SellAccept{}, err + } + + rate := rateFromStored(row) + + vertex := vertexFromBytes(row.Peer) + id := rfqIDFromBytes(row.RfqID) + + version := rfqmsg.V1 + if row.RequestVersion != nil { + version = rfqmsg.WireMsgDataVersion(*row.RequestVersion) + } + + paymentPtr := row.RequestPaymentMaxMsat + if paymentPtr == nil { + paymentPtr = row.PaymentMaxMsat + } + + expiry := time.Unix(int64(row.ExpiryUnix), 0).UTC() + + request := rfqmsg.SellRequest{ + Peer: vertex, + Version: version, + ID: id, + AssetSpecifier: spec, + PaymentMaxAmt: lnwire.MilliSatoshi(*paymentPtr), + AssetRateHint: fn.None[rfqmsg.AssetRate](), + PriceOracleMetadata: row.PriceOracleMetadata, + } + + return rfqmsg.SellAccept{ + Peer: vertex, + Request: request, + Version: rfqmsg.V1, + ID: id, + AssetRate: rfqmsg.NewAssetRate(rate, expiry), + }, nil +} + +func assetSpecifierFromStored(row rfqPolicy) (asset.Specifier, error) { + var idPtr *asset.ID + if row.AssetID != nil { + var id asset.ID + copy(id[:], row.AssetID[:]) + idPtr = &id + } + + var groupKey *btcec.PublicKey + if row.AssetGroupKey != nil { + key, err := btcec.ParsePubKey(row.AssetGroupKey[:]) + if err != nil { + return asset.Specifier{}, fmt.Errorf("error parsing "+ + "group key: %w", err) + } + groupKey = key + } + + return asset.NewSpecifier(idPtr, groupKey, nil, true) +} + +func rateFromStored(row rfqPolicy) rfqmath.BigIntFixedPoint { + coeff := rfqmath.BigInt{}.FromBytes(row.RateCoefficient) + return rfqmath.BigIntFixedPoint{ + Coefficient: coeff, + Scale: row.RateScale, + } +} + +func vertexFromBytes(raw [33]byte) route.Vertex { + var vertex route.Vertex + copy(vertex[:], raw[:]) + return vertex +} + +func rfqIDFromBytes(raw [32]byte) rfqmsg.ID { + var id rfqmsg.ID + copy(id[:], raw[:]) + return id +} diff --git a/tapdb/sqlc/migrations/000049_rfq_policies.down.sql b/tapdb/sqlc/migrations/000049_rfq_policies.down.sql new file mode 100644 index 000000000..dc219eb17 --- /dev/null +++ b/tapdb/sqlc/migrations/000049_rfq_policies.down.sql @@ -0,0 +1 @@ +DROP TABLE IF EXISTS rfq_policies; diff --git a/tapdb/sqlc/migrations/000049_rfq_policies.up.sql b/tapdb/sqlc/migrations/000049_rfq_policies.up.sql new file mode 100644 index 000000000..19c5c83ff --- /dev/null +++ b/tapdb/sqlc/migrations/000049_rfq_policies.up.sql @@ -0,0 +1,22 @@ +CREATE TABLE IF NOT EXISTS rfq_policies ( + id INTEGER PRIMARY KEY, + policy_type TEXT NOT NULL CHECK ( + policy_type IN ('asset_sale', 'asset_purchase') + ), + scid BIGINT NOT NULL, + rfq_id BLOB NOT NULL CHECK (length(rfq_id) = 32), + peer BLOB NOT NULL CHECK (length(peer) = 33), + asset_id BLOB CHECK (length(asset_id) = 32), + asset_group_key BLOB CHECK (length(asset_group_key) = 33), + rate_coefficient BLOB NOT NULL, + rate_scale INTEGER NOT NULL, + expiry BIGINT NOT NULL, + max_out_asset_amt BIGINT, + payment_max_msat BIGINT, + request_asset_max_amt BIGINT, + request_payment_max_msat BIGINT, + price_oracle_metadata TEXT, + request_version INTEGER, + agreed_at BIGINT NOT NULL, + UNIQUE(rfq_id) +); diff --git a/tapdb/sqlc/models.go b/tapdb/sqlc/models.go index 22a61d275..6dd37ef80 100644 --- a/tapdb/sqlc/models.go +++ b/tapdb/sqlc/models.go @@ -366,6 +366,26 @@ type ProofType struct { ProofType string } +type RfqPolicy struct { + ID int64 + PolicyType string + Scid int64 + RfqID []byte + Peer []byte + AssetID []byte + AssetGroupKey []byte + RateCoefficient []byte + RateScale int32 + Expiry int64 + MaxOutAssetAmt sql.NullInt64 + PaymentMaxMsat sql.NullInt64 + RequestAssetMaxAmt sql.NullInt64 + RequestPaymentMaxMsat sql.NullInt64 + PriceOracleMetadata sql.NullString + RequestVersion sql.NullInt32 + AgreedAt int64 +} + type ScriptKey struct { ScriptKeyID int64 InternalKeyID int64 diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index b6fb1fa32..8232e0648 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -47,6 +47,7 @@ type Querier interface { DeleteUniverseSupplyLeaf(ctx context.Context, arg DeleteUniverseSupplyLeafParams) error DeleteUniverseSupplyLeaves(ctx context.Context, namespaceRoot string) error DeleteUniverseSupplyRoot(ctx context.Context, namespaceRoot string) error + FetchActiveRfqPolicies(ctx context.Context) ([]RfqPolicy, error) FetchAddrEvent(ctx context.Context, id int64) (FetchAddrEventRow, error) FetchAddrEventByAddrKeyAndOutpoint(ctx context.Context, arg FetchAddrEventByAddrKeyAndOutpointParams) (FetchAddrEventByAddrKeyAndOutpointRow, error) FetchAddrEventOutputs(ctx context.Context, addrEventID int64) ([]FetchAddrEventOutputsRow, error) @@ -142,6 +143,7 @@ type Querier interface { InsertNewProofEvent(ctx context.Context, arg InsertNewProofEventParams) error InsertNewSyncEvent(ctx context.Context, arg InsertNewSyncEventParams) error InsertPassiveAsset(ctx context.Context, arg InsertPassiveAssetParams) error + InsertRfqPolicy(ctx context.Context, arg InsertRfqPolicyParams) (int64, error) InsertRootKey(ctx context.Context, arg InsertRootKeyParams) error InsertSupplyCommitTransition(ctx context.Context, arg InsertSupplyCommitTransitionParams) (int64, error) InsertSupplyCommitment(ctx context.Context, arg InsertSupplyCommitmentParams) (int64, error) diff --git a/tapdb/sqlc/queries/rfq.sql b/tapdb/sqlc/queries/rfq.sql new file mode 100644 index 000000000..2d22b6874 --- /dev/null +++ b/tapdb/sqlc/queries/rfq.sql @@ -0,0 +1,45 @@ +-- name: InsertRfqPolicy :one +INSERT INTO rfq_policies ( + policy_type, + scid, + rfq_id, + peer, + asset_id, + asset_group_key, + rate_coefficient, + rate_scale, + expiry, + max_out_asset_amt, + payment_max_msat, + request_asset_max_amt, + request_payment_max_msat, + price_oracle_metadata, + request_version, + agreed_at +) +VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, + $10, $11, $12, $13, $14, $15, $16 +) +RETURNING id; + +-- name: FetchActiveRfqPolicies :many +SELECT + id, + policy_type, + scid, + rfq_id, + peer, + asset_id, + asset_group_key, + rate_coefficient, + rate_scale, + expiry, + max_out_asset_amt, + payment_max_msat, + request_asset_max_amt, + request_payment_max_msat, + price_oracle_metadata, + request_version, + agreed_at +FROM rfq_policies; diff --git a/tapdb/sqlc/rfq.sql.go b/tapdb/sqlc/rfq.sql.go new file mode 100644 index 000000000..bb1577f78 --- /dev/null +++ b/tapdb/sqlc/rfq.sql.go @@ -0,0 +1,143 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 +// source: rfq.sql + +package sqlc + +import ( + "context" + "database/sql" +) + +const FetchActiveRfqPolicies = `-- name: FetchActiveRfqPolicies :many +SELECT + id, + policy_type, + scid, + rfq_id, + peer, + asset_id, + asset_group_key, + rate_coefficient, + rate_scale, + expiry, + max_out_asset_amt, + payment_max_msat, + request_asset_max_amt, + request_payment_max_msat, + price_oracle_metadata, + request_version, + agreed_at +FROM rfq_policies +` + +func (q *Queries) FetchActiveRfqPolicies(ctx context.Context) ([]RfqPolicy, error) { + rows, err := q.db.QueryContext(ctx, FetchActiveRfqPolicies) + if err != nil { + return nil, err + } + defer rows.Close() + var items []RfqPolicy + for rows.Next() { + var i RfqPolicy + if err := rows.Scan( + &i.ID, + &i.PolicyType, + &i.Scid, + &i.RfqID, + &i.Peer, + &i.AssetID, + &i.AssetGroupKey, + &i.RateCoefficient, + &i.RateScale, + &i.Expiry, + &i.MaxOutAssetAmt, + &i.PaymentMaxMsat, + &i.RequestAssetMaxAmt, + &i.RequestPaymentMaxMsat, + &i.PriceOracleMetadata, + &i.RequestVersion, + &i.AgreedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const InsertRfqPolicy = `-- name: InsertRfqPolicy :one +INSERT INTO rfq_policies ( + policy_type, + scid, + rfq_id, + peer, + asset_id, + asset_group_key, + rate_coefficient, + rate_scale, + expiry, + max_out_asset_amt, + payment_max_msat, + request_asset_max_amt, + request_payment_max_msat, + price_oracle_metadata, + request_version, + agreed_at +) +VALUES ( + $1, $2, $3, $4, $5, $6, $7, $8, $9, + $10, $11, $12, $13, $14, $15, $16 +) +RETURNING id +` + +type InsertRfqPolicyParams struct { + PolicyType string + Scid int64 + RfqID []byte + Peer []byte + AssetID []byte + AssetGroupKey []byte + RateCoefficient []byte + RateScale int32 + Expiry int64 + MaxOutAssetAmt sql.NullInt64 + PaymentMaxMsat sql.NullInt64 + RequestAssetMaxAmt sql.NullInt64 + RequestPaymentMaxMsat sql.NullInt64 + PriceOracleMetadata sql.NullString + RequestVersion sql.NullInt32 + AgreedAt int64 +} + +func (q *Queries) InsertRfqPolicy(ctx context.Context, arg InsertRfqPolicyParams) (int64, error) { + row := q.db.QueryRowContext(ctx, InsertRfqPolicy, + arg.PolicyType, + arg.Scid, + arg.RfqID, + arg.Peer, + arg.AssetID, + arg.AssetGroupKey, + arg.RateCoefficient, + arg.RateScale, + arg.Expiry, + arg.MaxOutAssetAmt, + arg.PaymentMaxMsat, + arg.RequestAssetMaxAmt, + arg.RequestPaymentMaxMsat, + arg.PriceOracleMetadata, + arg.RequestVersion, + arg.AgreedAt, + ) + var id int64 + err := row.Scan(&id) + return id, err +} diff --git a/tapdb/sqlc/schemas/generated_schema.sql b/tapdb/sqlc/schemas/generated_schema.sql index 0decc4bf0..338c0a07d 100644 --- a/tapdb/sqlc/schemas/generated_schema.sql +++ b/tapdb/sqlc/schemas/generated_schema.sql @@ -782,6 +782,29 @@ CREATE TABLE proof_types ( proof_type TEXT PRIMARY KEY ); +CREATE TABLE rfq_policies ( + id INTEGER PRIMARY KEY, + policy_type TEXT NOT NULL CHECK ( + policy_type IN ('asset_sale', 'asset_purchase') + ), + scid BIGINT NOT NULL, + rfq_id BLOB NOT NULL CHECK (length(rfq_id) = 32), + peer BLOB NOT NULL CHECK (length(peer) = 33), + asset_id BLOB CHECK (length(asset_id) = 32), + asset_group_key BLOB CHECK (length(asset_group_key) = 33), + rate_coefficient BLOB NOT NULL, + rate_scale INTEGER NOT NULL, + expiry BIGINT NOT NULL, + max_out_asset_amt BIGINT, + payment_max_msat BIGINT, + request_asset_max_amt BIGINT, + request_payment_max_msat BIGINT, + price_oracle_metadata TEXT, + request_version INTEGER, + agreed_at BIGINT NOT NULL, + UNIQUE(rfq_id) +); + CREATE TABLE script_keys ( script_key_id INTEGER PRIMARY KEY,