@@ -20,22 +20,32 @@ import (
2020var _ Manager = (* fleetConfigManager )(nil )
2121
2222type fleetConfigManager struct {
23- logger * slog.Logger
24- connection * fleet.MQTTConnection
25- authTokenManager * fleet.AuthTokenManager
26- resetChan chan struct {}
27- backendState backend.StateRetriever
28- policyManager policymgr.PolicyManager
29- otlpBridge * otlpbridge.BridgeServer
23+ logger * slog.Logger
24+ connection * fleet.MQTTConnection
25+ authTokenManager * fleet.AuthTokenManager
26+ resetChan chan struct {}
27+ reconnectChan chan struct {}
28+ backendState backend.StateRetriever
29+ policyManager policymgr.PolicyManager
30+ otlpBridge * otlpbridge.BridgeServer
31+ config config.Config
32+ backends map [string ]backend.Backend
33+ labels map [string ]string
34+ configYaml string
35+ connectionDetails fleet.ConnectionDetails
36+ monitorCtx context.Context
37+ monitorCancel context.CancelFunc
3038}
3139
3240func newFleetConfigManager (logger * slog.Logger , pMgr policymgr.PolicyManager , backendState backend.StateRetriever ) * fleetConfigManager {
3341 resetChan := make (chan struct {}, 1 )
42+ reconnectChan := make (chan struct {}, 1 )
3443 return & fleetConfigManager {
3544 logger : logger ,
36- connection : fleet .NewMQTTConnection (logger , pMgr , resetChan , backendState ),
45+ connection : fleet .NewMQTTConnection (logger , pMgr , resetChan , reconnectChan , backendState ),
3746 authTokenManager : fleet .NewAuthTokenManager (logger ),
3847 resetChan : resetChan ,
48+ reconnectChan : reconnectChan ,
3949 backendState : backendState ,
4050 policyManager : pMgr ,
4151 }
@@ -106,6 +116,14 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
106116 if err != nil {
107117 return fmt .Errorf ("failed to convert config to safe string: %w" , err )
108118 }
119+
120+ // Store connection state for reconnection
121+ fleetManager .config = cfg
122+ fleetManager .backends = backends
123+ fleetManager .labels = cfg .OrbAgent .Labels
124+ fleetManager .configYaml = string (configYaml )
125+ fleetManager .connectionDetails = connectionDetails
126+
109127 err = fleetManager .connection .Connect (ctx , connectionDetails , backends , cfg .OrbAgent .Labels , string (configYaml ))
110128 if err != nil {
111129 return err
@@ -158,6 +176,69 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st
158176 fleetManager .logger .Info ("OTLP bridge bound to Fleet MQTT" , slog .String ("topic" , topics .Ingest ))
159177 })
160178
179+ // Start goroutine to handle reconnect requests (JWT refresh)
180+ go func () {
181+ for range fleetManager .reconnectChan {
182+ fleetManager .logger .Info ("JWT refresh and reconnection requested" )
183+ if err := fleetManager .refreshAndReconnect (ctx , timeout ); err != nil {
184+ fleetManager .logger .Error ("failed to refresh and reconnect" , "error" , err )
185+ }
186+ }
187+ }()
188+
189+ // Start background goroutine to monitor token expiry and trigger proactive reconnection
190+ fleetManager .monitorCtx , fleetManager .monitorCancel = context .WithCancel (context .Background ())
191+ go fleetManager .monitorTokenExpiry ()
192+
193+ return nil
194+ }
195+
196+ // refreshAndReconnect refreshes the JWT token and reconnects to MQTT
197+ func (fleetManager * fleetConfigManager ) refreshAndReconnect (ctx context.Context , timeout time.Duration ) error {
198+ // Refresh JWT token
199+ token , err := fleetManager .authTokenManager .RefreshToken (ctx )
200+ if err != nil {
201+ return fmt .Errorf ("failed to refresh token: %w" , err )
202+ }
203+
204+ // Parse new JWT claims
205+ jwtClaims , err := fleet .ParseJWTClaims (token .AccessToken )
206+ if err != nil {
207+ return fmt .Errorf ("failed to parse JWT claims: %w" , err )
208+ }
209+
210+ // Regenerate topics
211+ topics , err := fleet .GenerateTopicsFromTemplate (jwtClaims )
212+ if err != nil {
213+ return fmt .Errorf ("failed to generate topics: %w" , err )
214+ }
215+
216+ fleetManager .logger .Info ("refreshed JWT and generated new topics" ,
217+ "heartbeat_topic" , topics .Heartbeat ,
218+ "capabilities_topic" , topics .Capabilities ,
219+ "inbox_topic" , topics .Inbox ,
220+ "outbox_topic" , topics .Outbox )
221+
222+ // Update connection details
223+ newConnectionDetails := fleet.ConnectionDetails {
224+ MQTTURL : jwtClaims .MqttURL ,
225+ Token : token .AccessToken ,
226+ AgentID : jwtClaims .AgentID ,
227+ Topics : * topics ,
228+ ClientID : fleetManager .config .OrbAgent .ConfigManager .Sources .Fleet .ClientID ,
229+ Zone : jwtClaims .Zone ,
230+ }
231+
232+ // Store updated connection details
233+ fleetManager .connectionDetails = newConnectionDetails
234+
235+ // Reconnect with new token
236+ err = fleetManager .connection .Reconnect (ctx , newConnectionDetails , fleetManager .backends , fleetManager .labels , fleetManager .configYaml , timeout )
237+ if err != nil {
238+ return fmt .Errorf ("failed to reconnect: %w" , err )
239+ }
240+
241+ fleetManager .logger .Info ("successfully refreshed JWT and reconnected" )
161242 return nil
162243}
163244
@@ -178,8 +259,73 @@ func (fleetManager *fleetConfigManager) GetContext(ctx context.Context) context.
178259 return ctx
179260}
180261
181- // Stop gracefully shuts down the OTLP bridge.
262+ // monitorTokenExpiry periodically checks token expiry and triggers reconnection before token expires
263+ func (fleetManager * fleetConfigManager ) monitorTokenExpiry () {
264+ // Check interval: default 30 seconds, configurable via config
265+ checkInterval := 30 * time .Second
266+ if fleetManager .config .OrbAgent .ConfigManager .Sources .Fleet .TokenExpiryCheckInterval != nil && * fleetManager .config .OrbAgent .ConfigManager .Sources .Fleet .TokenExpiryCheckInterval > 0 {
267+ checkInterval = time .Duration (* fleetManager .config .OrbAgent .ConfigManager .Sources .Fleet .TokenExpiryCheckInterval ) * time .Second
268+ }
269+
270+ // Reconnect buffer: default 2 minutes before expiry, configurable via config
271+ reconnectBuffer := 2 * time .Minute
272+ if fleetManager .config .OrbAgent .ConfigManager .Sources .Fleet .TokenReconnectBuffer != nil && * fleetManager .config .OrbAgent .ConfigManager .Sources .Fleet .TokenReconnectBuffer > 0 {
273+ reconnectBuffer = time .Duration (* fleetManager .config .OrbAgent .ConfigManager .Sources .Fleet .TokenReconnectBuffer ) * time .Second
274+ }
275+
276+ ticker := time .NewTicker (checkInterval )
277+ defer ticker .Stop ()
278+
279+ fleetManager .logger .Info ("starting token expiry monitor" ,
280+ "check_interval" , checkInterval ,
281+ "reconnect_buffer" , reconnectBuffer )
282+
283+ for {
284+ select {
285+ case <- fleetManager .monitorCtx .Done ():
286+ fleetManager .logger .Info ("token expiry monitor stopped" )
287+ return
288+ case <- ticker .C :
289+ // Check if token is expired or expiring soon
290+ if fleetManager .authTokenManager .IsTokenExpired () {
291+ fleetManager .logger .Warn ("JWT token has expired, triggering reconnection" ,
292+ "expiry_time" , fleetManager .authTokenManager .GetTokenExpiryTime ())
293+ select {
294+ case fleetManager .reconnectChan <- struct {}{}:
295+ fleetManager .logger .Debug ("reconnection signal sent due to expired token" )
296+ default :
297+ fleetManager .logger .Debug ("reconnection already in progress, skipping duplicate trigger" )
298+ }
299+ } else if fleetManager .authTokenManager .IsTokenExpiringSoon (reconnectBuffer ) {
300+ fleetManager .logger .Warn ("JWT token expiring soon, triggering proactive reconnection" ,
301+ "expiry_time" , fleetManager .authTokenManager .GetTokenExpiryTime (),
302+ "reconnect_buffer" , reconnectBuffer )
303+ select {
304+ case fleetManager .reconnectChan <- struct {}{}:
305+ fleetManager .logger .Debug ("reconnection signal sent due to imminent token expiry" )
306+ default :
307+ fleetManager .logger .Debug ("reconnection already in progress, skipping duplicate trigger" )
308+ }
309+ } else {
310+ expiryTime := fleetManager .authTokenManager .GetTokenExpiryTime ()
311+ if ! expiryTime .IsZero () {
312+ timeUntilExpiry := time .Until (expiryTime )
313+ fleetManager .logger .Debug ("token expiry check passed" ,
314+ "expiry_time" , expiryTime ,
315+ "time_until_expiry" , timeUntilExpiry )
316+ }
317+ }
318+ }
319+ }
320+ }
321+
322+ // Stop gracefully shuts down the OTLP bridge and token expiry monitor.
182323func (fleetManager * fleetConfigManager ) Stop (ctx context.Context ) error {
324+ // Stop token expiry monitor
325+ if fleetManager .monitorCancel != nil {
326+ fleetManager .monitorCancel ()
327+ }
328+
183329 if fleetManager .otlpBridge != nil {
184330 if err := fleetManager .otlpBridge .Stop (ctx ); err != nil {
185331 fleetManager .logger .Error ("error while stopping OTLP bridge" , slog .Any ("error" , err ))
0 commit comments