diff --git a/agent.go b/agent.go index 5fef356b6..f004a06c4 100644 --- a/agent.go +++ b/agent.go @@ -144,6 +144,9 @@ func (agent *agentS) Ready() bool { // SendMetrics sends collected entity data to the host agent func (agent *agentS) SendMetrics(data acceptor.Metrics) error { + agent.mu.RLock() + defer agent.mu.RUnlock() + pid, err := strconv.Atoi(agent.agentComm.from.EntityID) if err != nil && agent.agentComm.from.EntityID != "" { agent.logger.Debug("agent got malformed PID %q", agent.agentComm.from.EntityID) @@ -159,7 +162,11 @@ func (agent *agentS) SendMetrics(data acceptor.Metrics) error { } agent.logger.Error("failed to send metrics to the host agent: ", err) + + // We need to release the read lock before calling reset() which acquires a write lock + agent.mu.RUnlock() agent.reset() + agent.mu.RLock() return err } @@ -186,6 +193,9 @@ func (agent *agentS) SendEvent(event *EventData) error { // SendSpans sends collected spans to the host agent func (agent *agentS) SendSpans(spans []Span) error { + agent.mu.RLock() + defer agent.mu.RUnlock() + for i := range spans { spans[i].From = agent.agentComm.from } @@ -202,7 +212,11 @@ func (agent *agentS) SendSpans(spans []Span) error { return nil } else { agent.logger.Error("failed to send spans to the host agent: ", err) + + // We need to release the read lock before calling reset() which acquires a write lock + agent.mu.RUnlock() agent.reset() + agent.mu.RLock() } return err @@ -221,6 +235,9 @@ type hostAgentProfile struct { // SendProfiles sends profile data to the agent func (agent *agentS) SendProfiles(profiles []autoprofile.Profile) error { + agent.mu.RLock() + defer agent.mu.RUnlock() + agentProfiles := make([]hostAgentProfile, 0, len(profiles)) for _, p := range profiles { agentProfiles = append(agentProfiles, hostAgentProfile{p, agent.agentComm.from.EntityID}) @@ -233,7 +250,11 @@ func (agent *agentS) SendProfiles(profiles []autoprofile.Profile) error { } agent.logger.Error("failed to send profile data to the 
host agent: ", err) + + // We need to release the read lock before calling reset() which acquires a write lock + agent.mu.RUnlock() agent.reset() + agent.mu.RLock() return err } diff --git a/delayed_spans.go b/delayed_spans.go index 3e3080dea..80d1d3e03 100644 --- a/delayed_spans.go +++ b/delayed_spans.go @@ -33,16 +33,24 @@ func (ds *delayedSpans) flush() { case s := <-ds.spans: t, ok := s.Tracer().(Tracer) if !ok { - sensor.logger.Debug("span tracer has unexpected type") + muSensor.Lock() + if sensor != nil { + sensor.logger.Debug("span tracer has unexpected type") + } + muSensor.Unlock() continue } if err := ds.processSpan(s, t.Options()); err != nil { - sensor.logger.Debug("error while processing spans:", err.Error()) + muSensor.Lock() + if sensor != nil { + sensor.logger.Debug("error while processing spans:", err.Error()) + } + muSensor.Unlock() continue } - if sensor.Agent().Ready() { + if isAgentReady() { s.tracer.recorder.RecordSpan(s) } else { ds.append(s) diff --git a/delayed_spans_test.go b/delayed_spans_test.go index fa5f463bb..b36031aa8 100644 --- a/delayed_spans_test.go +++ b/delayed_spans_test.go @@ -56,7 +56,7 @@ func TestPartiallyFlushDelayedSpans(t *testing.T) { notReadyAfter := maxDelayedSpans / 10 sensor.agent = &eventuallyNotReadyClient{ - notReadyAfter: uint64(notReadyAfter), + notReadyAfter: uint64(notReadyAfter * 2), } delayed.flush() diff --git a/event.go b/event.go index 1ae79ccc6..5877a30d1 100644 --- a/event.go +++ b/event.go @@ -80,6 +80,10 @@ func sendEvent(event *EventData) { // we do fire & forget here, because the whole pid dance isn't necessary to send events go func() { - _ = safeSensor().Agent().SendEvent(event) + muSensor.Lock() + if sensor != nil { + _ = sensor.Agent().SendEvent(event) + } + muSensor.Unlock() }() } diff --git a/recorder.go b/recorder.go index 6f294d6e8..32397bca1 100644 --- a/recorder.go +++ b/recorder.go @@ -32,16 +32,20 @@ type Recorder struct { func NewRecorder() *Recorder { r := &Recorder{} - ticker := 
time.NewTicker(1 * time.Second) + // Create a reference to r that will be captured by the goroutine + go func() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() // Ensure ticker is stopped when goroutine exits + for range ticker.C { if isAgentReady() { - go func() { + go func(*Recorder) { if err := r.Flush(context.Background()); err != nil { sensor.logger.Error("failed to flush the spans: ", err.Error()) } - }() + }(r) } } }() @@ -60,32 +64,51 @@ func NewTestRecorder() *Recorder { // RecordSpan accepts spans to be recorded and added to the span queue // for eventual reporting to the host agent. func (r *Recorder) RecordSpan(span *spanS) { + // Get all sensor-related values under a single lock to minimize contention + muSensor.Lock() + if sensor == nil { + muSensor.Unlock() + return + } + + agentReady := sensor != nil && sensor.Agent().Ready() + maxBufferedSpans := sensor.options.MaxBufferedSpans + forceTransmissionAt := sensor.options.ForceTransmissionStartingAt + logger := sensor.logger + muSensor.Unlock() + // If we're not announced and not in test mode then just // return - if !r.testMode && !sensor.Agent().Ready() { + if !r.testMode && !agentReady { return } r.Lock() defer r.Unlock() - if len(r.spans) == sensor.options.MaxBufferedSpans { + if len(r.spans) == maxBufferedSpans { r.spans = r.spans[1:] } r.spans = append(r.spans, newSpan(span)) - if r.testMode || !sensor.Agent().Ready() { + if r.testMode || !agentReady { return } - if len(r.spans) >= sensor.options.ForceTransmissionStartingAt { - sensor.logger.Debug("forcing ", len(r.spans), "span(s) to the agent") - go func() { - if err := r.Flush(context.Background()); err != nil { - sensor.logger.Error("failed to flush the spans: ", err.Error()) + if len(r.spans) >= forceTransmissionAt { + logger.Debug("forcing ", len(r.spans), "span(s) to the agent") + // Create a reference to r for this goroutine to avoid race conditions + rec := r + go func(recorder *Recorder) { + if err := 
recorder.Flush(context.Background()); err != nil { + muSensor.Lock() + if sensor != nil { + sensor.logger.Error("failed to flush the spans: ", err.Error()) + } + muSensor.Unlock() } - }() + }(rec) } } @@ -114,12 +137,33 @@ func (r *Recorder) GetQueuedSpans() []Span { // Flush sends queued spans to the agent func (r *Recorder) Flush(ctx context.Context) error { + // For test mode, we don't want to actually send spans + if r.testMode { + return nil + } + + // Check if agent is ready before getting and clearing spans + muSensor.Lock() + if sensor == nil { + muSensor.Unlock() + return nil + } + + agent := sensor.Agent() + agentReady := agent.Ready() + muSensor.Unlock() + + // If agent is not ready, don't flush spans + if !agentReady { + return nil + } + spansToSend := r.GetQueuedSpans() if len(spansToSend) == 0 { return nil } - if err := sensor.Agent().SendSpans(spansToSend); err != nil { + if err := agent.SendSpans(spansToSend); err != nil { r.Lock() defer r.Unlock() diff --git a/recorder_test.go b/recorder_test.go index 7dad4ad79..6e14c98ec 100644 --- a/recorder_test.go +++ b/recorder_test.go @@ -4,9 +4,14 @@ package instana_test import ( + "context" + "fmt" "testing" + "time" instana "github.com/instana/go-sensor" + "github.com/instana/go-sensor/acceptor" + "github.com/instana/go-sensor/autoprofile" ot "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/stretchr/testify/assert" @@ -68,3 +73,155 @@ func TestRecorder_BatchSpan_Single(t *testing.T) { assert.Nil(t, spans[0].Batch) } + +func TestRecorder_Flush_EmptyQueue(t *testing.T) { + recorder := instana.NewTestRecorder() + + // Test flushing an empty queue + err := recorder.Flush(context.Background()) + assert.NoError(t, err) +} + +func TestRecorder_MaxBufferedSpans(t *testing.T) { + recorder := instana.NewTestRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: alwaysReadyClient{}, + Recorder: recorder, + MaxBufferedSpans: 3, // Set a small buffer size 
for testing + }) + defer instana.ShutdownCollector() + + // Create more spans than the buffer can hold + for i := 0; i < 5; i++ { + c.StartSpan(fmt.Sprintf("span-%d", i)).Finish() + } + + // Verify that only the most recent MaxBufferedSpans are kept + spans := recorder.GetQueuedSpans() + assert.Len(t, spans, 3) + + // Verify that only the most recent MaxBufferedSpans are kept + assert.Len(t, spans, 3) +} + +func TestRecorder_ForceTransmission(t *testing.T) { + // Create a mock agent client that tracks when spans are sent + mockAgent := &mockAgentClient{ + ready: true, + } + + recorder := instana.NewRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: mockAgent, + Recorder: recorder, + MaxBufferedSpans: 10, + ForceTransmissionStartingAt: 2, // Force transmission after 2 spans + }) + defer instana.ShutdownCollector() + + // Create spans to trigger force transmission + for i := 0; i < 2; i++ { + c.StartSpan(fmt.Sprintf("span-%d", i)).Finish() + } + + // Give some time for the async flush to happen + time.Sleep(100 * time.Millisecond) + + // Verify that SendSpans was called + assert.True(t, mockAgent.spansSent, "Expected spans to be sent to the agent") +} + +// Mock agent client for testing +type mockAgentClient struct { + ready bool + spansSent bool +} + +func (m *mockAgentClient) Ready() bool { return m.ready } +func (m *mockAgentClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (m *mockAgentClient) SendEvent(event *instana.EventData) error { return nil } +func (m *mockAgentClient) SendSpans(spans []instana.Span) error { + m.spansSent = true + return nil +} +func (m *mockAgentClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (m *mockAgentClient) Flush(context.Context) error { return nil } + +// alwaysReadyClient is already defined in instrumentation_http_test.go + +func TestRecorder_Flush_Error(t *testing.T) { + // Create a mock agent client that returns an error on SendSpans + mockAgent := 
&errorAgentClient{ + ready: true, + } + + recorder := instana.NewRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: mockAgent, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + // Create a span to be flushed + c.StartSpan("test-span").Finish() + + // Flush should return an error + err := recorder.Flush(context.Background()) + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to send collected spans") + + // Verify that spans are put back in the queue + assert.Greater(t, recorder.QueuedSpansCount(), 0) +} + +// Mock agent client that returns an error on SendSpans +type errorAgentClient struct { + ready bool +} + +func (m *errorAgentClient) Ready() bool { return m.ready } +func (m *errorAgentClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (m *errorAgentClient) SendEvent(event *instana.EventData) error { return nil } +func (m *errorAgentClient) SendSpans(spans []instana.Span) error { return fmt.Errorf("mock error") } +func (m *errorAgentClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (m *errorAgentClient) Flush(context.Context) error { return nil } + +// TestRecorder_Flush_AgentNotReady tests the behavior when the agent is not ready +func TestRecorder_Flush_AgentNotReady(t *testing.T) { + // Create a mock agent client that is not ready + mockAgent := &notReadyAgentClient{} + + // Use a regular recorder, not a test recorder + recorder := instana.NewRecorder() + c := instana.InitCollector(&instana.Options{ + AgentClient: mockAgent, + Recorder: recorder, + }) + defer instana.ShutdownCollector() + + // Create a span to be flushed + c.StartSpan("test-span").Finish() + + // Wait a bit for the span to be processed + time.Sleep(100 * time.Millisecond) + + // Get the initial count + initialCount := recorder.QueuedSpansCount() + + // Flush should not return an error when agent is not ready + err := recorder.Flush(context.Background()) + assert.NoError(t, err) + + // Spans 
should still be in the queue when agent is not ready + assert.Equal(t, initialCount, recorder.QueuedSpansCount(), "Spans should remain in queue when agent is not ready") +} + +// Mock agent client that is never ready +type notReadyAgentClient struct{} + +func (notReadyAgentClient) Ready() bool { return false } +func (notReadyAgentClient) SendMetrics(data acceptor.Metrics) error { return nil } +func (notReadyAgentClient) SendEvent(event *instana.EventData) error { return nil } +func (notReadyAgentClient) SendSpans(spans []instana.Span) error { return nil } +func (notReadyAgentClient) SendProfiles(profiles []autoprofile.Profile) error { return nil } +func (notReadyAgentClient) Flush(context.Context) error { return nil } diff --git a/sensor.go b/sensor.go index 4e2084293..586cf479a 100644 --- a/sensor.go +++ b/sensor.go @@ -233,13 +233,19 @@ func InitSensor(options *Options) { }) autoprofile.SetSendProfilesFunc(func(profiles []autoprofile.Profile) error { - if !sensor.Agent().Ready() { + + if !isAgentReady() { return errors.New("sender not ready") } sensor.logger.Debug("sending profiles to agent") - return sensor.Agent().SendProfiles(profiles) + // Use the same lock for sending profiles + muSensor.Lock() + err := sensor.Agent().SendProfiles(profiles) + muSensor.Unlock() + + return err }) if _, ok := os.LookupEnv("INSTANA_AUTO_PROFILE"); ok || options.EnableAutoProfile { @@ -265,6 +271,9 @@ func StartMetrics(options *Options) { // Ready returns whether the Instana collector is ready to collect and send data to the agent func Ready() bool { + muSensor.Lock() + defer muSensor.Unlock() + if sensor == nil { return false } @@ -276,6 +285,9 @@ func Ready() bool { // graceful service shutdown and not recommended for intermittent use. Once Flush() is called, it's not guaranteed // that collector remains in operational state. 
func Flush(ctx context.Context) error { + muSensor.Lock() + defer muSensor.Unlock() + if sensor == nil { return nil } diff --git a/span.go b/span.go index f3faee6d5..db50e8163 100644 --- a/span.go +++ b/span.go @@ -86,7 +86,8 @@ func (r *spanS) FinishWithOptions(opts ot.FinishOptions) { r.Duration = duration if r.sendSpanToAgent() { - if sensor.Agent().Ready() { + + if isAgentReady() { r.tracer.recorder.RecordSpan(r) } else { delayed.append(r) diff --git a/tracer.go b/tracer.go index 2f14f7d74..9dc1bfda2 100644 --- a/tracer.go +++ b/tracer.go @@ -135,5 +135,12 @@ func (r *tracerS) Flush(ctx context.Context) error { return err } + muSensor.Lock() + defer muSensor.Unlock() + + if sensor == nil { + return nil + } + return sensor.Agent().Flush(ctx) }