Skip to content

Commit 8d26508

Browse files
committed
Merge branch '0-6-1-pull-requests-1652' into 0-6-1-pull-requests
2 parents 08de01a + 05cceb7 commit 8d26508

File tree

2 files changed

+26
-12
lines changed

2 files changed

+26
-12
lines changed

server.go

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -837,12 +837,16 @@ func (s *Server) Name() msgmux.EndpointName {
837837
//
838838
// NOTE: This method is part of the msgmux.MsgEndpoint interface.
839839
func (s *Server) CanHandle(msg msgmux.PeerMsg) bool {
840-
err := s.waitForReady()
841-
if err != nil {
842-
srvrLog.Debugf("Can't handle PeerMsg, server not ready %v",
843-
err)
840+
// We can't wait for ready here, as this method is potentially called
841+
// during startup. The `CanHandle` method is stateless, so we can call
842+
// it if the funding controller has been created (but potentially has
843+
// not yet been started).
844+
if s == nil || s.cfg == nil || s.cfg.AuxFundingController == nil {
845+
// This shouldn't happen, the server and funding controller
846+
// should always be initialized before the msgmux is started.
844847
return false
845848
}
849+
846850
return s.cfg.AuxFundingController.CanHandle(msg)
847851
}
848852

@@ -851,12 +855,18 @@ func (s *Server) CanHandle(msg msgmux.PeerMsg) bool {
851855
//
852856
// NOTE: This method is part of the msgmux.MsgEndpoint interface.
853857
func (s *Server) SendMessage(ctx context.Context, msg msgmux.PeerMsg) bool {
854-
err := s.waitForReady()
855-
if err != nil {
856-
srvrLog.Debugf("Failed to send PeerMsg, server not ready %v",
857-
err)
858+
// We can't wait for ready here, as this method is potentially called
859+
// during startup. The `SendMessage` method will buffer messages that
860+
// come in between the funding controller being created and it being
861+
// started (which only happens after waitForReady fires). So it's safe
862+
// to call it here, as long as the funding controller has been created.
863+
if s == nil || s.cfg == nil || s.cfg.AuxFundingController == nil {
864+
// This shouldn't happen, the CanHandle method is always called
865+
// first, and that should've already returned false in this
866+
// case.
858867
return false
859868
}
869+
860870
return s.cfg.AuxFundingController.SendMessage(ctx, msg)
861871
}
862872

tapchannel/aux_funding_controller.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ type FundingController struct {
301301

302302
cfg FundingControllerCfg
303303

304-
msgs chan msgmux.PeerMsg
304+
msgQueue *fn.ConcurrentQueue[msgmux.PeerMsg]
305305

306306
bindFundingReqs chan *bindFundingReq
307307

@@ -318,9 +318,11 @@ type FundingController struct {
318318

319319
// NewFundingController creates a new instance of the FundingController.
320320
func NewFundingController(cfg FundingControllerCfg) *FundingController {
321+
msgs := fn.NewConcurrentQueue[msgmux.PeerMsg](fn.DefaultQueueSize)
322+
msgs.Start()
321323
return &FundingController{
322324
cfg: cfg,
323-
msgs: make(chan msgmux.PeerMsg, 10),
325+
msgQueue: msgs,
324326
bindFundingReqs: make(chan *bindFundingReq, 10),
325327
newFundingReqs: make(chan *FundReq, 10),
326328
rootReqs: make(chan *assetRootReq, 10),
@@ -387,6 +389,8 @@ func (f *FundingController) Stop() error {
387389
close(f.Quit)
388390
f.Wg.Wait()
389391

392+
f.msgQueue.Stop()
393+
390394
return nil
391395
}
392396

@@ -1858,7 +1862,7 @@ func (f *FundingController) chanFunder() {
18581862
// The remote party has sent us some upfront proof for channel
18591863
// asset inputs. We'll log this pending chan ID, then validate
18601864
// the proofs included.
1861-
case msg := <-f.msgs:
1865+
case msg := <-f.msgQueue.ChanOut():
18621866
tempFundingID, err := f.processFundingMsg(
18631867
ctxc, fundingFlows, msg,
18641868
)
@@ -2438,7 +2442,7 @@ func (f *FundingController) CanHandle(msg msgmux.PeerMsg) bool {
24382442
func (f *FundingController) SendMessage(_ context.Context,
24392443
msg msgmux.PeerMsg) bool {
24402444

2441-
return fn.SendOrQuit(f.msgs, msg, f.Quit)
2445+
return fn.SendOrQuit(f.msgQueue.ChanIn(), msg, f.Quit)
24422446
}
24432447

24442448
// TODO(roasbeef): try to protofsm it?

0 commit comments

Comments
 (0)