Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 112 additions & 25 deletions controllers/aga/globalaccelerator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,18 @@ package controllers
import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/service/globalaccelerator/types"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
agadeploy "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/aga"
"sigs.k8s.io/aws-load-balancer-controller/pkg/shared_constants"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -42,6 +49,7 @@ import (
agamodel "sigs.k8s.io/aws-load-balancer-controller/pkg/model/aga"
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
agastatus "sigs.k8s.io/aws-load-balancer-controller/pkg/status/aga"
)

const (
Expand All @@ -52,24 +60,30 @@ const (
agaResourcesGroupVersion = "aga.k8s.aws/v1beta1"
globalAcceleratorKind = "GlobalAccelerator"

// Requeue constants for provisioning state monitoring
requeueMessage = "Monitoring provisioning state"
statusUpdateRequeueTime = 1 * time.Minute

// Metric stage constants
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
MetricStageAddFinalizers = "add_finalizers"
MetricStageBuildModel = "build_model"
MetricStageDeployStack = "deploy_stack"
MetricStageReconcileGlobalAccelerator = "reconcile_globalaccelerator"

// Metric error constants
MetricErrorAddFinalizers = "add_finalizers_error"
MetricErrorRemoveFinalizers = "remove_finalizers_error"
MetricErrorBuildModel = "build_model_error"
MetricErrorDeployStack = "deploy_stack_error"
MetricErrorReconcileGlobalAccelerator = "reconcile_globalaccelerator_error"
)

// NewGlobalAcceleratorReconciler constructs new globalAcceleratorReconciler
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, cloud services.Cloud, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {

// Create tracking provider
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName)
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName, tracking.WithRegion(config.AWSConfig.Region))

// Create model builder
agaModelBuilder := aga.NewDefaultModelBuilder(
Expand All @@ -78,6 +92,7 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
trackingProvider,
config.FeatureGates,
config.ClusterName,
config.AWSConfig.Region,
config.DefaultTags,
config.ExternalManagedTags,
logger.WithName("aga-model-builder"),
Expand All @@ -87,17 +102,26 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
// Create stack marshaller
stackMarshaller := deploy.NewDefaultStackMarshaller()

// Create AGA stack deployer
stackDeployer := agadeploy.NewDefaultStackDeployer(cloud, config, trackingProvider, logger.WithName("aga-stack-deployer"), metricsCollector, controllerName)

// Create status updater
statusUpdater := agastatus.NewStatusUpdater(k8sClient, logger)

return &globalAcceleratorReconciler{
k8sClient: k8sClient,
eventRecorder: eventRecorder,
finalizerManager: finalizerManager,
logger: logger,
modelBuilder: agaModelBuilder,
stackMarshaller: stackMarshaller,
stackDeployer: stackDeployer,
statusUpdater: statusUpdater,
metricsCollector: metricsCollector,
reconcileTracker: reconcileCounters.IncrementAGA,

maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
maxExponentialBackoffDelay: config.GlobalAcceleratorMaxExponentialBackoffDelay,
}
}

Expand All @@ -108,11 +132,14 @@ type globalAcceleratorReconciler struct {
finalizerManager k8s.FinalizerManager
modelBuilder aga.ModelBuilder
stackMarshaller deploy.StackMarshaller
stackDeployer agadeploy.StackDeployer
statusUpdater agastatus.StatusUpdater
logger logr.Logger
metricsCollector lbcmetrics.MetricCollector
reconcileTracker func(namespaceName types.NamespacedName)
reconcileTracker func(namespaceName ktypes.NamespacedName)

maxConcurrentReconciles int
maxConcurrentReconciles int
maxExponentialBackoffDelay time.Duration
}

// +kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators,verbs=get;list;watch;patch
Expand Down Expand Up @@ -155,26 +182,19 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Con
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorAddFinalizers, err, r.metricsCollector)
}

