Skip to content

Commit a9f7b53

Browse files
committed
adds telemetry topic
1 parent 99eb075 commit a9f7b53

File tree

8 files changed

+65
-12
lines changed

8 files changed

+65
-12
lines changed

agent/agent_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,19 @@ func (m *mockPolicyManager) RemovePolicyDataset(_ string, _ string, _ backend.Ba
5353
func (m *mockPolicyManager) GetPolicyState() ([]policies.PolicyData, error) {
5454
return nil, nil
5555
}
56+
5657
func (m *mockPolicyManager) GetRepo() policies.PolicyRepo {
5758
return m.repo
5859
}
60+
5961
func (m *mockPolicyManager) ApplyBackendPolicies(_ backend.Backend) error {
6062
return nil
6163
}
64+
6265
func (m *mockPolicyManager) RemoveBackendPolicies(_ backend.Backend, _ bool) error {
6366
return nil
6467
}
68+
6569
func (m *mockPolicyManager) RemovePolicy(_ string, _ string, _ string) error {
6670
return nil
6771
}
@@ -76,6 +80,7 @@ func (m *mockSecretsManager) RegisterUpdatePoliciesCallback(_ func(map[string]bo
7680
func (m *mockSecretsManager) SolvePolicySecrets(payload config.PolicyPayload) (config.PolicyPayload, error) {
7781
return payload, nil
7882
}
83+
7984
func (m *mockSecretsManager) SolveConfigSecrets(backends map[string]any, configManager config.ManagerConfig) (map[string]any, config.ManagerConfig, error) {
8085
return backends, configManager, nil
8186
}

agent/configmgr/fleet.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
102102
"capabilities_topic", topics.Capabilities,
103103
"inbox_topic", topics.Inbox,
104104
"outbox_topic", topics.Outbox,
105-
"otlp_topic", topics.Ingest)
105+
"otlp_topic", topics.Ingest,
106+
"telemetry_topic", topics.Telemetry)
106107

107108
connectionDetails := fleet.ConnectionDetails{
108109
MQTTURL: jwtClaims.MqttURL,
@@ -177,7 +178,10 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
177178
pub := otlpbridge.NewCMAdapterPublisher(cm)
178179
fleetManager.otlpBridge.SetPublisher(pub)
179180
fleetManager.otlpBridge.SetIngestTopic(topics.Ingest)
180-
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest))
181+
fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry)
182+
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT",
183+
slog.String("ingest_topic", topics.Ingest),
184+
slog.String("telemetry_topic", topics.Telemetry))
181185
})
182186

183187
// Start goroutine to handle reconnect requests (JWT refresh)

agent/configmgr/fleet/connection_hooks_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestConnect_StoresTopicsBeforeConnecting(t *testing.T) {
5656
MQTTURL: "mqtt://localhost:1883",
5757
Token: "",
5858
AgentID: "agent-1",
59-
Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x"},
59+
Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x", Telemetry: "telemetry/x"},
6060
ClientID: "client-1",
6161
Zone: "zone-a",
6262
}

agent/configmgr/fleet/jwt_claims_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) {
173173
Capabilities: "orgs/test-org/agents/test-client-123/capabilities",
174174
Inbox: "orgs/test-org/agents/test-client-123/inbox",
175175
Outbox: "orgs/test-org/agents/test-client-123/outbox",
176+
Ingest: "orgs/test-org/agents/test-client-123/ingest",
177+
Telemetry: "orgs/test-org/agents/test-client-123/telemetry",
176178
},
177179
},
178180
{
@@ -184,6 +186,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) {
184186
Capabilities: "orgs/prod-company/agents/test-agent-123/capabilities",
185187
Inbox: "orgs/prod-company/agents/test-agent-123/inbox",
186188
Outbox: "orgs/prod-company/agents/test-agent-123/outbox",
189+
Ingest: "orgs/prod-company/agents/test-agent-123/ingest",
190+
Telemetry: "orgs/prod-company/agents/test-agent-123/telemetry",
187191
},
188192
},
189193
}

agent/configmgr/fleet/topics.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const (
1818
inboxTemplate = "orgs/{org_id}/agents/{agent_id}/inbox"
1919
outboxTemplate = "orgs/{org_id}/agents/{agent_id}/outbox"
2020
ingestTemplate = "orgs/{org_id}/agents/{agent_id}/ingest"
21+
telemetryTemplate = "orgs/{org_id}/agents/{agent_id}/telemetry"
2122

2223
groupsTemplate = "orgs/{org_id}/groups/{group_id}"
2324
)
@@ -29,6 +30,7 @@ type TokenResponseTopics struct {
2930
Inbox string `json:"inbox"`
3031
Outbox string `json:"outbox"`
3132
Ingest string `json:"ingest"`
33+
Telemetry string `json:"telemetry"`
3234
}
3335

3436
// GenerateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id
@@ -39,6 +41,7 @@ func GenerateTopicsFromTemplate(jwtClaims *JWTClaims) (*TokenResponseTopics, err
3941
Inbox: fillTopicTemplate(inboxTemplate, jwtClaims),
4042
Outbox: fillTopicTemplate(outboxTemplate, jwtClaims),
4143
Ingest: fillTopicTemplate(ingestTemplate, jwtClaims),
44+
Telemetry: fillTopicTemplate(telemetryTemplate, jwtClaims),
4245
}, nil
4346
}
4447

