Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,36 @@ func (a *orbAgent) Start(ctx context.Context, cancelFunc context.CancelFunc) err
return err
}

if a.config.OrbAgent.ConfigManager.Active == "fleet" {
if commonBackend, exists := a.config.OrbAgent.Backends["common"]; exists {
if commonMap, ok := commonBackend.(map[string]any); ok {
if otlpSection, ok := commonMap["otlp"].(map[string]any); ok {
grpcURL, _ := otlpSection["grpc"].(string)
if grpcURL != "" {
a.logger.Warn("Overriding OTLP gRPC URL for fleet config manager", "url", grpcURL)
}
otlpSection["grpc"] = "localhost:4317"
a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317")

} else {
// otlp section doesn't exist, create it
commonMap["otlp"] = map[string]any{
"grpc": "localhost:4317",
}
a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317")
}
}
} else {
// common backend doesn't exist, create it with otlp config
a.config.OrbAgent.Backends["common"] = map[string]any{
"otlp": map[string]any{
"grpc": "localhost:4317",
},
}
a.logger.Info("auto-configured OTLP gRPC URL for fleet config manager", "url", "localhost:4317")
}
}

if err = a.startBackends(agentCtx, a.config.OrbAgent.Backends, a.config.OrbAgent.Labels); err != nil {
return err
}
Expand Down
204 changes: 204 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/netboxlabs/orb-agent/agent/backend"
"github.com/netboxlabs/orb-agent/agent/config"
"github.com/netboxlabs/orb-agent/agent/configmgr"
"github.com/netboxlabs/orb-agent/agent/policies"
)

// mockConfigManager implements configmgr.Manager for testing Stop delegation
Expand Down Expand Up @@ -40,3 +42,205 @@ func TestAgentStop_DelegatesToConfigManagerStop(t *testing.T) {

assert.True(t, mockMgr.stopCalled, "expected configManager.Stop to be called")
}

// mockPolicyManager implements policymgr.PolicyManager for testing
type mockPolicyManager struct {
repo policies.PolicyRepo
}

func (m *mockPolicyManager) ManagePolicy(_ config.PolicyPayload) {}
func (m *mockPolicyManager) RemovePolicyDataset(_ string, _ string, _ backend.Backend) {}
func (m *mockPolicyManager) GetPolicyState() ([]policies.PolicyData, error) {
return nil, nil
}

func (m *mockPolicyManager) GetRepo() policies.PolicyRepo {
return m.repo
}

func (m *mockPolicyManager) ApplyBackendPolicies(_ backend.Backend) error {
return nil
}

func (m *mockPolicyManager) RemoveBackendPolicies(_ backend.Backend, _ bool) error {
return nil
}

func (m *mockPolicyManager) RemovePolicy(_ string, _ string, _ string) error {
return nil
}

// mockSecretsManager implements secretsmgr.Manager for testing
type mockSecretsManager struct{}

func (m *mockSecretsManager) Start(_ context.Context) error {
return nil
}
func (m *mockSecretsManager) RegisterUpdatePoliciesCallback(_ func(map[string]bool)) {}
func (m *mockSecretsManager) SolvePolicySecrets(payload config.PolicyPayload) (config.PolicyPayload, error) {
return payload, nil
}

func (m *mockSecretsManager) SolveConfigSecrets(backends map[string]any, configManager config.ManagerConfig) (map[string]any, config.ManagerConfig, error) {
return backends, configManager, nil
}

