Skip to content

Commit 2518efe

Browse files
committed
add aga deployer
1 parent 2f7d153 commit 2518efe

16 files changed

+2336
-28
lines changed

controllers/aga/globalaccelerator_controller.go

Lines changed: 112 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,18 @@ package controllers
1919
import (
2020
"context"
2121
"fmt"
22+
"time"
23+
24+
"github.com/aws/aws-sdk-go-v2/service/globalaccelerator/types"
2225
"github.com/go-logr/logr"
26+
"github.com/pkg/errors"
2327
corev1 "k8s.io/api/core/v1"
24-
"k8s.io/apimachinery/pkg/types"
28+
ktypes "k8s.io/apimachinery/pkg/types"
2529
"k8s.io/client-go/kubernetes"
2630
"k8s.io/client-go/tools/record"
31+
"k8s.io/client-go/util/workqueue"
32+
"sigs.k8s.io/aws-load-balancer-controller/pkg/aws/services"
33+
agadeploy "sigs.k8s.io/aws-load-balancer-controller/pkg/deploy/aga"
2734
"sigs.k8s.io/aws-load-balancer-controller/pkg/shared_constants"
2835
ctrl "sigs.k8s.io/controller-runtime"
2936
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -42,6 +49,7 @@ import (
4249
agamodel "sigs.k8s.io/aws-load-balancer-controller/pkg/model/aga"
4350
"sigs.k8s.io/aws-load-balancer-controller/pkg/model/core"
4451
"sigs.k8s.io/aws-load-balancer-controller/pkg/runtime"
52+
agastatus "sigs.k8s.io/aws-load-balancer-controller/pkg/status/aga"
4553
)
4654

4755
const (
@@ -52,24 +60,30 @@ const (
5260
agaResourcesGroupVersion = "aga.k8s.aws/v1beta1"
5361
globalAcceleratorKind = "GlobalAccelerator"
5462

63+
// Requeue constants for provisioning state monitoring
64+
requeueMessage = "Monitoring provisioning state"
65+
statusUpdateRequeueTime = 1 * time.Minute
66+
5567
// Metric stage constants
5668
MetricStageFetchGlobalAccelerator = "fetch_globalAccelerator"
5769
MetricStageAddFinalizers = "add_finalizers"
5870
MetricStageBuildModel = "build_model"
71+
MetricStageDeployStack = "deploy_stack"
5972
MetricStageReconcileGlobalAccelerator = "reconcile_globalaccelerator"
6073

6174
// Metric error constants
6275
MetricErrorAddFinalizers = "add_finalizers_error"
6376
MetricErrorRemoveFinalizers = "remove_finalizers_error"
6477
MetricErrorBuildModel = "build_model_error"
78+
MetricErrorDeployStack = "deploy_stack_error"
6579
MetricErrorReconcileGlobalAccelerator = "reconcile_globalaccelerator_error"
6680
)
6781

6882
// NewGlobalAcceleratorReconciler constructs new globalAcceleratorReconciler
69-
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {
83+
func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder record.EventRecorder, finalizerManager k8s.FinalizerManager, config config.ControllerConfig, cloud services.Cloud, logger logr.Logger, metricsCollector lbcmetrics.MetricCollector, reconcileCounters *metricsutil.ReconcileCounters) *globalAcceleratorReconciler {
7084

7185
// Create tracking provider
72-
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName)
86+
trackingProvider := tracking.NewDefaultProvider(agaTagPrefix, config.ClusterName, tracking.WithRegion(config.AWSConfig.Region))
7387

7488
// Create model builder
7589
agaModelBuilder := aga.NewDefaultModelBuilder(
@@ -78,6 +92,7 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
7892
trackingProvider,
7993
config.FeatureGates,
8094
config.ClusterName,
95+
config.AWSConfig.Region,
8196
config.DefaultTags,
8297
config.ExternalManagedTags,
8398
logger.WithName("aga-model-builder"),
@@ -87,17 +102,26 @@ func NewGlobalAcceleratorReconciler(k8sClient client.Client, eventRecorder recor
87102
// Create stack marshaller
88103
stackMarshaller := deploy.NewDefaultStackMarshaller()
89104

105+
// Create AGA stack deployer
106+
stackDeployer := agadeploy.NewDefaultStackDeployer(cloud, config, trackingProvider, logger.WithName("aga-stack-deployer"), metricsCollector, controllerName)
107+
108+
// Create status updater
109+
statusUpdater := agastatus.NewStatusUpdater(k8sClient, logger)
110+
90111
return &globalAcceleratorReconciler{
91112
k8sClient: k8sClient,
92113
eventRecorder: eventRecorder,
93114
finalizerManager: finalizerManager,
94115
logger: logger,
95116
modelBuilder: agaModelBuilder,
96117
stackMarshaller: stackMarshaller,
118+
stackDeployer: stackDeployer,
119+
statusUpdater: statusUpdater,
97120
metricsCollector: metricsCollector,
98121
reconcileTracker: reconcileCounters.IncrementAGA,
99122

100-
maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
123+
maxConcurrentReconciles: config.GlobalAcceleratorMaxConcurrentReconciles,
124+
maxExponentialBackoffDelay: config.GlobalAcceleratorMaxExponentialBackoffDelay,
101125
}
102126
}
103127

@@ -108,11 +132,14 @@ type globalAcceleratorReconciler struct {
108132
finalizerManager k8s.FinalizerManager
109133
modelBuilder aga.ModelBuilder
110134
stackMarshaller deploy.StackMarshaller
135+
stackDeployer agadeploy.StackDeployer
136+
statusUpdater agastatus.StatusUpdater
111137
logger logr.Logger
112138
metricsCollector lbcmetrics.MetricCollector
113-
reconcileTracker func(namespaceName types.NamespacedName)
139+
reconcileTracker func(namespaceName ktypes.NamespacedName)
114140

115-
maxConcurrentReconciles int
141+
maxConcurrentReconciles int
142+
maxExponentialBackoffDelay time.Duration
116143
}
117144

118145
// +kubebuilder:rbac:groups=aga.k8s.aws,resources=globalaccelerators,verbs=get;list;watch;patch
@@ -155,26 +182,19 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAccelerator(ctx context.Con
155182
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorAddFinalizers, err, r.metricsCollector)
156183
}
157184

158-
// TODO: Implement GlobalAccelerator resource management
159-
// This would include:
160-
// 1. Creating/updating AWS Global Accelerator
161-
// 2. Managing listeners and endpoint groups
162-
// 3. Handling endpoint discovery from Services/Ingresses/Gateways
163185
reconcileResourceFn := func() {
164186
err = r.reconcileGlobalAcceleratorResources(ctx, ga)
165187
}
166188
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageReconcileGlobalAccelerator, reconcileResourceFn)
167189
if err != nil {
168190
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorReconcileGlobalAccelerator, err, r.metricsCollector)
169191
}
170-
171-
r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
172192
return nil
173193
}
174194

