From 324abf59fd8d978aa1a7cce5731055692065af14 Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Tue, 30 Sep 2025 17:24:15 +0530 Subject: [PATCH 1/2] fix: add chainsync client close and close p2p channels in a single go routine --- rolling-shutter/medley/chainsync/client.go | 8 +++++++- rolling-shutter/p2p/bootstrap.go | 1 + rolling-shutter/p2p/p2p.go | 5 +---- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/rolling-shutter/medley/chainsync/client.go b/rolling-shutter/medley/chainsync/client.go index 02d80e88..2f2ea577 100644 --- a/rolling-shutter/medley/chainsync/client.go +++ b/rolling-shutter/medley/chainsync/client.go @@ -146,7 +146,13 @@ func (s *Client) ChainID(ctx context.Context) (*big.Int, error) { return s.chainID, nil } -func (s *Client) Start(_ context.Context, runner service.Runner) error { +func (s *Client) Start(ctx context.Context, runner service.Runner) error { + runner.Go(func() error { + <-ctx.Done() + s.Client.Close() + s.log.Debug("chainsync client closed") + return nil + }) return runner.StartService(s.getServices()...) } diff --git a/rolling-shutter/p2p/bootstrap.go b/rolling-shutter/p2p/bootstrap.go index bc65aa07..6c77701f 100644 --- a/rolling-shutter/p2p/bootstrap.go +++ b/rolling-shutter/p2p/bootstrap.go @@ -100,6 +100,7 @@ func bootstrap( _, err := retry.FunctionCall( ctx, f, + retry.NumberOfRetries(10), retry.StopOnErrors(errInsufficientBootstrpConfigured), retry.Interval(2*time.Second)) if err != nil { diff --git a/rolling-shutter/p2p/p2p.go b/rolling-shutter/p2p/p2p.go index 0387f1df..7364c782 100644 --- a/rolling-shutter/p2p/p2p.go +++ b/rolling-shutter/p2p/p2p.go @@ -103,10 +103,6 @@ func (p *P2PNode) Run( p.mux.Lock() defer p.mux.Unlock() - runner.Defer(func() { - close(p.GossipMessages) - }) - if err := p.init(ctx); err != nil { return err } @@ -120,6 +116,7 @@ func (p *P2PNode) Run( if err := p.dht.Close(); err != nil { log.Error().Err(err).Msg("error closing dht") } + close(p.GossipMessages) log.Debug().Msg("host closed") return nil }) From 715cde797ed721831b130c4515fe7bf9164cdc9e Mon Sep 17 00:00:00 2001 From: blockchainluffy Date: Wed, 12 Nov 2025 12:02:47 +0530 Subject: [PATCH 2/2] feat: add context management to non buffered channels --- rolling-shutter/keyperimpl/gnosis/keyper.go | 30 +++++++++++++------ rolling-shutter/keyperimpl/gnosis/newslot.go | 7 ++++- .../keyperimpl/shutterservice/keyper.go | 30 +++++++++++++------ rolling-shutter/medley/channel/channel.go | 6 +++- .../medley/slotticker/slotticker.go | 6 +++- 5 files changed, 58 insertions(+), 21 deletions(-) diff --git a/rolling-shutter/keyperimpl/gnosis/keyper.go b/rolling-shutter/keyperimpl/gnosis/keyper.go index e67fee75..70d8fbd8 100644 --- a/rolling-shutter/keyperimpl/gnosis/keyper.go +++ b/rolling-shutter/keyperimpl/gnosis/keyper.go @@ -295,17 +295,29 @@ func (kpr *Keyper) processInputs(ctx context.Context) error { } } -func (kpr *Keyper) channelNewBlock(_ context.Context, ev *syncevent.LatestBlock) error { - kpr.newBlocks <- ev - return nil +func (kpr *Keyper) channelNewBlock(ctx context.Context, ev *syncevent.LatestBlock) error { + select { + case kpr.newBlocks <- ev: + return nil + case <-ctx.Done(): + return ctx.Err() + } } -func (kpr *Keyper) channelNewKeyperSet(_ context.Context, ev *syncevent.KeyperSet) error { - kpr.newKeyperSets <- ev - return nil +func (kpr *Keyper) channelNewKeyperSet(ctx context.Context, ev *syncevent.KeyperSet) error { + select { + case kpr.newKeyperSets <- ev: + return nil + case <-ctx.Done(): + return ctx.Err() + } } -func (kpr *Keyper) channelNewEonPublicKey(_ context.Context, key keyper.EonPublicKey) error { - kpr.newEonPublicKeys <- key - return nil +func (kpr *Keyper) channelNewEonPublicKey(ctx context.Context, key keyper.EonPublicKey) error { + select { + case kpr.newEonPublicKeys <- key: + return nil + case <-ctx.Done(): + return ctx.Err() + } } diff --git a/rolling-shutter/keyperimpl/gnosis/newslot.go b/rolling-shutter/keyperimpl/gnosis/newslot.go index 0b65188e..3ccc563d 100644 --- a/rolling-shutter/keyperimpl/gnosis/newslot.go +++ b/rolling-shutter/keyperimpl/gnosis/newslot.go @@ -266,7 +266,12 @@ func (kpr *Keyper) triggerDecryption( Int("num-identities", len(trigger.IdentityPreimages)). Int64("tx-pointer", txPointer). Msg("sending decryption trigger") - kpr.decryptionTriggerChannel <- event + + select { + case kpr.decryptionTriggerChannel <- event: + case <-ctx.Done(): + return ctx.Err() + } return nil } diff --git a/rolling-shutter/keyperimpl/shutterservice/keyper.go b/rolling-shutter/keyperimpl/shutterservice/keyper.go index eb707e0e..4d5960c4 100644 --- a/rolling-shutter/keyperimpl/shutterservice/keyper.go +++ b/rolling-shutter/keyperimpl/shutterservice/keyper.go @@ -274,17 +274,29 @@ func (kpr *Keyper) processInputs(ctx context.Context) error { } } -func (kpr *Keyper) channelNewEonPublicKey(_ context.Context, key keyper.EonPublicKey) error { - kpr.newEonPublicKeys <- key - return nil +func (kpr *Keyper) channelNewEonPublicKey(ctx context.Context, key keyper.EonPublicKey) error { + select { + case kpr.newEonPublicKeys <- key: + return nil + case <-ctx.Done(): + return ctx.Err() + } } -func (kpr *Keyper) channelNewBlock(_ context.Context, ev *syncevent.LatestBlock) error { - kpr.newBlocks <- ev - return nil +func (kpr *Keyper) channelNewBlock(ctx context.Context, ev *syncevent.LatestBlock) error { + select { + case kpr.newBlocks <- ev: + return nil + case <-ctx.Done(): + return ctx.Err() + } } -func (kpr *Keyper) channelNewKeyperSet(_ context.Context, ev *syncevent.KeyperSet) error { - kpr.newKeyperSets <- ev - return nil +func (kpr *Keyper) channelNewKeyperSet(ctx context.Context, ev *syncevent.KeyperSet) error { + select { + case kpr.newKeyperSets <- ev: + return nil + case <-ctx.Done(): + return ctx.Err() + } } diff --git a/rolling-shutter/medley/channel/channel.go b/rolling-shutter/medley/channel/channel.go index 41510b30..8d3b1199 100644 --- a/rolling-shutter/medley/channel/channel.go +++ b/rolling-shutter/medley/channel/channel.go @@ -12,7 +12,11 @@ func Forward[T any](ctx context.Context, if !ok { return nil } - send <- val + select { + case send <- val: + case <-ctx.Done(): + return ctx.Err() + } case <-ctx.Done(): return ctx.Err() } diff --git a/rolling-shutter/medley/slotticker/slotticker.go b/rolling-shutter/medley/slotticker/slotticker.go index a353e821..7cec1d4c 100644 --- a/rolling-shutter/medley/slotticker/slotticker.go +++ b/rolling-shutter/medley/slotticker/slotticker.go @@ -92,7 +92,11 @@ func (t *SlotTicker) run(ctx context.Context) error { timeToNextSlot := nextTickTime.Sub(now) timer.Reset(timeToNextSlot) - <-timer.C + select { + case <-timer.C: + case <-ctx.Done(): + return ctx.Err() + } if err := t.tick(ctx, nextSlotNumber); err != nil { return err