func TestStart_FleetConfig_OverridesExistingOTLPGrpcURL(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
repo, err := policies.NewMemRepo()
require.NoError(t, err)

cfg := config.Config{
OrbAgent: config.OrbAgent{
Backends: map[string]any{
"common": map[string]any{
"otlp": map[string]any{
"grpc": "original:4317",
},
},
},
ConfigManager: config.ManagerConfig{
Active: "fleet",
},
SecretsManager: config.ManagerSecrets{
Active: "",
},
},
}

agent, err := New(logger, cfg)
require.NoError(t, err)

orbAgent := agent.(*orbAgent)
orbAgent.secretsManager = &mockSecretsManager{}
orbAgent.policyManager = &mockPolicyManager{repo: repo}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start will fail when trying to start backends, but we can check the config before that
err = orbAgent.Start(ctx, cancel)
// We expect an error because there are no actual backends configured
// But the important thing is that the config was modified
require.Error(t, err)

// Verify the config was modified by checking backendsCommon which is set in startBackends
// The OTLP configuration happens before startBackends, so backendsCommon should have the updated value
assert.Equal(t, "localhost:4317", orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should be overridden to localhost:4317")
}

func TestStart_FleetConfig_CreatesOTLPSectionWhenMissing(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
repo, err := policies.NewMemRepo()
require.NoError(t, err)

cfg := config.Config{
OrbAgent: config.OrbAgent{
Backends: map[string]any{
"common": map[string]any{
"other": "value",
},
},
ConfigManager: config.ManagerConfig{
Active: "fleet",
},
SecretsManager: config.ManagerSecrets{
Active: "",
},
},
}

agent, err := New(logger, cfg)
require.NoError(t, err)

orbAgent := agent.(*orbAgent)
orbAgent.secretsManager = &mockSecretsManager{}
orbAgent.policyManager = &mockPolicyManager{repo: repo}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = orbAgent.Start(ctx, cancel)
require.Error(t, err) // Expected to fail when starting backends

// Verify the config was modified by checking backendsCommon which is set in startBackends
// The OTLP configuration happens before startBackends, so backendsCommon should have the updated value
assert.Equal(t, "localhost:4317", orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should be set to localhost:4317")
}

func TestStart_FleetConfig_CreatesCommonBackendWhenMissing(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
repo, err := policies.NewMemRepo()
require.NoError(t, err)

cfg := config.Config{
OrbAgent: config.OrbAgent{
Backends: map[string]any{},
ConfigManager: config.ManagerConfig{
Active: "fleet",
},
SecretsManager: config.ManagerSecrets{
Active: "",
},
},
}

agent, err := New(logger, cfg)
require.NoError(t, err)

orbAgent := agent.(*orbAgent)
orbAgent.secretsManager = &mockSecretsManager{}
orbAgent.policyManager = &mockPolicyManager{repo: repo}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = orbAgent.Start(ctx, cancel)
require.Error(t, err) // Expected to fail when starting backends

// Verify the config was modified by checking backendsCommon which is set in startBackends
// The OTLP configuration happens before startBackends, so backendsCommon should have the updated value
assert.Equal(t, "localhost:4317", orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should be set to localhost:4317")
}

func TestStart_NonFleetConfig_DoesNotModifyConfig(t *testing.T) {
logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
repo, err := policies.NewMemRepo()
require.NoError(t, err)

originalGrpcURL := "original:4317"
cfg := config.Config{
OrbAgent: config.OrbAgent{
Backends: map[string]any{
"common": map[string]any{
"otlp": map[string]any{
"grpc": originalGrpcURL,
},
},
},
ConfigManager: config.ManagerConfig{
Active: "local", // Not fleet
},
SecretsManager: config.ManagerSecrets{
Active: "",
},
},
}

agent, err := New(logger, cfg)
require.NoError(t, err)

orbAgent := agent.(*orbAgent)
orbAgent.secretsManager = &mockSecretsManager{}
orbAgent.policyManager = &mockPolicyManager{repo: repo}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err = orbAgent.Start(ctx, cancel)
require.Error(t, err) // Expected to fail when starting backends

// Verify the config was NOT modified by checking backendsCommon which is set in startBackends
// For non-fleet config, the original value should remain
assert.Equal(t, originalGrpcURL, orbAgent.backendsCommon.Otlp.Grpc, "grpc URL should remain unchanged for non-fleet config")
}
8 changes: 6 additions & 2 deletions agent/configmgr/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
"capabilities_topic", topics.Capabilities,
"inbox_topic", topics.Inbox,
"outbox_topic", topics.Outbox,
"otlp_topic", topics.Ingest)
"otlp_topic", topics.Ingest,
"telemetry_topic", topics.Telemetry)

connectionDetails := fleet.ConnectionDetails{
MQTTURL: jwtClaims.MqttURL,
Expand Down Expand Up @@ -177,7 +178,10 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
pub := otlpbridge.NewCMAdapterPublisher(cm)
fleetManager.otlpBridge.SetPublisher(pub)
fleetManager.otlpBridge.SetIngestTopic(topics.Ingest)
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest))
fleetManager.otlpBridge.SetTelemetryTopic(topics.Telemetry)
fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT",
slog.String("ingest_topic", topics.Ingest),
slog.String("telemetry_topic", topics.Telemetry))
})