175195
func (r *globalAcceleratorReconciler) cleanupGlobalAccelerator(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
176196
if k8s.HasFinalizer(ga, shared_constants.GlobalAcceleratorFinalizer) {
177-
// TODO: Implement cleanup logic for AWS Global Accelerator resources
197+
// TODO: Implement cleanup logic for AWS Global Accelerator resources (Only cleaning up accelerator for now)
178198
if err := r.cleanupGlobalAcceleratorResources(ctx, ga); err != nil {
179199
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedCleanup, fmt.Sprintf("Failed cleanup due to %v", err))
180200
return err
@@ -203,7 +223,7 @@ func (r *globalAcceleratorReconciler) buildModel(ctx context.Context, ga *agaapi
203223
}
204224

205225
func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
206-
r.logger.Info("Reconciling GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)
226+
r.logger.Info("Reconciling GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
207227
var stack core.Stack
208228
var accelerator *agamodel.Accelerator
209229
var err error
@@ -212,25 +232,91 @@ func (r *globalAcceleratorReconciler) reconcileGlobalAcceleratorResources(ctx co
212232
}
213233
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageBuildModel, buildModelFn)
214234
if err != nil {
235+
// Update status to indicate model building failure
236+
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.ModelBuildFailed, fmt.Sprintf("Failed to build model: %v", err)); statusErr != nil {
237+
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after model build failure")
238+
}
215239
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorBuildModel, err, r.metricsCollector)
216240
}
217241

218-
// Log the built model for debugging
219-
r.logger.Info("Built model successfully", "accelerator", accelerator.ID(), "stackID", stack.StackID())
242+
// Deploy the stack to create/update AWS Global Accelerator resources
243+
deployStackFn := func() {
244+
err = r.stackDeployer.Deploy(ctx, stack, r.metricsCollector, controllerName)
245+
}
246+
r.metricsCollector.ObserveControllerReconcileLatency(controllerName, MetricStageDeployStack, deployStackFn)
247+
if err != nil {
248+
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedDeploy, fmt.Sprintf("Failed to deploy stack due to %v", err))
249+
250+
// Update status to indicate deployment failure
251+
if statusErr := r.statusUpdater.UpdateStatusFailure(ctx, ga, agadeploy.DeploymentFailed, fmt.Sprintf("Failed to deploy stack: %v", err)); statusErr != nil {
252+
r.logger.Error(statusErr, "Failed to update GlobalAccelerator status after deployment failure")
253+
}
254+
255+
return ctrlerrors.NewErrorWithMetrics(controllerName, MetricErrorDeployStack, err, r.metricsCollector)
256+
}
257+
258+
r.logger.Info("Successfully deployed GlobalAccelerator stack", "stackID", stack.StackID())
259+
260+
// Update GlobalAccelerator status after successful deployment
261+
requeueNeeded, err := r.statusUpdater.UpdateStatusSuccess(ctx, ga, accelerator)
262+
if err != nil {
263+
r.eventRecorder.Event(ga, corev1.EventTypeWarning, k8s.GlobalAcceleratorEventReasonFailedUpdateStatus, fmt.Sprintf("Failed update status due to %v", err))
264+
return err
265+
}
266+
if requeueNeeded {
267+
return ctrlerrors.NewRequeueNeededAfter(requeueMessage, statusUpdateRequeueTime)
268+
}
220269