// TODO: Implement GlobalAccelerator resource management
// This would include:
// 1. Creating/updating AWS Global Accelerator
// 2. Managing listeners and endpoint groups
// 3. Handling endpoint discovery from Services/Ingresses/Gateways
reconcileResourceFn := func() {
err = r.reconcileGlobalAcceleratorResources(ctx, ga)
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageReconcileGlobalAccelerator, reconcileResourceFn)
if err != nil {
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorReconcileGlobalAccelerator, err, r.metricsCollector)
}

r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
return nil
}

func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
// TODO: Implement cleanup logic for AWS Global Accelerator resources
// TODO: Implement cleanup logic for AWS Global Accelerator resources (Only cleaning up accelerator for now)
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
return err
Expand Down Expand Up @@ -203,7 +223,7 @@ func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi
}

func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
r.logger.Info("Reconciling GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)
r.logger.Info("Reconciling GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
var stack core.Stack
var accelerator *agamodel.Accelerator
var err error
Expand All @@ -212,25 +232,91 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageBuildModel, buildModelFn)
if err != nil {
// Update status to indicate model building failure
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.ModelBuildFailed, fmt.Sprintf("Failed to build model: %v", err)); statusErr != nil {
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after model build failure")
}
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorBuildModel, err, r.metricsCollector)
}

// Log the built model for debugging
r.logger.Info("Built model successfully", "accelerator", accelerator.ID(), "stackID", stack.StackID())
// Deploy the stack to create/update AWS Global Accelerator resources
deployStackFn := func() {
err = r.stackDeployer.Deploy(ctx, stack, r.metricsCollector, controllerName)
}
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageDeployStack, deployStackFn)
if err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedDeploy, fmt.Sprintf("Failed to deploy stack due to %v", err))

// Update status to indicate deployment failure
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.DeploymentFailed, fmt.Sprintf("Failed to deploy stack: %v", err)); statusErr != nil {
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after deployment failure")
}

return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorDeployStack, err, r.metricsCollector)
}

r.logger.Info("Successfully deployed GlobalAccelerator stack", "stackID", stack.StackID())

// Update GlobalAccelerator status after successful deployment
requeueNeeded, err := r.statusUpdater.UpdateStatusSuccess(ctx, ga, accelerator)
if err != nil {
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
return err
}
if requeueNeeded {
return ctrlerrors.NewRequeueNeededAfter(requeueMessage, statusUpdateRequeueTime)
}

// TODO: Implement the deploy phase
// This would include:
// 1. Deploy the stack to create/update AWS Global Accelerator resources
// 2. Update the GlobalAccelerator status with the created resources
// 3. Handle any deployment errors and update status accordingly
r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")

return nil
}