// Start goroutine to handle reconnect requests (JWT refresh)
Expand Down
2 changes: 1 addition & 1 deletion agent/configmgr/fleet/connection_hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestConnect_StoresTopicsBeforeConnecting(t *testing.T) {
MQTTURL: "mqtt://localhost:1883",
Token: "",
AgentID: "agent-1",
Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x"},
Topics: TokenResponseTopics{Inbox: "inbox/x", Heartbeat: "hb/x", Capabilities: "cap/x", Outbox: "out/x", Ingest: "otlp/x", Telemetry: "telemetry/x"},
ClientID: "client-1",
Zone: "zone-a",
}
Expand Down
4 changes: 4 additions & 0 deletions agent/configmgr/fleet/jwt_claims_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) {
Capabilities: "orgs/test-org/agents/test-client-123/capabilities",
Inbox: "orgs/test-org/agents/test-client-123/inbox",
Outbox: "orgs/test-org/agents/test-client-123/outbox",
Ingest: "orgs/test-org/agents/test-client-123/ingest",
Telemetry: "orgs/test-org/agents/test-client-123/telemetry",
},
},
{
Expand All @@ -184,6 +186,8 @@ func GestGenerateTopicsFromTemplate(t *testing.T) {
Capabilities: "orgs/prod-company/agents/test-agent-123/capabilities",
Inbox: "orgs/prod-company/agents/test-agent-123/inbox",
Outbox: "orgs/prod-company/agents/test-agent-123/outbox",
Ingest: "orgs/prod-company/agents/test-agent-123/ingest",
Telemetry: "orgs/prod-company/agents/test-agent-123/telemetry",
},
},
}
Expand Down
3 changes: 3 additions & 0 deletions agent/configmgr/fleet/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
inboxTemplate = "orgs/{org_id}/agents/{agent_id}/inbox"
outboxTemplate = "orgs/{org_id}/agents/{agent_id}/outbox"
ingestTemplate = "orgs/{org_id}/agents/{agent_id}/ingest"
telemetryTemplate = "orgs/{org_id}/agents/{agent_id}/telemetry"

groupsTemplate = "orgs/{org_id}/groups/{group_id}"
)
Expand All @@ -29,6 +30,7 @@ type TokenResponseTopics struct {
Inbox string `json:"inbox"`
Outbox string `json:"outbox"`
Ingest string `json:"ingest"`
Telemetry string `json:"telemetry"`
}

// GenerateTopicsFromTemplate creates actual topic names from templates using JWT claims and config agent_id
Expand All @@ -39,6 +41,7 @@ func GenerateTopicsFromTemplate(jwtClaims *JWTClaims) (*TokenResponseTopics, err
Inbox: fillTopicTemplate(inboxTemplate, jwtClaims),
Outbox: fillTopicTemplate(outboxTemplate, jwtClaims),
Ingest: fillTopicTemplate(ingestTemplate, jwtClaims),
Telemetry: fillTopicTemplate(telemetryTemplate, jwtClaims),
}, nil
}

Expand Down
12 changes: 12 additions & 0 deletions agent/configmgr/fleet/topics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,15 @@ func TestGenerateTopicsFromTemplate_IncludesIngest(t *testing.T) {
t.Fatalf("expected ingest topic %q, got %q", expected, topics.Ingest)
}
}

func TestGenerateTopicsFromTemplate_IncludesTelemetry(t *testing.T) {
claims := &JWTClaims{OrgID: "org-123", AgentID: "agent-abc"}
topics, err := GenerateTopicsFromTemplate(claims)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
expected := "orgs/org-123/agents/agent-abc/telemetry"
if topics.Telemetry != expected {
t.Fatalf("expected telemetry topic %q, got %q", expected, topics.Telemetry)
}
}
Loading