diff --git a/agent/config/types.go b/agent/config/types.go index 13559c9..35614e5 100644 --- a/agent/config/types.go +++ b/agent/config/types.go @@ -35,12 +35,14 @@ type GitManager struct { // FleetManager represents the Fleet ConfigManager configuration. type FleetManager struct { - URL string `yaml:"url"` - TokenURL string `yaml:"token_url"` - Timeout *int `yaml:"timeout,omitempty"` - SkipTLS bool `yaml:"skip_tls"` - ClientID string `yaml:"client_id"` - ClientSecret string `yaml:"client_secret"` + URL string `yaml:"url"` + TokenURL string `yaml:"token_url"` + Timeout *int `yaml:"timeout,omitempty"` + SkipTLS bool `yaml:"skip_tls"` + ClientID string `yaml:"client_id"` + ClientSecret string `yaml:"client_secret"` + TokenExpiryCheckInterval *int `yaml:"token_expiry_check_interval,omitempty"` // Check interval in seconds (default: 30) + TokenReconnectBuffer *int `yaml:"token_reconnect_buffer,omitempty"` // Reconnect buffer in seconds before expiry (default: 120) } // Sources represents the configuration for manager sources, including cloud, local and git. diff --git a/agent/configmgr/fleet.go b/agent/configmgr/fleet.go index f829435..d6477ba 100644 --- a/agent/configmgr/fleet.go +++ b/agent/configmgr/fleet.go @@ -20,22 +20,32 @@ import ( var _ Manager = (*fleetConfigManager)(nil) type fleetConfigManager struct { - logger *slog.Logger - connection *fleet.MQTTConnection - authTokenManager *fleet.AuthTokenManager - resetChan chan struct{} - backendState backend.StateRetriever - policyManager policymgr.PolicyManager - otlpBridge *otlpbridge.BridgeServer + logger *slog.Logger + connection *fleet.MQTTConnection + authTokenManager *fleet.AuthTokenManager + resetChan chan struct{} + reconnectChan chan struct{} + backendState backend.StateRetriever + policyManager policymgr.PolicyManager + otlpBridge *otlpbridge.BridgeServer + config config.Config + backends map[string]backend.Backend + labels map[string]string + configYaml string + connectionDetails fleet.ConnectionDetails + monitorCtx context.Context + monitorCancel context.CancelFunc } func newFleetConfigManager(logger *slog.Logger, pMgr policymgr.PolicyManager, backendState backend.StateRetriever) *fleetConfigManager { resetChan := make(chan struct{}, 1) + reconnectChan := make(chan struct{}, 1) return &fleetConfigManager{ logger: logger, - connection: fleet.NewMQTTConnection(logger, pMgr, resetChan, backendState), + connection: fleet.NewMQTTConnection(logger, pMgr, resetChan, reconnectChan, backendState), authTokenManager: fleet.NewAuthTokenManager(logger), resetChan: resetChan, + reconnectChan: reconnectChan, backendState: backendState, policyManager: pMgr, } @@ -106,6 +116,14 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st if err != nil { return fmt.Errorf("failed to convert config to safe string: %w", err) } + + // Store connection state for reconnection + fleetManager.config = cfg + fleetManager.backends = backends + fleetManager.labels = cfg.OrbAgent.Labels + fleetManager.configYaml = string(configYaml) + fleetManager.connectionDetails = connectionDetails + err = fleetManager.connection.Connect(ctx, connectionDetails, backends, cfg.OrbAgent.Labels, string(configYaml)) if err != nil { return err @@ -158,6 +176,69 @@ func (fleetManager *fleetConfigManager) Start(cfg config.Config, backends map[st fleetManager.logger.Info("OTLP bridge bound to Fleet MQTT", slog.String("topic", topics.Ingest)) }) + // Start goroutine to handle reconnect requests (JWT refresh) + go func() { + for range fleetManager.reconnectChan { + fleetManager.logger.Info("JWT refresh and reconnection requested") + if err := fleetManager.refreshAndReconnect(ctx, timeout); err != nil { + fleetManager.logger.Error("failed to refresh and reconnect", "error", err) + } + } + }() + + // Start background goroutine to monitor token expiry and trigger proactive reconnection + fleetManager.monitorCtx, fleetManager.monitorCancel = context.WithCancel(context.Background()) + go fleetManager.monitorTokenExpiry() + + return nil +} + +// refreshAndReconnect refreshes the JWT token and reconnects to MQTT +func (fleetManager *fleetConfigManager) refreshAndReconnect(ctx context.Context, timeout time.Duration) error { + // Refresh JWT token + token, err := fleetManager.authTokenManager.RefreshToken(ctx) + if err != nil { + return fmt.Errorf("failed to refresh token: %w", err) + } + + // Parse new JWT claims + jwtClaims, err := fleet.ParseJWTClaims(token.AccessToken) + if err != nil { + return fmt.Errorf("failed to parse JWT claims: %w", err) + } + + // Regenerate topics + topics, err := fleet.GenerateTopicsFromTemplate(jwtClaims) + if err != nil { + return fmt.Errorf("failed to generate topics: %w", err) + } + + fleetManager.logger.Info("refreshed JWT and generated new topics", + "heartbeat_topic", topics.Heartbeat, + "capabilities_topic", topics.Capabilities, + "inbox_topic", topics.Inbox, + "outbox_topic", topics.Outbox) + + // Update connection details + newConnectionDetails := fleet.ConnectionDetails{ + MQTTURL: jwtClaims.MqttURL, + Token: token.AccessToken, + AgentID: jwtClaims.AgentID, + Topics: *topics, + ClientID: fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.ClientID, + Zone: jwtClaims.Zone, + } + + // Store updated connection details + fleetManager.connectionDetails = newConnectionDetails + + // Reconnect with new token + err = fleetManager.connection.Reconnect(ctx, newConnectionDetails, fleetManager.backends, fleetManager.labels, fleetManager.configYaml, timeout) + if err != nil { + return fmt.Errorf("failed to reconnect: %w", err) + } + + fleetManager.logger.Info("successfully refreshed JWT and reconnected") return nil } @@ -178,8 +259,73 @@ func (fleetManager *fleetConfigManager) GetContext(ctx context.Context) context. return ctx } -// Stop gracefully shuts down the OTLP bridge. +// monitorTokenExpiry periodically checks token expiry and triggers reconnection before token expires +func (fleetManager *fleetConfigManager) monitorTokenExpiry() { + // Check interval: default 30 seconds, configurable via config + checkInterval := 30 * time.Second + if fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenExpiryCheckInterval != nil && *fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenExpiryCheckInterval > 0 { + checkInterval = time.Duration(*fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenExpiryCheckInterval) * time.Second + } + + // Reconnect buffer: default 2 minutes before expiry, configurable via config + reconnectBuffer := 2 * time.Minute + if fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenReconnectBuffer != nil && *fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenReconnectBuffer > 0 { + reconnectBuffer = time.Duration(*fleetManager.config.OrbAgent.ConfigManager.Sources.Fleet.TokenReconnectBuffer) * time.Second + } + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + fleetManager.logger.Info("starting token expiry monitor", + "check_interval", checkInterval, + "reconnect_buffer", reconnectBuffer) + + for { + select { + case <-fleetManager.monitorCtx.Done(): + fleetManager.logger.Info("token expiry monitor stopped") + return + case <-ticker.C: + // Check if token is expired or expiring soon + if fleetManager.authTokenManager.IsTokenExpired() { + fleetManager.logger.Warn("JWT token has expired, triggering reconnection", + "expiry_time", fleetManager.authTokenManager.GetTokenExpiryTime()) + select { + case fleetManager.reconnectChan <- struct{}{}: + fleetManager.logger.Debug("reconnection signal sent due to expired token") + default: + fleetManager.logger.Debug("reconnection already in progress, skipping duplicate trigger") + } + } else if fleetManager.authTokenManager.IsTokenExpiringSoon(reconnectBuffer) { + fleetManager.logger.Warn("JWT token expiring soon, triggering proactive reconnection", + "expiry_time", fleetManager.authTokenManager.GetTokenExpiryTime(), + "reconnect_buffer", reconnectBuffer) + select { + case fleetManager.reconnectChan <- struct{}{}: + fleetManager.logger.Debug("reconnection signal sent due to imminent token expiry") + default: + fleetManager.logger.Debug("reconnection already in progress, skipping duplicate trigger") + } + } else { + expiryTime := fleetManager.authTokenManager.GetTokenExpiryTime() + if !expiryTime.IsZero() { + timeUntilExpiry := time.Until(expiryTime) + fleetManager.logger.Debug("token expiry check passed", + "expiry_time", expiryTime, + "time_until_expiry", timeUntilExpiry) + } + } + } + } +} + +// Stop gracefully shuts down the OTLP bridge and token expiry monitor. func (fleetManager *fleetConfigManager) Stop(ctx context.Context) error { + // Stop token expiry monitor + if fleetManager.monitorCancel != nil { + fleetManager.monitorCancel() + } + if fleetManager.otlpBridge != nil { if err := fleetManager.otlpBridge.Stop(ctx); err != nil { fleetManager.logger.Error("error while stopping OTLP bridge", slog.Any("error", err)) diff --git a/agent/configmgr/fleet/auth.go b/agent/configmgr/fleet/auth.go index 0d824b6..402e912 100644 --- a/agent/configmgr/fleet/auth.go +++ b/agent/configmgr/fleet/auth.go @@ -12,11 +12,21 @@ import ( "net/url" "strings" "time" + + "github.com/go-jose/go-jose/v4" + "github.com/go-jose/go-jose/v4/jwt" ) // AuthTokenManager manages auth tokens type AuthTokenManager struct { - logger *slog.Logger + logger *slog.Logger + tokenURL string + skipTLS bool + timeout time.Duration + clientID string + clientSecret string + lastToken *TokenResponse + tokenExpiresAt time.Time } // NewAuthTokenManager creates a new AuthTokenManager @@ -46,6 +56,13 @@ func (fleetManager *AuthTokenManager) GetToken(ctx context.Context, tokenURL str return nil, fmt.Errorf("client secret cannot be empty") } + // Store credentials for future refresh + fleetManager.tokenURL = tokenURL + fleetManager.skipTLS = skipTLS + fleetManager.timeout = timeout + fleetManager.clientID = clientID + fleetManager.clientSecret = clientSecret + fleetManager.logger.Debug("requesting access token", "token_url", tokenURL, "client_id", clientID) scopes := []string{ @@ -60,8 +77,6 @@ func (fleetManager *AuthTokenManager) GetToken(ctx context.Context, tokenURL str data.Set("client_secret", clientSecret) data.Set("audience", "orb") - fleetManager.logger.Debug("sending token request", "url", tokenURL, "data", data, "client_id", clientID) //, "client_secret", clientSecret) - req, err := http.NewRequest("POST", tokenURL, bytes.NewBufferString(data.Encode())) if err != nil { fleetManager.logger.Error("failed to create token request", "error", err, "token_url", tokenURL) @@ -77,7 +92,8 @@ func (fleetManager *AuthTokenManager) GetToken(ctx context.Context, tokenURL str }, } - fleetManager.logger.Debug("sending token request", "url", tokenURL) + fleetManager.logger.Debug("sending token request", "url", tokenURL, "data", data, "client_id", clientID) + resp, err := httpClient.Do(req.WithContext(ctx)) if err != nil { fleetManager.logger.Error("failed to send token request", "error", err, "token_url", tokenURL) @@ -121,5 +137,83 @@ func (fleetManager *AuthTokenManager) GetToken(ctx context.Context, tokenURL str "expires_in", TokenResponse.ExpiresIn, "mqtt_url", TokenResponse.MQTTURL) + // Store token and calculate expiration time + fleetManager.lastToken = &TokenResponse + + // Try to parse JWT exp claim for more accurate expiry tracking + var expiryTime time.Time + if parsedExpiry, err := parseJWTExpiry(TokenResponse.AccessToken); err == nil && !parsedExpiry.IsZero() { + // Use JWT exp claim with 5-minute buffer for safety + expiryTime = parsedExpiry.Add(-5 * time.Minute) + fleetManager.logger.Debug("using JWT exp claim for token expiry", "expiry", parsedExpiry, "buffer_applied", expiryTime) + } else if TokenResponse.ExpiresIn > 0 { + // Fallback to ExpiresIn from response (with 5-minute buffer) + expiryTime = time.Now().Add(time.Duration(TokenResponse.ExpiresIn)*time.Second - 5*time.Minute) + fleetManager.logger.Debug("using ExpiresIn for token expiry", "expires_in", TokenResponse.ExpiresIn, "buffer_applied", expiryTime) + } + + fleetManager.tokenExpiresAt = expiryTime + return &TokenResponse, nil } + +// RefreshToken refreshes the auth token using stored credentials +func (fleetManager *AuthTokenManager) RefreshToken(ctx context.Context) (*TokenResponse, error) { + if fleetManager.tokenURL == "" { + return nil, fmt.Errorf("cannot refresh token: credentials not initialized") + } + + fleetManager.logger.Info("refreshing JWT token") + return fleetManager.GetToken(ctx, fleetManager.tokenURL, fleetManager.skipTLS, fleetManager.timeout, fleetManager.clientID, fleetManager.clientSecret) +} + +// IsTokenExpired checks if the current token is expired or will expire soon +func (fleetManager *AuthTokenManager) IsTokenExpired() bool { + if fleetManager.lastToken == nil { + return true + } + return time.Now().After(fleetManager.tokenExpiresAt) +} + +// IsTokenExpiringSoon checks if the token will expire within the specified duration +func (fleetManager *AuthTokenManager) IsTokenExpiringSoon(buffer time.Duration) bool { + if fleetManager.lastToken == nil { + return true + } + if fleetManager.tokenExpiresAt.IsZero() { + return true + } + return time.Now().Add(buffer).After(fleetManager.tokenExpiresAt) +} + +// GetTokenExpiryTime returns the time when the current token expires (with buffer already applied) +func (fleetManager *AuthTokenManager) GetTokenExpiryTime() time.Time { + return fleetManager.tokenExpiresAt +} + +// parseJWTExpiry extracts the exp claim from a JWT token +func parseJWTExpiry(tokenString string) (time.Time, error) { + if tokenString == "" { + return time.Time{}, fmt.Errorf("empty token string") + } + + // Parse the JWT token without verification + token, err := jwt.ParseSigned(tokenString, []jose.SignatureAlgorithm{jose.HS256, jose.HS384, jose.HS512, jose.RS256, jose.RS384, jose.RS512, jose.ES256, jose.ES384, jose.ES512}) + if err != nil { + return time.Time{}, fmt.Errorf("failed to parse JWT token: %w", err) + } + + var claims jwt.Claims + + // Extract standard claims without verification + if err := token.UnsafeClaimsWithoutVerification(&claims, nil); err != nil { + return time.Time{}, fmt.Errorf("failed to extract claims from JWT: %w", err) + } + + // Check if exp claim exists + if claims.Expiry == nil { + return time.Time{}, fmt.Errorf("exp claim not found in JWT token") + } + + return claims.Expiry.Time(), nil +} diff --git a/agent/configmgr/fleet/auth_test.go b/agent/configmgr/fleet/auth_test.go index a993f76..f6c317f 100644 --- a/agent/configmgr/fleet/auth_test.go +++ b/agent/configmgr/fleet/auth_test.go @@ -151,3 +151,311 @@ func TestTokenResponse_Marshaling(t *testing.T) { assert.Equal(t, original.MQTTURL, unmarshaled.MQTTURL) assert.Equal(t, original.ExpiresIn, unmarshaled.ExpiresIn) } + +func TestAuthTokenManager_GetToken_WithJWTExpClaim(t *testing.T) { + // Arrange + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + authTokenManager := NewAuthTokenManager(logger) + + // Create a JWT token with exp claim + futureExpiry := time.Now().Add(1 * time.Hour) + jwtToken := RawJWTWithClaims(map[string]any{ + "exp": futureExpiry.Unix(), + "iat": time.Now().Unix(), + }) + + // Create mock HTTP server + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := TokenResponse{ + AccessToken: jwtToken, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 3600, // This should be ignored if JWT exp is present + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + // Act + ctx := context.Background() + token, err := authTokenManager.GetToken(ctx, server.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + + // Assert + require.NoError(t, err) + assert.NotNil(t, token) + + // Verify expiry time is set (should be futureExpiry - 5 minutes buffer) + expiryTime := authTokenManager.GetTokenExpiryTime() + assert.False(t, expiryTime.IsZero()) + assert.WithinDuration(t, futureExpiry.Add(-5*time.Minute), expiryTime, 1*time.Second) +} + +func TestAuthTokenManager_GetToken_WithExpiresInFallback(t *testing.T) { + // Arrange + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + authTokenManager := NewAuthTokenManager(logger) + + // Create a JWT token without exp claim + jwtToken := RawJWTWithClaims(map[string]any{ + "iat": time.Now().Unix(), + // No exp claim + }) + + // Create mock HTTP server + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := TokenResponse{ + AccessToken: jwtToken, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 3600, // Should use this + } + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + // Act + ctx := context.Background() + beforeTime := time.Now() + token, err := authTokenManager.GetToken(ctx, server.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + afterTime := time.Now() + + // Assert + require.NoError(t, err) + assert.NotNil(t, token) + + // Verify expiry time is set using ExpiresIn (3600 seconds - 5 minutes buffer) + expiryTime := authTokenManager.GetTokenExpiryTime() + assert.False(t, expiryTime.IsZero()) + expectedExpiry := beforeTime.Add(3600*time.Second - 5*time.Minute) + assert.True(t, expiryTime.After(expectedExpiry.Add(-1*time.Second)) && expiryTime.Before(afterTime.Add(3600*time.Second-5*time.Minute).Add(1*time.Second))) +} + +func TestAuthTokenManager_GetTokenExpiryTime(t *testing.T) { + // Arrange + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + authTokenManager := NewAuthTokenManager(logger) + + // Test with no token + expiryTime := authTokenManager.GetTokenExpiryTime() + assert.True(t, expiryTime.IsZero()) + + // Get a token + futureExpiry := time.Now().Add(1 * time.Hour) + jwtToken := RawJWTWithClaims(map[string]any{ + "exp": futureExpiry.Unix(), + "iat": time.Now().Unix(), + }) + + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := TokenResponse{ + AccessToken: jwtToken, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 3600, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + ctx := context.Background() + _, err := authTokenManager.GetToken(ctx, server.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + require.NoError(t, err) + + // Test with token + expiryTime = authTokenManager.GetTokenExpiryTime() + assert.False(t, expiryTime.IsZero()) + assert.WithinDuration(t, futureExpiry.Add(-5*time.Minute), expiryTime, 1*time.Second) +} + +func TestAuthTokenManager_IsTokenExpired_ExpiredCases(t *testing.T) { + // Arrange + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + authTokenManager := NewAuthTokenManager(logger) + + // Test with no token + assert.True(t, authTokenManager.IsTokenExpired()) + + // Test with expired token + pastExpiry := time.Now().Add(-1 * time.Hour) + jwtTokenExpired := RawJWTWithClaims(map[string]any{ + "exp": pastExpiry.Unix(), + "iat": time.Now().Add(-2 * time.Hour).Unix(), + }) + + serverExpired := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := TokenResponse{ + AccessToken: jwtTokenExpired, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 1, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer serverExpired.Close() + + ctx := context.Background() + _, err := authTokenManager.GetToken(ctx, serverExpired.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + require.NoError(t, err) + + // Wait a moment to ensure time has passed + time.Sleep(2 * time.Second) + assert.True(t, authTokenManager.IsTokenExpired()) +} + +func TestAuthTokenManager_IsTokenExpired_ValidToken(t *testing.T) { + // Arrange + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + authTokenManager := NewAuthTokenManager(logger) + + // Test with valid token + futureExpiry := time.Now().Add(1 * time.Hour) + jwtTokenValid := RawJWTWithClaims(map[string]any{ + "exp": futureExpiry.Unix(), + "iat": time.Now().Unix(), + }) + + serverValid := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := TokenResponse{ + AccessToken: jwtTokenValid, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 3600, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer serverValid.Close() + + ctx := context.Background() + _, err := authTokenManager.GetToken(ctx, serverValid.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + require.NoError(t, err) + + assert.False(t, authTokenManager.IsTokenExpired()) +} + +func TestAuthTokenManager_IsTokenExpiringSoon(t *testing.T) { + // Arrange + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + authTokenManager := NewAuthTokenManager(logger) + + // Test with no token + assert.True(t, authTokenManager.IsTokenExpiringSoon(2*time.Minute)) + + // Test with token expiring soon (within buffer) + soonExpiry := time.Now().Add(1 * time.Minute) // Will expire in 1 minute, buffer is 2 minutes + jwtTokenSoon := RawJWTWithClaims(map[string]any{ + "exp": soonExpiry.Unix(), + "iat": time.Now().Unix(), + }) + + serverSoon := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := TokenResponse{ + AccessToken: jwtTokenSoon, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 60, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer serverSoon.Close() + + ctx := context.Background() + _, err := authTokenManager.GetToken(ctx, serverSoon.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + require.NoError(t, err) + + // With 2 minute buffer, token expiring in 1 minute should be considered expiring soon + // But note: GetToken applies a 5-minute buffer, so the actual expiry check is against (soonExpiry - 5 minutes) + // So if soonExpiry is 1 minute from now, tokenExpiresAt will be (1 minute - 5 minutes) = -4 minutes ago + // So IsTokenExpiringSoon(2 minutes) should return true + assert.True(t, authTokenManager.IsTokenExpiringSoon(2*time.Minute)) + + // Test with token not expiring soon + futureExpiry := time.Now().Add(1 * time.Hour) + jwtTokenFuture := RawJWTWithClaims(map[string]any{ + "exp": futureExpiry.Unix(), + "iat": time.Now().Unix(), + }) + + serverFuture := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := TokenResponse{ + AccessToken: jwtTokenFuture, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 3600, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer serverFuture.Close() + + authTokenManager2 := NewAuthTokenManager(logger) + _, err = authTokenManager2.GetToken(ctx, serverFuture.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + require.NoError(t, err) + + // With 2 minute buffer, token expiring in ~55 minutes should not be considered expiring soon + assert.False(t, authTokenManager2.IsTokenExpiringSoon(2*time.Minute)) +} + +func TestAuthTokenManager_RefreshToken(t *testing.T) { + // Arrange + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + authTokenManager := NewAuthTokenManager(logger) + + // Test refresh without initial token + ctx := context.Background() + _, err := authTokenManager.RefreshToken(ctx) + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot refresh token") + + // Get initial token + futureExpiry := time.Now().Add(1 * time.Hour) + jwtToken := RawJWTWithClaims(map[string]any{ + "exp": futureExpiry.Unix(), + "iat": time.Now().Unix(), + }) + + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := TokenResponse{ + AccessToken: jwtToken, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 3600, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + _, err = authTokenManager.GetToken(ctx, server.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + require.NoError(t, err) + + // Test refresh with stored credentials + newFutureExpiry := time.Now().Add(2 * time.Hour) + newJwtToken := RawJWTWithClaims(map[string]any{ + "exp": newFutureExpiry.Unix(), + "iat": time.Now().Unix(), + }) + + server2 := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := TokenResponse{ + AccessToken: newJwtToken, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 7200, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server2.Close() + + // Update the token URL to point to new server + authTokenManager.tokenURL = server2.URL + + // Refresh token + token, err := authTokenManager.RefreshToken(ctx) + require.NoError(t, err) + assert.NotNil(t, token) + assert.Equal(t, newJwtToken, token.AccessToken) + + // Verify expiry time was updated + newExpiryTime := authTokenManager.GetTokenExpiryTime() + assert.WithinDuration(t, newFutureExpiry.Add(-5*time.Minute), newExpiryTime, 1*time.Second) +} diff --git a/agent/configmgr/fleet/connection.go b/agent/configmgr/fleet/connection.go index 6bc4871..3d1a95f 100644 --- a/agent/configmgr/fleet/connection.go +++ b/agent/configmgr/fleet/connection.go @@ -17,17 +17,21 @@ import ( // MQTTConnection manages the MQTT connection type MQTTConnection struct { - logger *slog.Logger - connectionManager *autopaho.ConnectionManager - heartbeater *heartbeater - messaging *Messaging - resetChan chan struct{} - onReadyHooks []func(cm *autopaho.ConnectionManager, topics TokenResponseTopics) - connectionTopics TokenResponseTopics + logger *slog.Logger + connectionManager *autopaho.ConnectionManager + heartbeater *heartbeater + messaging *Messaging + resetChan chan struct{} + onReadyHooks []func(cm *autopaho.ConnectionManager, topics TokenResponseTopics) + connectionTopics TokenResponseTopics + reconnectChan chan struct{} + capabilitiesFailCount int + groupMembershipFailCount int + heartbeatFailCount int } // NewMQTTConnection creates a new MQTTConnection -func NewMQTTConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, resetChan chan struct{}, backendState backend.StateRetriever) *MQTTConnection { +func NewMQTTConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, resetChan chan struct{}, reconnectChan chan struct{}, backendState backend.StateRetriever) *MQTTConnection { groupManager := newGroupManager() return &MQTTConnection{ connectionManager: nil, @@ -36,6 +40,7 @@ func NewMQTTConnection(logger *slog.Logger, pMgr policymgr.PolicyManager, resetC messaging: NewMessaging(logger, pMgr, resetChan, &groupManager), resetChan: resetChan, onReadyHooks: make([]func(cm *autopaho.ConnectionManager, topics TokenResponseTopics), 0), + reconnectChan: reconnectChan, } } @@ -103,7 +108,23 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio } // start heartbeat loop bound to the same connection-level context - go connection.heartbeater.sendHeartbeats(ctx, func() {}, details.Topics.Heartbeat, details.ClientID, connection.publishToTopic) + go connection.heartbeater.sendHeartbeats(ctx, func() {}, details.Topics.Heartbeat, details.ClientID, connection.publishToTopic, func() { + // Track heartbeat failures + connection.heartbeatFailCount++ + connection.logger.Error("heartbeat publish failed", + "fail_count", connection.heartbeatFailCount) + + // After 5 consecutive failures, trigger reconnect + if connection.heartbeatFailCount >= 5 { + connection.logger.Warn("heartbeat publish failed 5 times, triggering JWT refresh and reconnect") + select { + case connection.reconnectChan <- struct{}{}: + default: + connection.logger.Debug("reconnect already in progress") + } + connection.heartbeatFailCount = 0 + } + }) connection.messaging.sendCapabilities(ctx, backends, labels, configFile, func(ctx context.Context, payload []byte) error { _, err := cm.Publish(ctx, &paho.Publish{ @@ -113,11 +134,26 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio Retain: false, }) if err != nil { - // TODO: reconnect? - connection.logger.Error("failed to publish capabilities", "error", err) + connection.capabilitiesFailCount++ + connection.logger.Error("failed to publish capabilities", + "error", err, + "fail_count", connection.capabilitiesFailCount) + + // After 1 retry (2 failures), trigger reconnect + if connection.capabilitiesFailCount >= 2 { + connection.logger.Warn("capabilities publish failed twice, triggering JWT refresh and reconnect") + select { + case connection.reconnectChan <- struct{}{}: + default: + connection.logger.Debug("reconnect already in progress") + } + connection.capabilitiesFailCount = 0 + } return err } + // Reset counter on success + connection.capabilitiesFailCount = 0 connection.logger.Debug("capabilities sent", "topic", details.Topics.Capabilities, "payload", string(payload), @@ -135,9 +171,26 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio Retain: false, }) if err != nil { - connection.logger.Error("failed to publish group memberships request", "error", err) + connection.groupMembershipFailCount++ + connection.logger.Error("failed to publish group memberships request", + "error", err, + "fail_count", connection.groupMembershipFailCount) + + // After 1 retry (2 failures), trigger reconnect + if connection.groupMembershipFailCount >= 2 { + connection.logger.Warn("group membership publish failed twice, triggering JWT refresh and reconnect") + select { + case connection.reconnectChan <- struct{}{}: + default: + connection.logger.Debug("reconnect already in progress") + } + connection.groupMembershipFailCount = 0 + } return err } + + // Reset counter on success + connection.groupMembershipFailCount = 0 return nil }) }, @@ -205,6 +258,37 @@ func (connection *MQTTConnection) Connect(ctx context.Context, details Connectio return nil } +// Reconnect reconnects to the MQTT broker with new connection details (e.g., refreshed JWT) +func (connection *MQTTConnection) Reconnect(ctx context.Context, details ConnectionDetails, backends map[string]backend.Backend, labels map[string]string, configFile string, timeout time.Duration) error { + connection.logger.Info("reconnecting to MQTT broker with refreshed credentials") + + // Disconnect the existing connection + if connection.connectionManager != nil { + disconnectCtx, cancel := context.WithTimeout(ctx, timeout) + connection.heartbeater.stop(details.Topics.Heartbeat, connection.publishToTopic) + err := connection.connectionManager.Disconnect(disconnectCtx) + cancel() + if err != nil { + connection.logger.Error("failed to disconnect during reconnect", "error", err) + // Continue anyway to try to establish new connection + } + } + + // Reset failure counters + connection.capabilitiesFailCount = 0 + connection.groupMembershipFailCount = 0 + connection.heartbeatFailCount = 0 + + // Connect with new details + err := connection.Connect(ctx, details, backends, labels, configFile) + if err != nil { + return fmt.Errorf("failed to connect during reconnect: %w", err) + } + + connection.logger.Info("successfully reconnected to MQTT broker") + return nil +} + // Disconnect disconnects from the MQTT broker func (connection *MQTTConnection) Disconnect(ctx context.Context, heartbeatTopic string) error { connection.heartbeater.stop(heartbeatTopic, connection.publishToTopic) @@ -239,5 +323,8 @@ func (connection *MQTTConnection) publishToTopic(ctx context.Context, topic stri connection.logger.Error("failed to publish to topic", "topic", topic, "error", err) return err } + // Reset heartbeat failure counter on successful publish + // (heartbeats use this function, so successful publish means connection is ok) + connection.heartbeatFailCount = 0 return nil } diff --git a/agent/configmgr/fleet/connection_hooks_test.go b/agent/configmgr/fleet/connection_hooks_test.go index 04baa3d..5733e07 100644 --- a/agent/configmgr/fleet/connection_hooks_test.go +++ b/agent/configmgr/fleet/connection_hooks_test.go @@ -32,7 +32,8 @@ func (noopBackendState) Get() map[string]*backend.State { return map[string]*bac func TestAddOnReadyHook_RegistersHook(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) reset := make(chan struct{}, 1) - conn := NewMQTTConnection(logger, noopPM{}, reset, noopBackendState{}) + reconnect := make(chan struct{}, 1) + conn := NewMQTTConnection(logger, noopPM{}, reset, reconnect, noopBackendState{}) if len(conn.onReadyHooks) != 0 { t.Fatalf("expected 0 hooks initially, got %d", len(conn.onReadyHooks)) @@ -48,7 +49,8 @@ func TestAddOnReadyHook_RegistersHook(t *testing.T) { func TestConnect_StoresTopicsBeforeConnecting(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) reset := make(chan struct{}, 1) - conn := NewMQTTConnection(logger, noopPM{}, reset, noopBackendState{}) + reconnect := make(chan struct{}, 1) + conn := NewMQTTConnection(logger, noopPM{}, reset, reconnect, noopBackendState{}) details := ConnectionDetails{ MQTTURL: "mqtt://localhost:1883", diff --git a/agent/configmgr/fleet/connection_test.go b/agent/configmgr/fleet/connection_test.go index 00d676c..682e20d 100644 --- a/agent/configmgr/fleet/connection_test.go +++ b/agent/configmgr/fleet/connection_test.go @@ -70,7 +70,8 @@ func TestFleetConfigManager_Connect_InvalidURL(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) mockPMgr := &mockPolicyManagerForFleet{} resetChan := make(chan struct{}, 1) - connection := NewMQTTConnection(logger, mockPMgr, resetChan, &mockBackendState{}) + reconnectChan := make(chan struct{}, 1) + connection := NewMQTTConnection(logger, mockPMgr, resetChan, reconnectChan, &mockBackendState{}) // Act with invalid URL backends := make(map[string]backend.Backend) @@ -93,7 +94,8 @@ func TestFleetConfigManager_Connect_ValidURL(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) mockPMgr := &mockPolicyManagerForFleet{} resetChan := make(chan struct{}, 1) - connection := NewMQTTConnection(logger, mockPMgr, resetChan, &mockBackendState{}) + reconnectChan := make(chan struct{}, 1) + connection := NewMQTTConnection(logger, mockPMgr, resetChan, reconnectChan, &mockBackendState{}) // Act with valid URL but don't expect successful connection // since we don't have a real MQTT server diff --git a/agent/configmgr/fleet/heartbeats.go b/agent/configmgr/fleet/heartbeats.go index 15343a0..0c8f60a 100644 --- a/agent/configmgr/fleet/heartbeats.go +++ b/agent/configmgr/fleet/heartbeats.go @@ -37,10 +37,10 @@ func newHeartbeater(logger *slog.Logger, backendState backend.StateRetriever, po func (hb *heartbeater) stop(heartbeatTopic string, publishFunc func(ctx context.Context, topic string, payload []byte) error) { hb.hbTicker.Stop() - hb.sendSingleHeartbeat(hb.heartbeatCtx, heartbeatTopic, publishFunc, "", time.Now(), messages.HeartbeatState(messages.Offline)) + hb.sendSingleHeartbeat(hb.heartbeatCtx, heartbeatTopic, publishFunc, "", time.Now(), messages.HeartbeatState(messages.Offline), nil) } -func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, heartbeatTopic string, publishFunc func(ctx context.Context, topic string, payload []byte) error, _ string, _ time.Time, _ messages.HeartbeatState) { +func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, heartbeatTopic string, publishFunc func(ctx context.Context, topic string, payload []byte) error, _ string, _ time.Time, _ messages.HeartbeatState, onFailure func()) { hbData := messages.Heartbeat{ SchemaVersion: messages.CurrentHeartbeatSchemaVersion, TimeStamp: time.Now().UTC(), @@ -58,6 +58,9 @@ func (hb *heartbeater) sendSingleHeartbeat(ctx context.Context, heartbeatTopic s if err := publishFunc(ctx, heartbeatTopic, body); err != nil { hb.logger.Error("error sending heartbeat", "error", err) + if onFailure != nil { + onFailure() + } } else { hb.logger.Debug("heartbeat sent", "payload", string(body)) } @@ -114,23 +117,23 @@ func (hb *heartbeater) getGroupState() map[string]messages.GroupStateInfo { // supplied context is cancelled. The cancelFunc parameter is ignored by the // implementation but is accepted for backward-compatibility with unit tests // that expect to pass it. -func (hb *heartbeater) sendHeartbeats(ctx context.Context, _ context.CancelFunc, heartbeatTopic string, agentID string, publishFunc func(ctx context.Context, topic string, payload []byte) error) { +func (hb *heartbeater) sendHeartbeats(ctx context.Context, _ context.CancelFunc, heartbeatTopic string, agentID string, publishFunc func(ctx context.Context, topic string, payload []byte) error, onFailure func()) { // Update our internal reference so other methods that read hb.heartbeatCtx // (if any) remain accurate. hb.heartbeatCtx = ctx hb.logger.Debug("start heartbeats routine") - hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, time.Now(), messages.Online) + hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, time.Now(), messages.Online, onFailure) for { select { case <-ctx.Done(): hb.logger.Debug("context done, stopping heartbeats routine") - hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, time.Now(), messages.Offline) + hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, time.Now(), messages.Offline, nil) hb.heartbeatCtx = nil return case t := <-hb.hbTicker.C: - hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, t, messages.Online) + hb.sendSingleHeartbeat(ctx, heartbeatTopic, publishFunc, agentID, t, messages.Online, onFailure) } } } diff --git a/agent/configmgr/fleet/heartbeats_test.go b/agent/configmgr/fleet/heartbeats_test.go index 6ca2469..7b81e38 100644 --- a/agent/configmgr/fleet/heartbeats_test.go +++ b/agent/configmgr/fleet/heartbeats_test.go @@ -142,7 +142,7 @@ func TestHeartbeater_SendSingleHeartbeat_Success(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(nil) // Act - hb.sendSingleHeartbeat(ctx, testTopic, mockPublish.Publish, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, mockPublish.Publish, "test-agent-id", testTime, messages.Online, nil) // Assert: ensure one publish happened with a valid heartbeat payload calls := mockPublish.Calls @@ -172,7 +172,7 @@ func TestHeartbeater_SendSingleHeartbeat_PublishError(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(publishError) // Act - should not panic despite publish error - hb.sendSingleHeartbeat(ctx, testTopic, mockPublish.Publish, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, mockPublish.Publish, "test-agent-id", testTime, messages.Online, nil) // Assert mockPublish.AssertExpectations(t) @@ -194,7 +194,7 @@ func TestHeartbeater_SendSingleHeartbeat_HeartbeatContent(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -225,7 +225,7 @@ func TestHeartbeater_SendHeartbeats_InitialHeartbeat(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(nil).Once() // Act - go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish) + go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish, nil) // Give some time for initial heartbeat time.Sleep(10 * time.Millisecond) @@ -254,7 +254,7 @@ func TestHeartbeater_SendHeartbeats_PeriodicHeartbeats(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(nil).Times(4) // Act - go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish) + go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish, nil) // Wait for some periodic heartbeats (ticker is 50ms in test) time.Sleep(120 * time.Millisecond) @@ -291,7 +291,7 @@ func TestHeartbeater_SendHeartbeats_ContextCancellation(t *testing.T) { // Act go func() { - hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", publishFunc) + hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", publishFunc, nil) done <- true }() @@ -328,7 +328,7 @@ func TestHeartbeater_SendHeartbeats_PublishErrors(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(publishError).Times(4) // Act - go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish) + go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish, nil) // Wait for some heartbeats with errors time.Sleep(120 * time.Millisecond) @@ -357,7 +357,7 @@ func TestHeartbeater_SendHeartbeats_ConcurrentCancellation(t *testing.T) { mockPublish.On("Publish", ctx, testTopic, mock.AnythingOfType("[]uint8")).Return(nil).Maybe() // Act - start heartbeats - go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish) + go hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", mockPublish.Publish, nil) // Cancel immediately in a separate goroutine go func() { @@ -402,7 +402,7 @@ func TestHeartbeater_SendHeartbeats_HeartbeatStates(t *testing.T) { // Act go func() { - hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", publishFunc) + hb.sendHeartbeats(ctx, cancel, testTopic, "test-agent-id", publishFunc, nil) done <- true }() @@ -557,7 +557,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithBackendState(t *testing.T) { ctx := context.Background() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -611,7 +611,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithoutBackendState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -641,7 +641,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithEmptyBackendState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -696,7 +696,7 @@ func TestHeartbeater_SendSingleHeartbeat_BackendStateAllStatuses(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -897,7 +897,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithPolicyState(t *testing.T) { ctx := context.Background() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -959,7 +959,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithPolicyStateError(t *testing.T) { testTime := time.Now() // Act - should not panic despite policy state error - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -994,7 +994,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithEmptyPolicyState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -1065,7 +1065,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithEmptyGroupState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -1104,7 +1104,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithGroupState(t *testing.T) { testTime := time.Now() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -1188,7 +1188,7 @@ func TestHeartbeater_SendSingleHeartbeat_WithCompleteState(t *testing.T) { ctx := context.Background() // Act - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) // Assert require.NotNil(t, capturedPayload) @@ -1248,7 +1248,7 @@ func TestHeartbeater_SendSingleHeartbeat_GroupStateAfterRemoval(t *testing.T) { testTime := time.Now() // Send initial heartbeat with 2 groups - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) require.NotNil(t, capturedPayload) var heartbeat1 messages.Heartbeat @@ -1261,7 +1261,7 @@ func TestHeartbeater_SendSingleHeartbeat_GroupStateAfterRemoval(t *testing.T) { // Send second heartbeat after removal capturedPayload = nil - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) require.NotNil(t, capturedPayload) var heartbeat2 messages.Heartbeat @@ -1302,7 +1302,7 @@ func TestHeartbeater_SendSingleHeartbeat_GroupStateAfterRemoveAll(t *testing.T) testTime := time.Now() // Send initial heartbeat with 2 groups - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) require.NotNil(t, capturedPayload) var heartbeat1 messages.Heartbeat @@ -1315,7 +1315,7 @@ func TestHeartbeater_SendSingleHeartbeat_GroupStateAfterRemoveAll(t *testing.T) // Send second heartbeat after removal capturedPayload = nil - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) require.NotNil(t, capturedPayload) var heartbeat2 messages.Heartbeat @@ -1345,7 +1345,7 @@ func TestHeartbeater_SendSingleHeartbeat_DynamicGroupUpdates(t *testing.T) { testTime := time.Now() // Heartbeat 1: No groups - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) var hb1 messages.Heartbeat require.NoError(t, json.Unmarshal(capturedPayload, &hb1)) assert.Empty(t, hb1.GroupState) @@ -1354,7 +1354,7 @@ func TestHeartbeater_SendSingleHeartbeat_DynamicGroupUpdates(t *testing.T) { gm.Add(messages.GroupMembershipData{GroupID: "group-1", Name: "Group 1"}) // Heartbeat 2: 1 group - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) var hb2 messages.Heartbeat require.NoError(t, json.Unmarshal(capturedPayload, &hb2)) assert.Len(t, hb2.GroupState, 1) @@ -1363,7 +1363,7 @@ func TestHeartbeater_SendSingleHeartbeat_DynamicGroupUpdates(t *testing.T) { gm.Add(messages.GroupMembershipData{GroupID: "group-2", Name: "Group 2"}) // Heartbeat 3: 2 groups - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) var hb3 messages.Heartbeat require.NoError(t, json.Unmarshal(capturedPayload, &hb3)) assert.Len(t, hb3.GroupState, 2) @@ -1372,7 +1372,7 @@ func TestHeartbeater_SendSingleHeartbeat_DynamicGroupUpdates(t *testing.T) { gm.Remove("group-1") // Heartbeat 4: 1 group - hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online) + hb.sendSingleHeartbeat(ctx, testTopic, publishFunc, "test-agent-id", testTime, messages.Online, nil) var hb4 messages.Heartbeat require.NoError(t, json.Unmarshal(capturedPayload, &hb4)) assert.Len(t, hb4.GroupState, 1) diff --git a/agent/configmgr/fleet_test.go b/agent/configmgr/fleet_test.go index 29deed0..f375b5c 100644 --- a/agent/configmgr/fleet_test.go +++ b/agent/configmgr/fleet_test.go @@ -9,6 +9,7 @@ import ( "os" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -339,3 +340,342 @@ func TestFleetConfigManager_configToSafeString_DoesNotModifyOriginal(t *testing. assert.Equal(t, originalSecret, cfg.OrbAgent.ConfigManager.Sources.Fleet.ClientSecret, "original config should not be modified") } + +func TestFleetConfigManager_MonitorTokenExpiry_Configuration(t *testing.T) { + // Test that monitorTokenExpiry uses default values when config is not set + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + mockPMgr := &mockPolicyManagerForFleet{} + fleetManager := newFleetConfigManager(logger, mockPMgr, &mockBackendState{}) + + // Set config with no monitoring settings (should use defaults) + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + ConfigManager: config.ManagerConfig{ + Sources: config.Sources{ + Fleet: config.FleetManager{ + TokenURL: "https://example.com/token", + ClientID: "test-client-id", + ClientSecret: "test-secret", + }, + }, + }, + }, + } + fleetManager.config = cfg + + // Initialize monitor context + fleetManager.monitorCtx, fleetManager.monitorCancel = context.WithCancel(context.Background()) + + // Start monitor in a goroutine + done := make(chan bool) + go func() { + fleetManager.monitorTokenExpiry() + done <- true + }() + + // Cancel monitor context to stop the monitor + fleetManager.monitorCancel() + + // Wait for monitor to stop + select { + case <-done: + // Monitor stopped successfully + case <-time.After(1 * time.Second): + t.Fatal("monitor did not stop within timeout") + } +} + +func TestFleetConfigManager_MonitorTokenExpiry_CustomConfiguration(t *testing.T) { + // Test that monitorTokenExpiry uses custom config values + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + mockPMgr := &mockPolicyManagerForFleet{} + fleetManager := newFleetConfigManager(logger, mockPMgr, &mockBackendState{}) + + // Set config with custom monitoring settings + checkInterval := 60 + reconnectBuffer := 180 + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + ConfigManager: config.ManagerConfig{ + Sources: config.Sources{ + Fleet: config.FleetManager{ + TokenURL: "https://example.com/token", + ClientID: "test-client-id", + ClientSecret: "test-secret", + TokenExpiryCheckInterval: &checkInterval, + TokenReconnectBuffer: &reconnectBuffer, + }, + }, + }, + }, + } + fleetManager.config = cfg + + // Initialize monitor context + fleetManager.monitorCtx, fleetManager.monitorCancel = context.WithCancel(context.Background()) + + // Start monitor in a goroutine + done := make(chan bool) + go func() { + fleetManager.monitorTokenExpiry() + done <- true + }() + + // Cancel monitor context to stop the monitor + fleetManager.monitorCancel() + + // Wait for monitor to stop + select { + case <-done: + // Monitor stopped successfully + case <-time.After(1 * time.Second): + t.Fatal("monitor did not stop within timeout") + } +} + +func TestFleetConfigManager_Stop_CancelsMonitor(t *testing.T) { + // Test that Stop() properly cancels the token expiry monitor + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + mockPMgr := &mockPolicyManagerForFleet{} + fleetManager := newFleetConfigManager(logger, mockPMgr, &mockBackendState{}) + + // Initialize monitor context + fleetManager.monitorCtx, fleetManager.monitorCancel = context.WithCancel(context.Background()) + + // Verify monitor context is not cancelled initially + select { + case <-fleetManager.monitorCtx.Done(): + t.Fatal("monitor context should not be cancelled initially") + default: + // Good, context is not cancelled + } + + // Call Stop() + ctx := context.Background() + err := fleetManager.Stop(ctx) + + // Assert + require.NoError(t, err) + + // Verify monitor context is now cancelled + select { + case <-fleetManager.monitorCtx.Done(): + // Good, context is cancelled + case <-time.After(100 * time.Millisecond): + t.Fatal("monitor context should be cancelled after Stop()") + } +} + +func TestFleetConfigManager_Stop_HandlesNilMonitorCancel(t *testing.T) { + // Test that Stop() handles nil monitorCancel gracefully + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + mockPMgr := &mockPolicyManagerForFleet{} + fleetManager := newFleetConfigManager(logger, mockPMgr, &mockBackendState{}) + + // Don't initialize monitorCancel (should be nil) + fleetManager.monitorCancel = nil + + // Call Stop() - should not panic + ctx := context.Background() + err := fleetManager.Stop(ctx) + + // Assert + require.NoError(t, err) +} + +func TestFleetConfigManager_MonitorTokenExpiry_DetectsExpiredToken(t *testing.T) { + // Test that monitor detects expired token and triggers reconnection + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + mockPMgr := &mockPolicyManagerForFleet{} + fleetManager := newFleetConfigManager(logger, mockPMgr, &mockBackendState{}) + + // Set up config + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + ConfigManager: config.ManagerConfig{ + Sources: config.Sources{ + Fleet: config.FleetManager{ + TokenURL: "https://example.com/token", + ClientID: "test-client-id", + ClientSecret: "test-secret", + }, + }, + }, + }, + } + fleetManager.config = cfg + + // Set up expired token by getting a token and then manually setting expiry to past + // We'll use a test server to get a token first + pastExpiry := time.Now().Add(-1 * time.Hour) + jwtToken := fleet.RawJWTWithClaims(map[string]any{ + "exp": pastExpiry.Unix(), + "iat": time.Now().Add(-2 * time.Hour).Unix(), + }) + + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := fleet.TokenResponse{ + AccessToken: jwtToken, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 1, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + ctx := context.Background() + _, err := fleetManager.authTokenManager.GetToken(ctx, server.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + require.NoError(t, err) + + // Verify token is expired + assert.True(t, fleetManager.authTokenManager.IsTokenExpired()) + + // Create monitor context + fleetManager.monitorCtx, fleetManager.monitorCancel = context.WithCancel(context.Background()) + defer fleetManager.monitorCancel() + + // Start monitor with short check interval for testing + done := make(chan bool) + go func() { + // Use a very short check interval for testing + checkInterval := 100 * time.Millisecond + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + for { + select { + case <-fleetManager.monitorCtx.Done(): + done <- true + return + case <-ticker.C: + if fleetManager.authTokenManager.IsTokenExpired() { + // Signal reconnection + select { + case fleetManager.reconnectChan <- struct{}{}: + // Reconnection triggered + default: + // Channel full, skip + } + done <- true + return + } + } + } + }() + + // Wait for monitor to detect expired token + select { + case <-done: + // Monitor detected expired token + case <-time.After(500 * time.Millisecond): + t.Fatal("monitor did not detect expired token within timeout") + } + + // Verify reconnection was triggered + select { + case <-fleetManager.reconnectChan: + // Reconnection signal received + default: + t.Fatal("reconnection signal should have been sent") + } +} + +func TestFleetConfigManager_MonitorTokenExpiry_DetectsExpiringSoonToken(t *testing.T) { + // Test that monitor detects token expiring soon and triggers reconnection + logger := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelError})) + mockPMgr := &mockPolicyManagerForFleet{} + fleetManager := newFleetConfigManager(logger, mockPMgr, &mockBackendState{}) + + // Set up config + cfg := config.Config{ + OrbAgent: config.OrbAgent{ + ConfigManager: config.ManagerConfig{ + Sources: config.Sources{ + Fleet: config.FleetManager{ + TokenURL: "https://example.com/token", + ClientID: "test-client-id", + ClientSecret: "test-secret", + }, + }, + }, + }, + } + fleetManager.config = cfg + + // Set up token expiring soon (within buffer) by getting a token with soon expiry + soonExpiry := time.Now().Add(1 * time.Minute) // Expires in 1 minute + jwtToken := fleet.RawJWTWithClaims(map[string]any{ + "exp": soonExpiry.Unix(), + "iat": time.Now().Unix(), + }) + + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + response := fleet.TokenResponse{ + AccessToken: jwtToken, + MQTTURL: "mqtt://test.example.com:1883", + ExpiresIn: 60, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + ctx := context.Background() + _, err := fleetManager.authTokenManager.GetToken(ctx, server.URL, true, 60*time.Second, "test_client_id", "test_client_secret") + require.NoError(t, err) + + // Verify token is expiring soon with 2 minute buffer + reconnectBuffer := 2 * time.Minute + assert.True(t, fleetManager.authTokenManager.IsTokenExpiringSoon(reconnectBuffer)) + + // Create monitor context + fleetManager.monitorCtx, fleetManager.monitorCancel = context.WithCancel(context.Background()) + defer fleetManager.monitorCancel() + + // Start monitor with short check interval for testing + done := make(chan bool) + go func() { + checkInterval := 100 * time.Millisecond + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + for { + select { + case <-fleetManager.monitorCtx.Done(): + done <- true + return + case <-ticker.C: + if fleetManager.authTokenManager.IsTokenExpiringSoon(reconnectBuffer) { + // Signal reconnection + select { + case fleetManager.reconnectChan <- struct{}{}: + // Reconnection triggered + default: + // Channel full, skip + } + done <- true + return + } + } + } + }() + + // Wait for monitor to detect expiring token + select { + case <-done: + // Monitor detected expiring token + case <-time.After(500 * time.Millisecond): + t.Fatal("monitor did not detect expiring token within timeout") + } + + // Verify reconnection was triggered + select { + case <-fleetManager.reconnectChan: + // Reconnection signal received + default: + t.Fatal("reconnection signal should have been sent") + } +} diff --git a/go.mod b/go.mod index bd1eae5..290e324 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/netboxlabs/orb-agent -go 1.24.4 +go 1.24.10 require ( github.com/eclipse/paho.golang v0.22.0 @@ -22,6 +22,9 @@ require ( go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.14.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk/log v0.14.0 + go.opentelemetry.io/proto/otlp v1.7.1 + google.golang.org/grpc v1.75.0 + google.golang.org/protobuf v1.36.8 gopkg.in/yaml.v3 v3.0.1 ) @@ -33,11 +36,13 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cloudflare/circl v1.6.1 // indirect + github.com/containerd/errdefs v1.0.0 // indirect + github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/cyphar/filepath-securejoin v0.4.1 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/distribution/reference v0.6.0 // indirect - github.com/docker/docker v28.0.0+incompatible // indirect + github.com/docker/docker v28.3.3+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emirpasic/gods v1.18.1 // indirect @@ -67,15 +72,17 @@ require ( github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect github.com/jonboulle/clockwork v0.4.0 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect - github.com/klauspost/compress v1.17.11 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect - github.com/moby/patternmatcher v0.5.0 // indirect - github.com/moby/sys/sequential v0.5.0 // indirect - github.com/moby/sys/user v0.3.0 // indirect + github.com/moby/go-archive v0.1.0 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect + github.com/moby/sys/atomicwriter v0.1.0 // indirect + github.com/moby/sys/sequential v0.6.0 // indirect + github.com/moby/sys/user v0.4.0 // indirect github.com/moby/sys/userns v0.1.0 // indirect github.com/moby/term v0.5.2 // indirect github.com/morikuni/aec v1.0.0 // indirect @@ -100,7 +107,6 @@ require ( go.opentelemetry.io/otel/log v0.14.0 // indirect go.opentelemetry.io/otel/metric v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect - go.opentelemetry.io/proto/otlp v1.7.1 // indirect go.uber.org/atomic v1.11.0 // indirect golang.org/x/crypto v0.41.0 // indirect golang.org/x/exp v0.0.0-20250531010427-b6e5de432a8b // indirect @@ -110,8 +116,5 @@ require ( golang.org/x/time v0.11.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect - google.golang.org/grpc v1.75.0 // indirect - google.golang.org/protobuf v1.36.8 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect - gotest.tools/v3 v3.5.2 // indirect ) diff --git a/go.sum b/go.sum index bcf4ace..896f96c 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ dario.cat/mergo v1.0.1 h1:Ra4+bf83h2ztPIQYNP99R6m+Y7KfnARDfID+a+vLl4s= dario.cat/mergo v1.0.1/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= -github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= -github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6 h1:He8afgbRMd7mFxO99hRNu+6tazq8nFF9lIwo9JFroBk= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20240806141605-e8a1dd7889d6/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c h1:udKWzYgxTojEKWjV8V+WSxDXJ4NFATAsZjh8iIbsQIg= github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= github.com/Microsoft/go-winio v0.5.2/go.mod h1:WpS1mjBmmwHBEWmogvA2mj8546UReBk4v8QkMxJ6pZY= @@ -21,6 +21,10 @@ github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1x github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/cloudflare/circl v1.6.1 h1:zqIqSPIndyBh1bjLVVDHMPpVKqp8Su/V+6MeDzzQBQ0= github.com/cloudflare/circl v1.6.1/go.mod h1:uddAzsPgqdMAYatqJ0lsjX1oECcQLIlRpzZh3pJrofs= +github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= +github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= +github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= +github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= @@ -32,8 +36,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= -github.com/docker/docker v28.0.0+incompatible h1:Olh0KS820sJ7nPsBKChVhk5pzqcwDR15fumfAd/p9hM= -github.com/docker/docker v28.0.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v28.3.3+incompatible h1:Dypm25kh4rmk49v1eiVbsAtpAsYURjYkaKubwuBdxEI= +github.com/docker/docker v28.3.3+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= @@ -135,8 +139,8 @@ github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4 github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= @@ -158,12 +162,16 @@ github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 h1:BpfhmL github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= -github.com/moby/patternmatcher v0.5.0 h1:YCZgJOeULcxLw1Q+sVR636pmS7sPEn1Qo2iAN6M7DBo= -github.com/moby/patternmatcher v0.5.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= -github.com/moby/sys/sequential v0.5.0 h1:OPvI35Lzn9K04PBbCLW0g4LcFAJgHsvXsRyewg5lXtc= -github.com/moby/sys/sequential v0.5.0/go.mod h1:tH2cOOs5V9MlPiXcQzRC+eEyab644PWKGRYaaV5ZZlo= -github.com/moby/sys/user v0.3.0 h1:9ni5DlcW5an3SvRSx4MouotOygvzaXbaSrc/wGDFWPo= -github.com/moby/sys/user v0.3.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= +github.com/moby/go-archive v0.1.0 h1:Kk/5rdW/g+H8NHdJW2gsXyZ7UnzvJNOy6VKJqueWdcQ= +github.com/moby/go-archive v0.1.0/go.mod h1:G9B+YoujNohJmrIYFBpSd54GTUB4lt9S+xVQvsJyFuo= +github.com/moby/patternmatcher v0.6.0 h1:GmP9lR19aU5GqSSFko+5pRqHi+Ohk1O69aFiKkVGiPk= +github.com/moby/patternmatcher v0.6.0/go.mod h1:hDPoyOpDY7OrrMDLaYoY3hf52gNCR/YOUYxkhApJIxc= +github.com/moby/sys/atomicwriter v0.1.0 h1:kw5D/EqkBwsBFi0ss9v1VG3wIkVhzGvLklJ+w3A14Sw= +github.com/moby/sys/atomicwriter v0.1.0/go.mod h1:Ul8oqv2ZMNHOceF643P6FKPXeCmYtlQMvpizfsSoaWs= +github.com/moby/sys/sequential v0.6.0 h1:qrx7XFUd/5DxtqcoH1h438hF5TmOvzC/lspjy7zgvCU= +github.com/moby/sys/sequential v0.6.0/go.mod h1:uyv8EUTrca5PnDsdMGXhZe6CCe8U/UiTWd+lL+7b/Ko= +github.com/moby/sys/user v0.4.0 h1:jhcMKit7SA80hivmFJcbB1vqmw//wU61Zdui2eQXuMs= +github.com/moby/sys/user v0.4.0/go.mod h1:bG+tYYYJgaMtRKgEmuueC0hJEAZWwtIbZTB+85uoHjs= github.com/moby/sys/userns v0.1.0 h1:tVLXkFOxVu9A64/yh59slHVv9ahO9UIev4JZusOLG/g= github.com/moby/sys/userns v0.1.0/go.mod h1:IHUYgu/kao6N8YZlp9Cf444ySSvCmDlmzUcYfDHOl28= github.com/moby/term v0.5.2 h1:6qk3FJAFDs6i/q3W/pQ97SX192qKfZgGjCQqfCJkgzQ=