|
4 | 4 | "context" |
5 | 5 | "fmt" |
6 | 6 | "log/slog" |
| 7 | + "strconv" |
7 | 8 | "time" |
8 | 9 |
|
9 | 10 | "github.com/eclipse/paho.golang/autopaho" |
@@ -138,51 +139,67 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st |
138 | 139 | fleetManager.connection.AddOnReadyHook(func(cm *autopaho.ConnectionManager, topics fleet.TokenResponseTopics) { |
139 | 140 | fleetManager.logger.Info("MQTT connection ready, initializing OTLP bridges") |
140 | 141 |
|
| 142 | + // Get configurable ports with defaults |
| 143 | + otlpPort := getOTLPPort(cfg.OrbAgent.ConfigManager.Sources.Fleet.OTLPPort, 4317) |
| 144 | + telemetryPort := getOTLPPort(cfg.OrbAgent.ConfigManager.Sources.Fleet.TelemetryPort, 4319) |
| 145 | + |
141 | 146 | // Create publisher adapter (shared by both bridges) |
142 | 147 | pub := otlpbridge.NewCMAdapterPublisher(cm) |
143 | 148 |
|
144 | | - // Initialize main OTLP bridge on port 4317 |
| 149 | + // Initialize main OTLP bridge - this is critical, so failure stops initialization |
145 | 150 | bridgeConfig := otlpbridge.BridgeConfig{ |
146 | | - ListenAddr: ":4317", |
| 151 | + ListenAddr: ":" + strconv.Itoa(otlpPort), |
147 | 152 | Encoding: "protobuf", |
148 | 153 | } |
149 | 154 | var err error |
150 | 155 | fleetManager.otlpBridge, err = otlpbridge.NewBridgeServer(bridgeConfig, fleetManager.policyManager.GetRepo(), fleetManager.logger) |
151 | 156 | if err != nil { |
152 | | - fleetManager.logger.Error("failed to create OTLP bridge", slog.Any("error", err)) |
| 157 | + fleetManager.logger.Error("failed to create OTLP bridge", slog.Any("error", err), slog.Int("port", otlpPort)) |
153 | 158 | return |
154 | 159 | } |
155 | 160 | if err := fleetManager.otlpBridge.Start(context.Background()); err != nil { |
156 | | - fleetManager.logger.Error("failed to start OTLP bridge", slog.Any("error", err)) |
| 161 | + fleetManager.logger.Error("failed to start OTLP bridge", slog.Any("error", err), slog.Int("port", otlpPort)) |
| 162 | + // Clean up the bridge instance if start failed |
| 163 | + fleetManager.otlpBridge = nil |
157 | 164 | return |
158 | 165 | } |
159 | 166 | fleetManager.otlpBridge.SetPublisher(pub) |
160 | 167 | fleetManager.otlpBridge.SetTopic(topics.Ingest) |
161 | | - fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest), slog.String("port", "4317")) |
| 168 | + fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest), slog.Int("port", otlpPort)) |
162 | 169 |
|
163 | | - // Initialize telemetry OTLP bridge on port 4318 |
| 170 | + // Initialize telemetry OTLP bridge - this is non-critical, so failure is logged but doesn't stop initialization |
164 | 171 | telemetryBridgeConfig := otlpbridge.BridgeConfig{ |
165 | | - ListenAddr: ":4318", |
| 172 | + ListenAddr: ":" + strconv.Itoa(telemetryPort), |
166 | 173 | Encoding: "protobuf", |
167 | 174 | } |
168 | 175 | fleetManager.telemetryBridge, err = otlpbridge.NewBridgeServer(telemetryBridgeConfig, fleetManager.policyManager.GetRepo(), fleetManager.logger) |
169 | 176 | if err != nil { |
170 | | - fleetManager.logger.Error("failed to create telemetry OTLP bridge", slog.Any("error", err)) |
| 177 | + fleetManager.logger.Warn("failed to create telemetry OTLP bridge, continuing without it", slog.Any("error", err), slog.Int("port", telemetryPort)) |
171 | 178 | return |
172 | 179 | } |
173 | 180 | if err := fleetManager.telemetryBridge.Start(context.Background()); err != nil { |
174 | | - fleetManager.logger.Error("failed to start telemetry OTLP bridge", slog.Any("error", err)) |
| 181 | + fleetManager.logger.Warn("failed to start telemetry OTLP bridge, continuing without it", slog.Any("error", err), slog.Int("port", telemetryPort)) |
| 182 | + // Clean up the bridge instance if start failed |
| 183 | + fleetManager.telemetryBridge = nil |
175 | 184 | return |
176 | 185 | } |
177 | 186 | telemetryTopic := topics.Ingest + "/telemetry" |
178 | 187 | fleetManager.telemetryBridge.SetPublisher(pub) |
179 | 188 | fleetManager.telemetryBridge.SetTopic(telemetryTopic) |
180 | | - fleetManager.logger.Info("Telemetry OTLP bridge bound to Fleet MQTT", slog.String("topic", telemetryTopic), slog.String("port", "4318")) |
| 189 | + fleetManager.logger.Info("Telemetry OTLP bridge bound to Fleet MQTT", slog.String("topic", telemetryTopic), slog.Int("port", telemetryPort)) |
181 | 190 | }) |
182 | 191 |
|
183 | 192 | return nil |
184 | 193 | } |
185 | 194 |
|
| 195 | +// getOTLPPort returns the configured port or the default if not set. |
| 196 | +func getOTLPPort(configuredPort *int, defaultPort int) int { |
| 197 | + if configuredPort != nil && *configuredPort > 0 { |
| 198 | + return *configuredPort |
| 199 | + } |
| 200 | + return defaultPort |
| 201 | +} |
| 202 | + |
186 | 203 | func (fleetManager *fleetConfigManager) configToSafeString(cfg config.Config) (string, error) { |
187 | 204 | if cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret != "" { |
188 | 205 | cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret = "******" |
|
0 commit comments