agent/configmgr/fleet/topics_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,15 @@ func TestGenerateTopicsFromTemplate_IncludesIngest(t *testing.T) {
1313
t.Fatalf("expected ingest topic %q, got %q", expected, topics.Ingest)
1414
}
1515
}
16+
17+
func TestGenerateTopicsFromTemplate_IncludesTelemetry(t *testing.T) {
18+
claims := &JWTClaims{OrgID: "org-123", AgentID: "agent-abc"}
19+
topics, err := GenerateTopicsFromTemplate(claims)
20+
if err != nil {
21+
t.Fatalf("unexpected error: %v", err)
22+
}
23+
expected := "orgs/org-123/agents/agent-abc/telemetry"
24+
if topics.Telemetry != expected {
25+
t.Fatalf("expected telemetry topic %q, got %q", expected, topics.Telemetry)
26+
}
27+
}

agent/configmgr/fleet_test.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -721,15 +721,19 @@ func TestFleetConfigManager_OnReadyHook_InitializesBridgeOnFirstCall(t *testing.
721721
pub := otlpbridge.NewCMAdapterPublisher(cm)
722722
fleetManager.otlpBridge.SetPublisher(pub)
723723
fleetManager.otlpBridge.SetIngestTopic(topics.Ingest)
724-
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest))
724+
fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry)
725+
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT",
726+
slog.String("ingest_topic", topics.Ingest),
727+
slog.String("telemetry_topic", topics.Telemetry))
725728
}
726729

727730
// Register the hook
728731
fleetManager.connection.AddOnReadyHook(hookFunc)
729732

730733
// Simulate first connection ready event
731734
topics := fleet.TokenResponseTopics{
732-
Ingest: "test/otlp/topic",
735+
Ingest: "test/otlp/topic",
736+
Telemetry: "test/telemetry/topic",
733737
}
734738

735739
// Call the hook manually (simulating first connection)
@@ -738,6 +742,7 @@ func TestFleetConfigManager_OnReadyHook_InitializesBridgeOnFirstCall(t *testing.
738742
// Verify bridge was initialized
739743
require.NotNil(t, fleetManager.otlpBridge, "bridge should be initialized after first hook call")
740744
assert.Equal(t, "test/otlp/topic", fleetManager.otlpBridge.GetIngestTopic(), "bridge should have correct ingest topic")
745+
assert.Equal(t, "test/telemetry/topic", fleetManager.otlpBridge.GetTelemetryTopic(), "bridge should have correct telemetry topic")
741746

742747
// Cleanup
743748
if fleetManager.otlpBridge != nil {
@@ -791,15 +796,19 @@ func TestFleetConfigManager_OnReadyHook_SkipsInitializationOnReconnect(t *testin
791796
pub := otlpbridge.NewCMAdapterPublisher(cm)
792797
fleetManager.otlpBridge.SetPublisher(pub)
793798
fleetManager.otlpBridge.SetIngestTopic(topics.Ingest)
794-
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest))
799+
fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry)
800+
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT",
801+
slog.String("ingest_topic", topics.Ingest),
802+
slog.String("telemetry_topic", topics.Telemetry))
795803
}
796804

797805
// Register the hook
798806
fleetManager.connection.AddOnReadyHook(hookFunc)
799807

800808
// Simulate reconnection ready event
801809
topics := fleet.TokenResponseTopics{
802-
Ingest: "test/otlp/topic/reconnect",
810+
Ingest: "test/otlp/topic/reconnect",
811+
Telemetry: "test/telemetry/topic/reconnect",
803812
}
804813

805814
// Call the hook manually (simulating reconnection)
@@ -808,6 +817,7 @@ func TestFleetConfigManager_OnReadyHook_SkipsInitializationOnReconnect(t *testin
808817
// Verify bridge was NOT recreated (same instance)
809818
assert.Equal(t, originalBridge, fleetManager.otlpBridge, "bridge should not be recreated on reconnect")
810819
assert.Equal(t, "test/otlp/topic/reconnect", fleetManager.otlpBridge.GetIngestTopic(), "bridge should have updated ingest topic")
820+
assert.Equal(t, "test/telemetry/topic/reconnect", fleetManager.otlpBridge.GetTelemetryTopic(), "bridge should have updated telemetry topic")
811821

812822
// Cleanup
813823
if fleetManager.otlpBridge != nil {

agent/otlpbridge/server.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ type BridgeServer struct {
2525
closeOnce sync.Once
2626

2727
// Shared runtime state
28-
mu sync.RWMutex
29-
publisher Publisher
30-
ingestTopic string
31-
policyRepo policies.PolicyRepo
32-
logger *slog.Logger
28+
mu sync.RWMutex
29+
publisher Publisher
30+
ingestTopic string
31+
telemetryTopic string
32+
policyRepo policies.PolicyRepo
33+
logger *slog.Logger
3334
}
3435

3536
// NewBridgeServer builds a BridgeServer but does not start it.
@@ -80,6 +81,20 @@ func (s *BridgeServer) GetIngestTopic() string {
8081
return s.ingestTopic
8182
}
8283

84+
// SetTelemetryTopic sets the telemetry topic for publishing.
85+
func (s *BridgeServer) SetTelemetryTopic(topic string) {
86+
s.mu.Lock()
87+
defer s.mu.Unlock()
88+
s.telemetryTopic = topic
89+
}
90+
91+
// GetTelemetryTopic returns the current telemetry topic (for handlers).
92+
func (s *BridgeServer) GetTelemetryTopic() string {
93+
s.mu.RLock()
94+
defer s.mu.RUnlock()
95+
return s.telemetryTopic
96+
}
97+
8398
// GetPolicyRepo returns the policy repo (for handlers).
8499
func (s *BridgeServer) GetPolicyRepo() policies.PolicyRepo {
85100
s.mu.RLock()

0 commit comments

Comments
 (0)