diff --git a/agent/agent.go b/agent/agent.go index 2eaa15c..cd5dded 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -197,6 +197,37 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err return err } + if a.config.OrbAgent.ConfigManager.Active == "fleet" { + const otlpBridgeEndpoint = "grpc://localhost:4317" + if commonBackend, exists := a.config.OrbAgent.Backends["common"]; exists { + if commonMap, ok := commonBackend.(map[string]any); ok { + if otlpSection, ok := commonMap["otlp"].(map[string]any); ok { + grpcURL, _ := otlpSection["grpc"].(string) + if grpcURL != "" { + a.logger.Warn("Overriding OTLP gRPC URL for fleet config manager", "url", grpcURL) + } + otlpSection["grpc"] = otlpBridgeEndpoint + a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") + + } else { + // otlp section doesn't exist, create it + commonMap["otlp"] = map[string]any{ + "grpc": otlpBridgeEndpoint, + } + a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") + } + } + } else { + // common backend doesn't exist, create it with otlp config + a.config.OrbAgent.Backends["common"] = map[string]any{ + "otlp": map[string]any{ + "grpc": otlpBridgeEndpoint, + }, + } + a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317") + } + } + if err = a.startBackends(agentCtx, a.config.OrbAgent.Backends, a.config.OrbAgent.Labels); err != nil { return err } diff --git a/agent/agent_test.go b/agent/agent_test.go index 0b216c2..d1173af 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -7,10 +7,12 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/netboxlabs/orb-agent/agent/backend" "github.com/netboxlabs/orb-agent/agent/config" "github.com/netboxlabs/orb-agent/agent/configmgr" + "github.com/netboxlabs/orb-agent/agent/policies" ) // mockConfigManager implements configmgr.Manager for testing Stop delegation @@ -40,3 +42,205 @@ func TestAgentStop_DelegatesToConfigManagerStop(t *testing.T) { assert.True(t, mockMgr.stopCalled, "expected configManager.Stop to be called") } + +// mockPolicyManager implements policymgr.PolicyManager for testing +type mockPolicyManager struct { + repo policies.PolicyRepo +} + +func (m *mockPolicyManager) ManagePolicy(_ config.PolicyPayload) {} +func (m *mockPolicyManager) RemovePolicyDataset(_ string, _ string, _ backend.Backend) {} +func (m *mockPolicyManager) GetPolicyState() ([]policies.PolicyData, error) { + return nil, nil +} + +func (m *mockPolicyManager) GetRepo() policies.PolicyRepo { + return m.repo +} + +func (m *mockPolicyManager) ApplyBackendPolicies(_ backend.Backend) error { + return nil +} + +func (m *mockPolicyManager) RemoveBackendPolicies(_ backend.Backend, _ bool) error { + return nil +} + +func (m *mockPolicyManager) RemovePolicy(_ string, _ string, _ string) error { + return nil +} + +// mockSecretsManager implements secretsmgr.Manager for testing +type mockSecretsManager struct{} + +func (m *mockSecretsManager) Start(_ context.Context) error { + return nil +} +func (m *mockSecretsManager) RegisterUpdatePoliciesCallback(_ func(map[string]bool)) {} +func (m *mockSecretsManager) SolvePolicySecrets(payload config.PolicyPayload) (config.PolicyPayload, error) { + return payload, nil +} + +func (m *mockSecretsManager) SolveConfigSecrets(backends map[string]any, configManager config.ManagerConfig) (map[string]any, config.ManagerConfig, error) { + return backends, configManager, nil +} + +func TestStart_FleetConfig_OverridesExistingOTLPGrpcURL(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + repo, err := policies.NewMemRepo() + require.NoError(t, err) + + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + Backends: map[string]any{ + "common": map[string]any{ + "otlp": map[string]any{ + "grpc": "original:4317", + }, + }, + }, + ConfigManager: config.ManagerConfig{ + Active: "fleet", + }, + SecretsManager: config.ManagerSecrets{ + Active: "", + }, + }, + } + + agent, err := New(logger, cfg) + require.NoError(t, err) + + orbAgent := agent.(*orbAgent) + orbAgent.secretsManager = &mockSecretsManager{} + orbAgent.policyManager = &mockPolicyManager{repo: repo} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start will fail when trying to start backends, but we can check the config before that + err = orbAgent.Start(ctx, cancel) + // We expect an error because there are no actual backends configured + // But the important thing is that the config was modified + require.Error(t, err) + + // Verify the config was modified by checking backendsCommon which is set in startBackends + // The OTLP configuration happens before startBackends, so backendsCommon should have the updated value + assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc) +} + +func TestStart_FleetConfig_CreatesOTLPSectionWhenMissing(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + repo, err := policies.NewMemRepo() + require.NoError(t, err) + + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + Backends: map[string]any{ + "common": map[string]any{ + "other": "value", + }, + }, + ConfigManager: config.ManagerConfig{ + Active: "fleet", + }, + SecretsManager: config.ManagerSecrets{ + Active: "", + }, + }, + } + + agent, err := New(logger, cfg) + require.NoError(t, err) + + orbAgent := agent.(*orbAgent) + orbAgent.secretsManager = &mockSecretsManager{} + orbAgent.policyManager = &mockPolicyManager{repo: repo} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = orbAgent.Start(ctx, cancel) + require.Error(t, err) // Expected to fail when starting backends + + // Verify the config was modified by checking backendsCommon which is set in startBackends + // The OTLP configuration happens before startBackends, so backendsCommon should have the updated value + assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc) +} + +func TestStart_FleetConfig_CreatesCommonBackendWhenMissing(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + repo, err := policies.NewMemRepo() + require.NoError(t, err) + + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + Backends: map[string]any{}, + ConfigManager: config.ManagerConfig{ + Active: "fleet", + }, + SecretsManager: config.ManagerSecrets{ + Active: "", + }, + }, + } + + agent, err := New(logger, cfg) + require.NoError(t, err) + + orbAgent := agent.(*orbAgent) + orbAgent.secretsManager = &mockSecretsManager{} + orbAgent.policyManager = &mockPolicyManager{repo: repo} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = orbAgent.Start(ctx, cancel) + require.Error(t, err) // Expected to fail when starting backends + + // Verify the config was modified by checking backendsCommon which is set in startBackends + // The OTLP configuration happens before startBackends, so backendsCommon should have the updated value + assert.Equal(t, "grpc://localhost:4317", orbAgent.backendsCommon.Otlp.Grpc) +} + +func TestStart_NonFleetConfig_DoesNotModifyConfig(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + repo, err := policies.NewMemRepo() + require.NoError(t, err) + + originalGrpcURL := "original:4317" + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + Backends: map[string]any{ + "common": map[string]any{ + "otlp": map[string]any{ + "grpc": originalGrpcURL, + }, + }, + }, + ConfigManager: config.ManagerConfig{ + Active: "local", // Not fleet + }, + SecretsManager: config.ManagerSecrets{ + Active: "", + }, + }, + } + + agent, err := New(logger, cfg) + require.NoError(t, err) + + orbAgent := agent.(*orbAgent) + orbAgent.secretsManager = &mockSecretsManager{} + orbAgent.policyManager = &mockPolicyManager{repo: repo} + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = orbAgent.Start(ctx, cancel) + require.Error(t, err) // Expected to fail when starting backends + + // Verify the config was NOT modified by checking backendsCommon which is set in startBackends + // For non-fleet config, the original value should remain + assert.Equal(t, originalGrpcURL, orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should remain unchanged for non-fleet config") +} diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index 1a45de5..842cdf3 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -102,7 +102,8 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st "capabilities_topic", topics.Capabilities, "inbox_topic", topics.Inbox, "outbox_topic", topics.Outbox, - "otlp_topic", topics.Ingest) + "otlp_topic", topics.Ingest, + "telemetry_topic", topics.Telemetry) connectionDetails := fleet.ConnectionDetails{ MQTTURL: jwtClaims.MqttURL, @@ -157,7 +158,7 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st fleetManager.logger.Info("MQTT connection ready, initializing OTLP bridge") bridgeConfig := otlpbridge.BridgeConfig{ ListenAddr: ":4317", - Encoding: "protobuf", + Encoding: "json", } var err error fleetManager.otlpBridge, err = otlpbridge.NewBridgeServer(bridgeConfig, fleetManager.policyManager.GetRepo(), fleetManager.logger) @@ -177,7 +178,10 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st pub := otlpbridge.NewCMAdapterPublisher(cm) fleetManager.otlpBridge.SetPublisher(pub) fleetManager.otlpBridge.SetIngestTopic(topics.Ingest) - fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest)) + fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry) + fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", + slog.String("ingest_topic", topics.Ingest), + slog.String("telemetry_topic", topics.Telemetry)) }) // Start goroutine to handle reconnect requests (JWT refresh) diff --git a/agent/configmgr/fleet/connection_hooks_test.go b/agent/configmgr/fleet/connection_hooks_test.go index 5733e07..c42a74c 100644 --- a/agent/configmgr/fleet/connection_hooks_test.go +++ b/agent/configmgr/fleet/connection_hooks_test.go @@ -56,7 +56,7 @@ func TestConnect_StoresTopicsBeforeConnecting(t *testing.T) { MQTTURL: "mqtt://localhost:1883", Token: "", AgentID: "agent-1", - Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x"}, + Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x", Telemetry: "telemetry/x"}, ClientID: "client-1", Zone: "zone-a", } diff --git a/agent/configmgr/fleet/jwt_claims_test.go b/agent/configmgr/fleet/jwt_claims_test.go index 8025ded..653236e 100644 --- a/agent/configmgr/fleet/jwt_claims_test.go +++ b/agent/configmgr/fleet/jwt_claims_test.go @@ -173,6 +173,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) { Capabilities: "orgs/test-org/agents/test-client-123/capabilities", Inbox: "orgs/test-org/agents/test-client-123/inbox", Outbox: "orgs/test-org/agents/test-client-123/outbox", + Ingest: "orgs/test-org/agents/test-client-123/ingest", + Telemetry: "orgs/test-org/agents/test-client-123/telemetry", }, }, { @@ -184,6 +186,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) { Capabilities: "orgs/prod-company/agents/test-agent-123/capabilities", Inbox: "orgs/prod-company/agents/test-agent-123/inbox", Outbox: "orgs/prod-company/agents/test-agent-123/outbox", + Ingest: "orgs/prod-company/agents/test-agent-123/ingest", + Telemetry: "orgs/prod-company/agents/test-agent-123/telemetry", }, }, } diff --git a/agent/configmgr/fleet/topics.go b/agent/configmgr/fleet/topics.go index 2e9ec18..b830129 100644 --- a/agent/configmgr/fleet/topics.go +++ b/agent/configmgr/fleet/topics.go @@ -18,6 +18,7 @@ const ( inboxTemplate = "orgs/{org_id}/agents/{agent_id}/inbox" outboxTemplate = "orgs/{org_id}/agents/{agent_id}/outbox" ingestTemplate = "orgs/{org_id}/agents/{agent_id}/ingest" + telemetryTemplate = "orgs/{org_id}/agents/{agent_id}/telemetry" groupsTemplate = "orgs/{org_id}/groups/{group_id}" ) @@ -29,6 +30,7 @@ type TokenResponseTopics struct { Inbox string `json:"inbox"` Outbox string `json:"outbox"` Ingest string `json:"ingest"` + Telemetry string `json:"telemetry"` } // GenerateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id @@ -39,6 +41,7 @@ func GenerateTopicsFromTemplate(jwtClaims *JWTClaims) (*TokenResponseTopics, err Inbox: fillTopicTemplate(inboxTemplate, jwtClaims), Outbox: fillTopicTemplate(outboxTemplate, jwtClaims), Ingest: fillTopicTemplate(ingestTemplate, jwtClaims), + Telemetry: fillTopicTemplate(telemetryTemplate, jwtClaims), }, nil } diff --git a/agent/configmgr/fleet/topics_test.go b/agent/configmgr/fleet/topics_test.go index 875fb3f..d7d7bbd 100644 --- a/agent/configmgr/fleet/topics_test.go +++ b/agent/configmgr/fleet/topics_test.go @@ -13,3 +13,15 @@ func TestGenerateTopicsFromTemplate_IncludesIngest(t *testing.T) { t.Fatalf("expected ingest topic %q, got %q", expected, topics.Ingest) } } + +func TestGenerateTopicsFromTemplate_IncludesTelemetry(t *testing.T) { + claims := &JWTClaims{OrgID: "org-123", AgentID: "agent-abc"} + topics, err := GenerateTopicsFromTemplate(claims) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + expected := "orgs/org-123/agents/agent-abc/telemetry" + if topics.Telemetry != expected { + t.Fatalf("expected telemetry topic %q, got %q", expected, topics.Telemetry) + } +} diff --git a/agent/configmgr/fleet_test.go b/agent/configmgr/fleet_test.go index a90da68..586198e 100644 --- a/agent/configmgr/fleet_test.go +++ b/agent/configmgr/fleet_test.go @@ -721,7 +721,10 @@ func TestFleetConfigManager_OnReadyHook_InitializesBridgeOnFirstCall(t *testing. pub := otlpbridge.NewCMAdapterPublisher(cm) fleetManager.otlpBridge.SetPublisher(pub) fleetManager.otlpBridge.SetIngestTopic(topics.Ingest) - fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest)) + fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry) + fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", + slog.String("ingest_topic", topics.Ingest), + slog.String("telemetry_topic", topics.Telemetry)) } // Register the hook @@ -729,7 +732,8 @@ func TestFleetConfigManager_OnReadyHook_InitializesBridgeOnFirstCall(t *testing. // Simulate first connection ready event topics := fleet.TokenResponseTopics{ - Ingest: "test/otlp/topic", + Ingest: "test/otlp/topic", + Telemetry: "test/telemetry/topic", } // Call the hook manually (simulating first connection) @@ -738,6 +742,7 @@ func TestFleetConfigManager_OnReadyHook_InitializesBridgeOnFirstCall(t *testing. // Verify bridge was initialized require.NotNil(t, fleetManager.otlpBridge, "bridge should be initialized after first hook call") assert.Equal(t, "test/otlp/topic", fleetManager.otlpBridge.GetIngestTopic(), "bridge should have correct ingest topic") + assert.Equal(t, "test/telemetry/topic", fleetManager.otlpBridge.GetTelemetryTopic(), "bridge should have correct telemetry topic") // Cleanup if fleetManager.otlpBridge != nil { @@ -791,7 +796,10 @@ func TestFleetConfigManager_OnReadyHook_SkipsInitializationOnReconnect(t *testin pub := otlpbridge.NewCMAdapterPublisher(cm) fleetManager.otlpBridge.SetPublisher(pub) fleetManager.otlpBridge.SetIngestTopic(topics.Ingest) - fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest)) + fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry) + fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", + slog.String("ingest_topic", topics.Ingest), + slog.String("telemetry_topic", topics.Telemetry)) } // Register the hook @@ -799,7 +807,8 @@ func TestFleetConfigManager_OnReadyHook_SkipsInitializationOnReconnect(t *testin // Simulate reconnection ready event topics := fleet.TokenResponseTopics{ - Ingest: "test/otlp/topic/reconnect", + Ingest: "test/otlp/topic/reconnect", + Telemetry: "test/telemetry/topic/reconnect", } // Call the hook manually (simulating reconnection) @@ -808,6 +817,7 @@ func TestFleetConfigManager_OnReadyHook_SkipsInitializationOnReconnect(t *testin // Verify bridge was NOT recreated (same instance) assert.Equal(t, originalBridge, fleetManager.otlpBridge, "bridge should not be recreated on reconnect") assert.Equal(t, "test/otlp/topic/reconnect", fleetManager.otlpBridge.GetIngestTopic(), "bridge should have updated ingest topic") + assert.Equal(t, "test/telemetry/topic/reconnect", fleetManager.otlpBridge.GetTelemetryTopic(), "bridge should have updated telemetry topic") // Cleanup if fleetManager.otlpBridge != nil { diff --git a/agent/otlpbridge/handlers.go b/agent/otlpbridge/handlers.go index 9c7bdf5..613e732 100644 --- a/agent/otlpbridge/handlers.go +++ b/agent/otlpbridge/handlers.go @@ -9,8 +9,12 @@ import ( collectormetrics "go.opentelemetry.io/proto/otlp/collector/metrics/v1" collectortrace "go.opentelemetry.io/proto/otlp/collector/trace/v1" commonv1 "go.opentelemetry.io/proto/otlp/common/v1" + + "github.com/netboxlabs/orb-agent/agent/policies" ) +const diodePolicyNameAttributeKey = "diode.metadata.policy_name" + // Trace service handler type traceServer struct { bridge *BridgeServer @@ -74,56 +78,114 @@ func (s *logsServer) Export(ctx context.Context, req *collectorlogs.ExportLogsSe if pub == nil { return nil, fmt.Errorf("publisher not yet initialized") } + if s.isIngestRequest(req) { + repo := s.bridge.GetPolicyRepo() + enrichLogsWithDatasets(req, repo) + s.bridge.logger.Info("ingesting enriched logs with dataset_ids", "request", req) + err := s.publishToIngestTopic(ctx, req, pub) + if err != nil { + return nil, err + } + } else { + err := s.publishToTelemetryTopic(ctx, req, pub) + if err != nil { + return nil, err + } + } + return &collectorlogs.ExportLogsServiceResponse{}, nil +} + +// isIngestRequest checks if the request contains a policy_name attribute in resource or scope attributes +func (s *logsServer) isIngestRequest(req *collectorlogs.ExportLogsServiceRequest) bool { + for _, rl := range req.ResourceLogs { + if rl == nil { + continue + } + // Check Resource attributes first + if rl.Resource != nil && rl.Resource.Attributes != nil { + for _, attr := range rl.Resource.Attributes { + if attr != nil && attr.Key == diodePolicyNameAttributeKey && attr.Value != nil { + return true + } + } + } + // Also check Scope attributes for backward compatibility + for _, sl := range rl.ScopeLogs { + if sl == nil || sl.Scope == nil || sl.Scope.Attributes == nil { + continue + } + for _, attr := range sl.Scope.Attributes { + if attr != nil && attr.Key == diodePolicyNameAttributeKey && attr.Value != nil { + return true + } + } + } + } + return false +} + +func (s *logsServer) publishToIngestTopic(ctx context.Context, req *collectorlogs.ExportLogsServiceRequest, pub Publisher) error { topic := s.bridge.GetIngestTopic() if topic == "" { - return nil, fmt.Errorf("topic not yet initialized") + return fmt.Errorf("ingest topic not yet initialized") } - repo := s.bridge.GetPolicyRepo() - // Enrich logs with dataset_ids based on policy_name - if repo != nil { - enrichLogsWithDatasets(req, repo) + return s.publish(ctx, req, pub, topic) +} + +func (s *logsServer) publishToTelemetryTopic(ctx context.Context, req *collectorlogs.ExportLogsServiceRequest, pub Publisher) error { + topic := s.bridge.GetTelemetryTopic() + if topic == "" { + return fmt.Errorf("telemetrytopic not yet initialized") } + return s.publish(ctx, req, pub, topic) +} + +func (s *logsServer) publish(ctx context.Context, req *collectorlogs.ExportLogsServiceRequest, pub Publisher, topic string) error { payload, err := s.bridge.enc.Marshal(req) if err != nil { - return nil, err + return err } if err := pub.Publish(ctx, topic, payload); err != nil { - return nil, err + return err } - return &collectorlogs.ExportLogsServiceResponse{}, nil + return nil } // enrichLogsWithDatasets adds dataset_ids to ScopeLogs attributes based on policy_name. -func enrichLogsWithDatasets(req *collectorlogs.ExportLogsServiceRequest, repo interface{}) { - // Type assertion to allow for easier future mocking if needed - policyRepo, ok := repo.(interface { - GetByName(policyName string) (interface { - GetDatasetIDs() []string - }, error) - }) - if !ok { - // If repo doesn't have the right interface, skip enrichment - return - } - +func enrichLogsWithDatasets(req *collectorlogs.ExportLogsServiceRequest, repo policies.PolicyRepo) { for _, rl := range req.ResourceLogs { if rl == nil { continue } + + // Find policy_name attribute in Resource attributes first + policyName := "" + if rl.Resource != nil && rl.Resource.Attributes != nil { + for _, attr := range rl.Resource.Attributes { + if attr != nil && attr.Key == diodePolicyNameAttributeKey && attr.Value != nil { + if sv := attr.Value.GetStringValue(); sv != "" { + policyName = sv + break + } + } + } + } + for _, sl := range rl.ScopeLogs { if sl == nil || sl.Scope == nil || sl.Scope.Attributes == nil { continue } - // Find policy_name attribute - policyName := "" - for _, attr := range sl.Scope.Attributes { - if attr != nil && attr.Key == "policy_name" && attr.Value != nil { - if sv := attr.Value.GetStringValue(); sv != "" { - policyName = sv - break + // If not found in Resource, check Scope attributes for backward compatibility + if policyName == "" { + for _, attr := range sl.Scope.Attributes { + if attr != nil && attr.Key == diodePolicyNameAttributeKey && attr.Value != nil { + if sv := attr.Value.GetStringValue(); sv != "" { + policyName = sv + break + } } } } @@ -133,7 +195,7 @@ func enrichLogsWithDatasets(req *collectorlogs.ExportLogsServiceRequest, repo in } // Lookup policy and get dataset IDs - policy, err := policyRepo.GetByName(policyName) + policy, err := repo.GetByName(policyName) if err != nil { // Policy not found; skip enrichment for this scope slog.Debug("policy not found", "name", policyName, "error", err) diff --git a/agent/otlpbridge/handlers_test.go b/agent/otlpbridge/handlers_test.go index 406d00b..23a832b 100644 --- a/agent/otlpbridge/handlers_test.go +++ b/agent/otlpbridge/handlers_test.go @@ -64,7 +64,7 @@ func TestLogsHandler_Export_Publishes(t *testing.T) { enc: ProtobufEncoder{}, } bridge.SetPublisher(fp) - bridge.SetIngestTopic("logs") + bridge.SetTelemetryTopic("logs") s := &logsServer{bridge: bridge} _, err := s.Export(context.Background(), &collectorlogs.ExportLogsServiceRequest{}) if err != nil { diff --git a/agent/otlpbridge/server.go b/agent/otlpbridge/server.go index 95b5c39..b016150 100644 --- a/agent/otlpbridge/server.go +++ b/agent/otlpbridge/server.go @@ -25,11 +25,12 @@ type BridgeServer struct { closeOnce sync.Once // Shared runtime state - mu sync.RWMutex - publisher Publisher - ingestTopic string - policyRepo policies.PolicyRepo - logger *slog.Logger + mu sync.RWMutex + publisher Publisher + ingestTopic string + telemetryTopic string + policyRepo policies.PolicyRepo + logger *slog.Logger } // NewBridgeServer builds a BridgeServer but does not start it. @@ -80,6 +81,20 @@ func (s *BridgeServer) GetIngestTopic() string { return s.ingestTopic } +// SetTelemetryTopic sets the telemetry topic for publishing. +func (s *BridgeServer) SetTelemetryTopic(topic string) { + s.mu.Lock() + defer s.mu.Unlock() + s.telemetryTopic = topic +} + +// GetTelemetryTopic returns the current telemetry topic (for handlers). +func (s *BridgeServer) GetTelemetryTopic() string { + s.mu.RLock() + defer s.mu.RUnlock() + return s.telemetryTopic +} + // GetPolicyRepo returns the policy repo (for handlers). func (s *BridgeServer) GetPolicyRepo() policies.PolicyRepo { s.mu.RLock()