Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/bitcoin-sv/testcontainers-aerospike-go v0.2.2
github.com/bsv-blockchain/go-bt/v2 v2.5.1
github.com/bsv-blockchain/go-chaincfg v1.4.0
github.com/bsv-blockchain/go-subtree v1.1.2
github.com/bsv-blockchain/go-subtree v1.1.4
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd
github.com/btcsuite/goleveldb v1.0.0
github.com/centrifugal/centrifuge v0.33.2
Expand Down Expand Up @@ -61,7 +61,7 @@ require (
go.opentelemetry.io/otel/trace v1.38.0
go.uber.org/atomic v1.11.0
golang.org/x/crypto v0.45.0
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39
golang.org/x/sync v0.18.0
golang.org/x/sys v0.38.0
golang.org/x/term v0.37.0
Expand Down Expand Up @@ -178,8 +178,8 @@ require (
github.com/bsv-blockchain/go-lockfree-queue v1.0.0
github.com/bsv-blockchain/go-p2p-message-bus v0.1.7
github.com/bsv-blockchain/go-safe-conversion v1.1.0
github.com/bsv-blockchain/go-sdk v1.2.11
github.com/bsv-blockchain/go-tx-map v1.2.0
github.com/bsv-blockchain/go-sdk v1.2.12
github.com/bsv-blockchain/go-tx-map v1.2.1
github.com/bsv-blockchain/go-wire v1.0.6
github.com/felixge/fgprof v0.9.5
github.com/gocarina/gocsv v0.0.0-20240520201108-78e41c74b4b1
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,12 @@ github.com/bsv-blockchain/go-p2p-message-bus v0.1.7 h1:+5Gi6oD5mKq6NfpqmdhaM/zMA
github.com/bsv-blockchain/go-p2p-message-bus v0.1.7/go.mod h1:jFJ9Fl7BOfkXlB5CfgZoWSrP613GoCA6TBW726rxTUE=
github.com/bsv-blockchain/go-safe-conversion v1.1.0 h1:EjyF+fDsmSK0AwbN2NnIzel/z0dIJcT64KvIwTbWHFg=
github.com/bsv-blockchain/go-safe-conversion v1.1.0/go.mod h1:KwO5HkH9S11kppAm7SedJhgaJnZbUMYRZalSq9fxLHQ=
github.com/bsv-blockchain/go-sdk v1.2.11 h1:SK8kDuDZNP3ubvx0AL0bR/I8tXWljJICyUsiF4y9ZkQ=
github.com/bsv-blockchain/go-sdk v1.2.11/go.mod h1:S+8iokWX2la9G4mzwHIeCvYkADRzcdfk1AprN0z5MDI=
github.com/bsv-blockchain/go-subtree v1.1.2 h1:evQ961Cku9wIrKA5qjvn2t0nfUL/7rBPqHth2eZRso4=
github.com/bsv-blockchain/go-subtree v1.1.2/go.mod h1:e+VXWba1DoKZ05LiBU0N6FKZl6cD12yUNE0SrjtJup8=
github.com/bsv-blockchain/go-tx-map v1.2.0 h1:HF8XTLrl5YGpEeWmsoO58w/XSNKJy49DYgrt/0EQHn4=
github.com/bsv-blockchain/go-tx-map v1.2.0/go.mod h1:sjsSHrl5HNT+0p1AeS/6CE7Ds4V4Kjn9PRBcKB3ozMc=
github.com/bsv-blockchain/go-sdk v1.2.12 h1:t/50ONqCTgumJH82YbQ8iqdo30ezIACyuFgvyHbkX9A=
github.com/bsv-blockchain/go-sdk v1.2.12/go.mod h1:1FWCWH+x6xc1kH9r6tuyRQqUomfrLBOQfdPesJZK/1k=
github.com/bsv-blockchain/go-subtree v1.1.4 h1:PcAqIHnKk5V0Smr5+A5khDW0YFReflok7hemAJ7FnmM=
github.com/bsv-blockchain/go-subtree v1.1.4/go.mod h1:c76Lw73NgEI0Rw2wY8z8iegJXklS2PVtzYxwhWHjXTo=
github.com/bsv-blockchain/go-tx-map v1.2.1 h1:uzKryYn4uMCR3ko6N71TyUrKfAnRvwG5Mdm1ufCStyo=
github.com/bsv-blockchain/go-tx-map v1.2.1/go.mod h1:4NsZBCM6bFNwYEc7J4C6jiNdm5aY/rtRVXqpG7PgbX4=
github.com/bsv-blockchain/go-wire v1.0.6 h1:rMVASfuXtrZB1ZaZEl+/tvmXfdPMf4KY8Pew7/VeyQ0=
github.com/bsv-blockchain/go-wire v1.0.6/go.mod h1:Jp6ekSmh/KZL1Gm/OPmbyMspNsficSgjXxlJ6bFD0Hs=
github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd h1:R/opQEbFEy9JGkIguV40SvRY1uliPX8ifOvi6ICsFCw=
Expand Down Expand Up @@ -1198,8 +1198,8 @@ golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1m
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6 h1:zfMcR1Cs4KNuomFFgGefv5N0czO2XZpUbxGUy8i8ug0=
golang.org/x/exp v0.0.0-20251113190631-e25ba8c21ef6/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39 h1:DHNhtq3sNNzrvduZZIiFyXWOL9IWaDPHqTnLJp+rCBY=
golang.org/x/exp v0.0.0-20251125195548-87e1e737ad39/go.mod h1:46edojNIoXTNOhySWIWdix628clX9ODXwPsQuG6hsK0=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
Expand Down
1 change: 0 additions & 1 deletion services/blockassembly/BlockAssembler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,6 @@ func setupBlockAssemblyTest(t *testing.T) *baTestItems {
items.blockchainClient,
nil,
items.newSubtreeChan,
subtreeprocessor.WithBatcherSize(1),
)
require.NoError(t, err)

Expand Down
127 changes: 86 additions & 41 deletions services/blockassembly/subtreeprocessor/SubtreeProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,16 @@ type SubtreeProcessor struct {

// closeOnce ensures Close() is only executed once
closeOnce sync.Once

// dequeueBatchBuffer is a pre-allocated reusable slice for batch dequeuing
// This eliminates the allocation overhead of creating a new slice on each dequeue cycle
// Size is set smaller than batchSize to handle typical queue depths efficiently
dequeueBatchBuffer []*TxIDAndFee

// validItemsBuffer is a pre-allocated reusable slice for parallel duplicate detection
// Used to mark which items in a batch are valid and should be processed
// Reused across all batch processing cycles to eliminate allocations
validItemsBuffer []bool
}

type State uint32
Expand Down Expand Up @@ -388,7 +398,6 @@ func NewSubtreeProcessor(ctx context.Context, logger ulogger.Logger, tSettings *
chainedSubtrees: make([]*subtreepkg.Subtree, 0, ExpectedNumberOfSubtrees),
chainedSubtreeCount: atomic.Int32{},
currentSubtree: firstSubtree,
batcher: NewTxIDAndFeeBatch(tSettings.BlockAssembly.SubtreeProcessorBatcherSize),
queue: queue,
currentTxMap: txmap.NewSyncedMap[chainhash.Hash, subtreepkg.TxInpoints](),
removeMap: txmap.NewSwissMap(0),
Expand All @@ -401,6 +410,8 @@ func NewSubtreeProcessor(ctx context.Context, logger ulogger.Logger, tSettings *
announcementTicker: time.NewTicker(tSettings.BlockAssembly.SubtreeAnnouncementInterval),
ctx: processorCtx,
cancel: cancel,
dequeueBatchBuffer: make([]*TxIDAndFee, 0, 128*1024), // Pre-allocate 128K entries for batch dequeueing
validItemsBuffer: make([]bool, 128*1024), // Pre-allocate for parallel duplicate detection
}
stp.setCurrentRunningState(StateStarting)

Expand Down Expand Up @@ -637,61 +648,97 @@ func NewSubtreeProcessor(ctx context.Context, logger ulogger.Logger, tSettings *
}

default:
var err error

stp.setCurrentRunningState(StateDequeue)

nrProcessed := 0
mapLength := stp.removeMap.Length()
// set the validFromMillis to the current time minus the double spend window - so in the past
validFromMillis := time.Now().Add(-1 * stp.settings.BlockAssembly.DoubleSpendWindow).UnixMilli()

for {
node, txInpoints, _, found := stp.queue.dequeue(validFromMillis)
if !found {
time.Sleep(1 * time.Millisecond)
break
batchSize := cap(stp.dequeueBatchBuffer)

// Cache these — they are read on every single iteration
removeMap := stp.removeMap
mapLength := removeMap.Length()
currentTxMap := stp.currentTxMap
currentItemsPerFile := stp.currentItemsPerFile
validFromMillis := time.Now().Add(-stp.settings.BlockAssembly.DoubleSpendWindow).UnixMilli()
addedCount := uint64(0)

// Check if we need to create a new subtree
if stp.currentSubtree == nil {
stp.currentSubtree, err = subtreepkg.NewTreeByLeafCount(currentItemsPerFile)
if err != nil {
stp.logger.Errorf("[SubtreeProcessor] error creating new subtree: %s", err)
// If we can't create a subtree, we can't process this node.
// We should probably retry or abort, but here we skip to avoid crash.
continue
}

// check if the tx needs to be removed
if mapLength > 0 && stp.removeMap.Exists(node.Hash) {
// remove from the map
if err = stp.removeMap.Delete(node.Hash); err != nil {
stp.logger.Errorf("[SubtreeProcessor] error removing tx from remove map: %s", err.Error())
}

// This is the first subtree for this block - we need a coinbase placeholder
if err = stp.currentSubtree.AddCoinbaseNode(); err != nil {
stp.logger.Errorf("[SubtreeProcessor] error adding coinbase placeholder: %s", err)
continue
}
addedCount++
}

if node.Hash.Equal(*subtreepkg.CoinbasePlaceholderHash) {
stp.logger.Errorf("[SubtreeProcessor] error adding node: skipping request to add coinbase tx placeholder")
// Batch dequeue for improved throughput using pre-allocated buffer
batch := stp.dequeueBatchBuffer
batch = stp.queue.DequeueBatch(batch, batchSize, validFromMillis)
batchLen := len(batch)

if batchLen == 0 {
// Queue is empty right now → be nice to the core
runtime.Gosched()
continue
}

// Process the batch
for _, item := range batch {
node := item.node
txInpoints := item.txInpoints

// Fast reject path first (most common in practice)
if mapLength > 0 && removeMap.Exists(node.Hash) {
// Fire-and-forget delete — we don't care about the error in the hot path
_ = removeMap.Delete(node.Hash)
continue
}

// check if the tx is already in the currentTxMap
if _, ok := stp.currentTxMap.Get(node.Hash); ok {
stp.logger.Warnf("[SubtreeProcessor] error adding node: tx %s already in currentTxMap", node.Hash.String())
// We always have txInpoints from the queue, so we use SetIfNotExists.
// This covers the common case where we are adding a new transaction.
if _, wasSet := currentTxMap.SetIfNotExists(node.Hash, txInpoints); !wasSet {
stp.logger.Debugf("duplicate tx %s", node.Hash.String())
continue
}

// check txInpoints
// for _, parent := range txReq.txInpoints {
// if _, ok := stp.currentTxMap.Get(parent); !ok {
// stp.logger.Errorf("[SubtreeProcessor] error adding node: parent %s not found in currentTxMap", parent.String())
// continue
// }
// }

if err = stp.addNode(node, &txInpoints, false); err != nil {
stp.logger.Errorf("[SubtreeProcessor] error adding node: %s", err.Error())
// Add to current subtree
if err = stp.currentSubtree.AddSubtreeNodeWithoutLock(node); err != nil {
stp.logger.Errorf("addNode failed: error adding node to subtree: %s", err)
// If adding failed, should we remove from map?
// addNode didn't remove it, so we keep consistent behavior.
} else {
stp.txCount.Add(1)
addedCount++
nrProcessed++
}

nrProcessed++
if nrProcessed > stp.settings.BlockAssembly.SubtreeProcessorBatcherSize {
break
// Check if subtree is complete
if len(stp.currentSubtree.Nodes) >= currentItemsPerFile {
if err = stp.processCompleteSubtree(false); err != nil {
stp.logger.Errorf("processCompleteSubtree failed: %s", err)
}
}
}

if addedCount > 0 {
stp.txCount.Add(addedCount)
}

// Only yield if we actually hit the batch limit (means queue still has work)
// If we exited because !found, we already Gosched'ed above
if nrProcessed == batchSize {
runtime.Gosched() // let sibling hyper-thread breathe while we go around again
}

stp.setCurrentRunningState(StateRunning)
}
}
Expand Down Expand Up @@ -744,7 +791,7 @@ func (stp *SubtreeProcessor) createIncompleteSubtreeCopy() (*subtreepkg.Subtree,

// Copy all nodes from current subtree (skipping the coinbase placeholder at index 0)
for _, node := range stp.currentSubtree.Nodes[1:] {
if err = incompleteSubtree.AddSubtreeNode(node); err != nil {
if err = incompleteSubtree.AddSubtreeNodeWithoutLock(node); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -1337,16 +1384,14 @@ func (stp *SubtreeProcessor) addNode(node subtreepkg.Node, parents *subtreepkg.T
}

// This is the first subtree for this block - we need a coinbase placeholder
err = stp.currentSubtree.AddCoinbaseNode()
if err != nil {
if err = stp.currentSubtree.AddCoinbaseNode(); err != nil {
return err
}

stp.txCount.Add(1)
}

err = stp.currentSubtree.AddSubtreeNode(node)
if err != nil {
if err = stp.currentSubtree.AddSubtreeNode(node); err != nil {
return errors.NewProcessingError("error adding node to subtree", err)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -932,7 +932,7 @@ func TestCompareMerkleProofsToSubtrees(t *testing.T) {
settings := test.CreateBaseTestSettings(t)
settings.BlockAssembly.InitialMerkleItemsPerSubtree = 4

subtreeProcessor, _ := NewSubtreeProcessor(context.Background(), ulogger.TestLogger{}, settings, nil, nil, nil, newSubtreeChan, WithBatcherSize(1))
subtreeProcessor, _ := NewSubtreeProcessor(context.Background(), ulogger.TestLogger{}, settings, nil, nil, nil, newSubtreeChan)

for i, hash := range hashes {
if i == 0 {
Expand Down
14 changes: 0 additions & 14 deletions services/blockassembly/subtreeprocessor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,3 @@ package subtreeprocessor
// extensible configuration of the SubtreeProcessor with optional parameters.
// Multiple options can be composed together to customize processor behavior.
type Options func(*SubtreeProcessor)

// WithBatcherSize creates an option to set the batcher size for the SubtreeProcessor.
// This determines how many transactions will be processed in a single batch.
//
// Parameters:
// - size: The desired size for the transaction batcher
//
// Returns:
// - Options: A configuration function that sets the batcher size
func WithBatcherSize(size int) Options {
return func(sp *SubtreeProcessor) {
sp.batcher = NewTxIDAndFeeBatch(size)
}
}
Loading
Loading