221-
// TODO: Implement the deploy phase
222-
// This would include:
223-
// 1. Deploy the stack to create/update AWS Global Accelerator resources
224-
// 2. Update the GlobalAccelerator status with the created resources
225-
// 3. Handle any deployment errors and update status accordingly
270+
r.eventRecorder.Event(ga, corev1.EventTypeNormal, k8s.GlobalAcceleratorEventReasonSuccessfullyReconciled, "Successfully reconciled")
226271

227272
return nil
228273
}
229274

230275
func (r *globalAcceleratorReconciler) cleanupGlobalAcceleratorResources(ctx context.Context, ga *agaapi.GlobalAccelerator) error {
231-
// TODO: Implement the actual AWS Global Accelerator resource cleanup
232-
// This is a placeholder implementation
233-
r.logger.Info("Cleaning up GlobalAccelerator resources", "name", ga.Name, "namespace", ga.Namespace)
276+
r.logger.Info("Cleaning up GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
277+
278+
// TODO we will handle cleaning up dependent resources when we implement those
279+
// 1. Find the accelerator ARN from the CRD status
280+
if ga.Status.AcceleratorARN == nil {
281+
r.logger.Info("No accelerator ARN found in status, nothing to clean up", "globalAccelerator", k8s.NamespacedName(ga))
282+
return nil
283+
}
284+
285+
acceleratorARN := *ga.Status.AcceleratorARN
286+
if acceleratorARN == "" {
287+
r.logger.Info("Empty accelerator ARN in status, nothing to clean up", "globalAccelerator", k8s.NamespacedName(ga))
288+
return nil
289+
}
290+
291+
// 2. Delete the accelerator using accelerator delete manager
292+
acceleratorManager := r.stackDeployer.GetAcceleratorManager()
293+
r.logger.Info("Deleting accelerator", "acceleratorARN", acceleratorARN, "globalAccelerator", k8s.NamespacedName(ga))
294+
295+
// Initialize reference to existing accelerator for deletion
296+
acceleratorWithTags := agadeploy.AcceleratorWithTags{
297+
Accelerator: &types.Accelerator{
298+
AcceleratorArn: &acceleratorARN,
299+
},
300+
Tags: nil,
301+
}
302+
303+
if err := acceleratorManager.Delete(ctx, acceleratorWithTags); err != nil {
304+
// Check if it's an AcceleratorNotDisabledError
305+
var notDisabledErr *agadeploy.AcceleratorNotDisabledError
306+
if errors.As(err, &notDisabledErr) {
307+
// Update status to indicate we're waiting for the accelerator to be disabled
308+
if updateErr := r.statusUpdater.UpdateStatusDeletion(ctx, ga); updateErr != nil {
309+
r.logger.Error(updateErr, "Failed to update status during accelerator deletion")
310+
}
311+
return ctrlerrors.NewRequeueNeeded("Waiting for accelerator to be disabled")
312+
}
313+
314+
// Any other error
315+
r.logger.Error(err, "Failed to delete accelerator", "acceleratorARN", acceleratorARN, "globalAccelerator", k8s.NamespacedName(ga))
316+
return fmt.Errorf("failed to delete accelerator %s: %w", acceleratorARN, err)
317+
}
318+
319+
r.logger.Info("Successfully cleaned up all GlobalAccelerator resources", "globalAccelerator", k8s.NamespacedName(ga))
234320
return nil
235321
}
236322

@@ -259,6 +345,7 @@ func (r *globalAcceleratorReconciler) SetupWithManager(ctx context.Context, mgr
259345
Named(controllerName).
260346
WithOptions(controller.Options{
261347
MaxConcurrentReconciles: r.maxConcurrentReconciles,
348+
RateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[reconcile.Request](5*time.Second, r.maxExponentialBackoffDelay),
262349
}).
263350
Complete(r)
264351
}

main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ func main() {
236236
}
237237

238238
// Setup GlobalAccelerator controller only if enabled
239-
if controllerCFG.FeatureGates.Enabled(config.AGAController) {
239+
if shared_utils.IsAGAControllerEnabled(controllerCFG.FeatureGates, controllerCFG.AWSConfig.Region) {
240240
agaReconciler := agacontroller.NewGlobalAcceleratorReconciler(mgr.GetClient(), mgr.GetEventRecorderFor("globalAccelerator"),
241241
finalizerManager, controllerCFG, cloud, ctrl.Log.WithName("controllers").WithName("globalAccelerator"), lbcMetricsCollector, reconcileCounters)
242242
if err := agaReconciler.SetupWithManager(ctx, mgr, clientSet); err != nil {

0 commit comments

Comments
 (0)