func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
// TODO: Implement the actual AWS Global Accelerator resource cleanup
// This is a placeholder implementation
r.logger.Info("Cleaning up GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)
r.logger.Info("Cleaning up GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))

// TODO we will handle cleaning up dependent resources when we implement those
// 1. Find the accelerator ARN from the CRD status
if ga.Status.AcceleratorARN == nil {
r.logger.Info("No accelerator ARN found in status, nothing to clean up", "globalAccelerator", k8s.NamespacedName(ga))
return nil
}

acceleratorARN := *ga.Status.AcceleratorARN
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need a fallback mechanism. Suppose that we provision an accelerator but are unable to persist the ARN. If the customer then deletes the resource from the cluster before the next reconcile run happens, we will orphan the accelerator because of this line.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried implementing this after your suggestion, but realized that since we would look up the accelerator based on tags, we may end up getting multiple resources if the customer manually configures these resource tags on any other accelerator (very rare, but they could). We might then delete the wrong resource because of this tag collision and disrupt traffic. Since this is an edge case, I don't think cleaning up the orphaned resource is worth the risk of deleting a resource which might be serving traffic. What do you think?

if acceleratorARN == "" {
r.logger.Info("Empty accelerator ARN in status, nothing to clean up", "globalAccelerator", k8s.NamespacedName(ga))
return nil
}

// 2. Delete the accelerator using accelerator delete manager
acceleratorManager := r.stackDeployer.GetAcceleratorManager()
r.logger.Info("Deleting accelerator", "acceleratorARN", acceleratorARN, "globalAccelerator", k8s.NamespacedName(ga))

// Initialize reference to existing accelerator for deletion
acceleratorWithTags := agadeploy.AcceleratorWithTags{
Accelerator: &types.Accelerator{
AcceleratorArn: &acceleratorARN,
},
Tags: nil,
}

if err := acceleratorManager.Delete(ctx, acceleratorWithTags); err != nil {
// Check if it's an AcceleratorNotDisabledError
var notDisabledErr *agadeploy.AcceleratorNotDisabledError
if errors.As(err, &notDisabledErr) {
// Update status to indicate we're waiting for the accelerator to be disabled
if updateErr := r.statusUpdater.UpdateStatusDeletion(ctx, ga); updateErr != nil {
r.logger.Error(updateErr, "Failed to update status during accelerator deletion")
}
return ctrlerrors.NewRequeueNeeded("Waiting for accelerator to be disabled")
}

// Any other error
r.logger.Error(err, "Failed to delete accelerator", "acceleratorARN", acceleratorARN, "globalAccelerator", k8s.NamespacedName(ga))
return fmt.Errorf("failed to delete accelerator %s: %w", acceleratorARN, err)
}

r.logger.Info("Successfully cleaned up all GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
return nil
}

Expand Down Expand Up @@ -259,6 +345,7 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
Named(controllerName).
WithOptions(controller.Options{
MaxConcurrentReconciles: r.maxConcurrentReconciles,
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Second, r.maxExponentialBackoffDelay),
}).
Complete(r)
}
Expand Down
2 changes: 2 additions & 0 deletions docs/deploy/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ Currently, you can set only 1 namespace to watch in this flag. See [this Kuberne
| [sync-period](#sync-period) | duration | 10h0m0s | Period at which the controller forces the repopulation of its local object stores |
| targetgroupbinding-max-concurrent-reconciles | int | 3 | Maximum number of concurrently running reconcile loops for targetGroupBinding |
| targetgroupbinding-max-exponential-backoff-delay | duration | 16m40s | Maximum duration of exponential backoff for targetGroupBinding reconcile failures |
| globalaccelerator-max-concurrent-reconciles | int | 1 | Maximum number of concurrently running reconcile loops for GlobalAccelerator objects |
| globalaccelerator-max-exponential-backoff-delay | duration | 16m40s | Maximum duration of exponential backoff for GlobalAccelerator reconcile failures |
| [lb-stabilization-monitor-interval](#lb-stabilization-monitor-interval) | duration | 2m | Interval at which the controller monitors the state of load balancer after creation |
| tolerate-non-existent-backend-service | boolean | true | Whether to allow rules which refer to backend services that do not exist (When enabled, it will return 503 error if backend service not exist) |
| tolerate-non-existent-backend-action | boolean | true | Whether to allow rules which refer to backend actions that do not exist (When enabled, it will return 503 error if backend action not exist) |
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/appmesh v1.27.7
github.com/aws/aws-sdk-go-v2/service/ec2 v1.173.0
github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.51.0
github.com/aws/aws-sdk-go-v2/service/globalaccelerator v1.26.3
github.com/aws/aws-sdk-go-v2/service/resourcegroupstaggingapi v1.23.3
github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.31.7
github.com/aws/aws-sdk-go-v2/service/shield v1.27.3
Expand Down Expand Up @@ -147,6 +148,7 @@ require (
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/cast v1.7.0 // indirect
github.com/spf13/cobra v1.9.1 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.34.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/aws/aws-sdk-go-v2/service/ec2 v1.173.0 h1:ta62lid9JkIpKZtZZXSj6rP2AqY
github.com/aws/aws-sdk-go-v2/service/ec2 v1.173.0/go.mod h1:o6QDjdVKpP5EF0dp/VlvqckzuSDATr1rLdHt3A5m0YY=
github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.51.0 h1:Zy1yjx+R6cR4pAwzFFJ8nWJh4ri8I44H76PDJ77tcJo=
github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2 v1.51.0/go.mod h1:RuZwE3p8IrWqK1kZhwH2TymlHLPuiI/taBMb8vrD39Q=
github.com/aws/aws-sdk-go-v2/service/globalaccelerator v1.26.3 h1:G8qcrur/MG4c7Wu+LMtpAPUSzmmaOa4ssHgYtefeJoo=
github.com/aws/aws-sdk-go-v2/service/globalaccelerator v1.26.3/go.mod h1:SJbyMV7JHSdKF1V0femihek4k7t2u5quWKiHzG8pihc=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3 h1:dT3MqvGhSoaIhRseqw2I0yH81l7wiR2vjs57O51EAm8=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.3/go.mod h1:GlAeCkHwugxdHaueRr4nhPuY+WW+gR8UjlcqzPr1SPI=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 h1:HGErhhrxZlQ044RiM+WdoZxp0p+EGM62y3L6pwA4olE=
Expand Down
6 changes: 6 additions & 0 deletions helm/aws-load-balancer-controller/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ spec:
{{- if .Values.targetgroupbindingMaxExponentialBackoffDelay }}
- --targetgroupbinding-max-exponential-backoff-delay={{ .Values.targetgroupbindingMaxExponentialBackoffDelay }}
{{- end }}
{{- if .Values.globalAcceleratorMaxConcurrentReconciles }}
- --globalaccelerator-max-concurrent-reconciles={{ .Values.globalAcceleratorMaxConcurrentReconciles }}
{{- end }}
{{- if .Values.globalAcceleratorMaxExponentialBackoffDelay }}
- --globalaccelerator-max-exponential-backoff-delay={{ .Values.globalAcceleratorMaxExponentialBackoffDelay }}
{{- end }}
{{- if .Values.lbStabilizationMonitorInterval }}
- --lb-stabilization-monitor-interval={{ .Values.lbStabilizationMonitorInterval }}
{{- end }}
Expand Down
6 changes: 6 additions & 0 deletions helm/aws-load-balancer-controller/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ targetgroupbindingMaxConcurrentReconciles:
# Maximum duration of exponential backoff for targetGroupBinding reconcile failures
targetgroupbindingMaxExponentialBackoffDelay:

# Maximum number of concurrently running reconcile loops for GlobalAccelerator objects
globalAcceleratorMaxConcurrentReconciles:

# Maximum duration of exponential backoff for GlobalAccelerator reconcile failures
globalAcceleratorMaxExponentialBackoffDelay:

# Interval at which the controller monitors the state of load balancer after creation for stabilization
lbStabilizationMonitorInterval:

Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,9 @@ func main() {
}

// Setup GlobalAccelerator controller only if enabled
if controllerCFG.FeatureGates.Enabled(config.AGAController) {
if shared_utils.IsAGAControllerEnabled(controllerCFG.FeatureGates, controllerCFG.AWSConfig.Region) {
agaReconciler := agacontroller.NewGlobalAcceleratorReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("globalAccelerator"),
finalizerManager, controllerCFG, ctrl.Log.WithName("controllers").WithName("globalAccelerator"), lbcMetricsCollector, reconcileCounters)
finalizerManager, controllerCFG, cloud, ctrl.Log.WithName("controllers").WithName("globalAccelerator"), lbcMetricsCollector, reconcileCounters)
if err := agaReconciler.SetupWithManager(ctx, mgr, clientSet); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "GlobalAccelerator")
os.Exit(1)
Expand Down
Loading
Loading