Skip to content

Commit 8d854fd

Browse files
committed
Asynchronous rollback fixes for Scaling
Handle scaling in/out, Scaling up/down rollback fixes.
1 parent 26a1c9a commit 8d854fd

File tree

12 files changed

+389
-43
lines changed

12 files changed

+389
-43
lines changed

pkg/resource/replication_group/annotations.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,14 @@ const (
2222
// LogDeliveryConfigurationRequest structs passed in as input to either the create or modify API called most
2323
// recently
2424
AnnotationLastRequestedLDCs = svcapitypes.AnnotationPrefix + "last-requested-log-delivery-configurations"
25+
// AnnotationLastRequestedCNT is an annotation whose value is the CacheNodeType passed in as input to either the create or modify API
26+
// called most recently
27+
AnnotationLastRequestedCNT = svcapitypes.AnnotationPrefix + "last-requested-cache-node-type"
28+
// AnnotationLastRequestedNNG is an annotation whose value is the NumNodeGroups passed in as input to either the create or modify API
29+
// called most recently
30+
AnnotationLastRequestedNNG = svcapitypes.AnnotationPrefix + "last-requested-num-node-groups"
31+
// AnnotationLastRequestedNGC is an annotation whose value is the marshaled list of pointers to
32+
// NodeGroupConfiguration structs passed in as input to either the create or modify API called most
33+
// recently
34+
AnnotationLastRequestedNGC = svcapitypes.AnnotationPrefix + "last-requested-num-group-configuration"
2535
)

pkg/resource/replication_group/custom_set_output.go

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package replication_group
1616
import (
1717
"context"
1818
"encoding/json"
19+
"strconv"
1920

2021
svcapitypes "github.com/aws-controllers-k8s/elasticache-controller/apis/v1alpha1"
2122
ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1"
@@ -57,6 +58,8 @@ func (rm *resourceManager) CustomCreateReplicationGroupSetOutput(
5758
) (*svcapitypes.ReplicationGroup, error) {
5859
rm.customSetOutput(resp.ReplicationGroup, ko)
5960
rm.setAnnotationsFields(r, ko)
61+
rm.setLastRequestedNodeGroupConfiguration(r, ko)
62+
rm.setLastRequestedNumNodeGroups(r, ko)
6063
return ko, nil
6164
}
6265

@@ -270,20 +273,36 @@ func (rm *resourceManager) provideEvents(
270273

271274
// setAnnotationsFields copies the desired object's annotations, populates any
272275
// relevant fields, and sets the latest object's annotations to this newly populated map.
276+
// Fields that are handled by custom modify implementation are not set here.
273277
// This should only be called upon a successful create or modify call.
274278
func (rm *resourceManager) setAnnotationsFields(
275279
r *resource,
276280
ko *svcapitypes.ReplicationGroup,
277281
) {
282+
annotations := getAnnotationsFields(r, ko)
283+
284+
rm.setLastRequestedLogDeliveryConfigurations(r, annotations)
285+
rm.setLastRequestedCacheNodeType(r, annotations)
286+
ko.ObjectMeta.Annotations = annotations
287+
}
288+
289+
// getAnnotationsFields return the annotations map that would be used to set the fields
290+
func getAnnotationsFields(
291+
r *resource,
292+
ko *svcapitypes.ReplicationGroup) map[string]string {
293+
294+
if ko.ObjectMeta.Annotations != nil {
295+
return ko.ObjectMeta.Annotations
296+
}
297+
278298
desiredAnnotations := r.ko.ObjectMeta.GetAnnotations()
279299
annotations := make(map[string]string)
280300
for k, v := range desiredAnnotations {
281301
annotations[k] = v
282302
}
283303

284-
rm.setLastRequestedLogDeliveryConfigurations(r, annotations)
285-
286304
ko.ObjectMeta.Annotations = annotations
305+
return annotations
287306
}
288307

289308
// setLastRequestedLogDeliveryConfigurations copies desired.Spec.LogDeliveryConfigurations
@@ -300,3 +319,43 @@ func (rm *resourceManager) setLastRequestedLogDeliveryConfigurations(
300319
annotations[AnnotationLastRequestedLDCs] = string(lastRequestedConfigs)
301320
}
302321
}
322+
323+
// setLastRequestedCacheNodeType copies desired.Spec.CacheNodeType into the annotation
324+
// of the object.
325+
func (rm *resourceManager) setLastRequestedCacheNodeType(
326+
r *resource,
327+
annotations map[string]string,
328+
) {
329+
if r.ko.Spec.CacheNodeType != nil {
330+
annotations[AnnotationLastRequestedCNT] = *r.ko.Spec.CacheNodeType
331+
}
332+
}
333+
334+
// setLastRequestedNodeGroupConfiguration copies desired.spec.NodeGroupConfiguration into the
335+
// annotation of the object
336+
func (rm *resourceManager) setLastRequestedNodeGroupConfiguration(
337+
r *resource,
338+
ko *svcapitypes.ReplicationGroup,
339+
) {
340+
annotations := getAnnotationsFields(r, ko)
341+
lastRequestedConfigs, err := json.Marshal(r.ko.Spec.NodeGroupConfiguration)
342+
if err != nil {
343+
annotations[AnnotationLastRequestedNGC] = "null"
344+
} else {
345+
annotations[AnnotationLastRequestedNGC] = string(lastRequestedConfigs)
346+
}
347+
}
348+
349+
// setLastRequestedNumNodeGroups copies desired.spec.NumNodeGroups into the
350+
// annotation of the object
351+
func (rm *resourceManager) setLastRequestedNumNodeGroups(
352+
r *resource,
353+
ko *svcapitypes.ReplicationGroup,
354+
) {
355+
annotations := getAnnotationsFields(r, ko)
356+
if r.ko.Spec.NumNodeGroups != nil {
357+
annotations[AnnotationLastRequestedNNG] = strconv.Itoa(int(*r.ko.Spec.NumNodeGroups))
358+
} else {
359+
annotations[AnnotationLastRequestedNNG] = "null"
360+
}
361+
}

pkg/resource/replication_group/custom_update_api.go

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@ package replication_group
1515

1616
import (
1717
"context"
18+
"encoding/json"
1819
"fmt"
1920
"github.com/aws-controllers-k8s/runtime/pkg/requeue"
2021
"github.com/aws/aws-sdk-go/aws/awserr"
2122
"github.com/pkg/errors"
23+
"reflect"
2224
"sort"
25+
"strconv"
2326

2427
svcapitypes "github.com/aws-controllers-k8s/elasticache-controller/apis/v1alpha1"
2528
ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
@@ -70,6 +73,28 @@ func (rm *resourceManager) CustomModifyReplicationGroup(
7073
requeue.DefaultRequeueAfterDuration)
7174
}
7275

76+
// Handle the asynchronous rollback case while scaling down.
77+
// This means that we have already attempted to apply the CacheNodeType once and
78+
// were not successful hence we will set a terminal condition.
79+
if !cacheNodeTypeRequiresUpdate(desired) && delta.DifferentAt("Spec.CacheNodeType") {
80+
return nil, awserr.New("InvalidParameterCombination", "Cannot update CacheNodeType, "+
81+
"Please refer to Events for more details", nil)
82+
83+
}
84+
85+
// Handle the asynchronous rollback for Resharding.
86+
if !nodeGroupRequiresUpdate(desired) && rm.shardConfigurationsDiffer(desired, latest) {
87+
88+
return nil, awserr.New("InvalidParameterCombination", "Cannot update NodeGroups, "+
89+
"Please refer to Events for more details", nil)
90+
}
91+
92+
// Handle NodeGroupConfiguration asynchronous rollback situations other than Resharding.
93+
if !nodeGroupRequiresUpdate(desired) && (rm.replicaCountDifference(desired, latest) != 0 && !delta.DifferentAt("Spec.ReplicasPerNodeGroup")) {
94+
return nil, awserr.New("InvalidParameterCombination", "Cannot update NodeGroupConfiguration, "+
95+
"Please refer to Events for more details", nil)
96+
}
97+
7398
// Order of operations when diffs map to multiple updates APIs:
7499
// 1. When automaticFailoverEnabled differs:
75100
// if automaticFailoverEnabled == false; do nothing in this custom logic, let the modify execute first.
@@ -316,7 +341,18 @@ func (rm *resourceManager) updateShardConfiguration(
316341
rm.log.V(1).Info("Error during ModifyReplicationGroupShardConfiguration", "error", respErr)
317342
return nil, respErr
318343
}
319-
return rm.setReplicationGroupOutput(desired, resp.ReplicationGroup)
344+
345+
r, err := rm.setReplicationGroupOutput(desired, resp.ReplicationGroup)
346+
347+
if err != nil {
348+
return r, err
349+
}
350+
351+
ko := r.ko.DeepCopy()
352+
// Update the annotations since API call was successful
353+
rm.setLastRequestedNodeGroupConfiguration(desired, ko)
354+
rm.setLastRequestedNumNodeGroups(desired, ko)
355+
return &resource{ko}, nil
320356
}
321357

322358
// newIncreaseReplicaCountRequestPayload returns an SDK-specific struct for the HTTP request
@@ -691,3 +727,44 @@ func (rm *resourceManager) newModifyReplicationGroupRequestPayload(
691727

692728
return input
693729
}
730+
731+
// cacheNodeTypeRequiresUpdate retrieves the last requested cacheNodeType saved in annotations and compares them
732+
// to the current desired cacheNodeType
733+
func cacheNodeTypeRequiresUpdate(desired *resource) bool {
734+
annotations := desired.ko.ObjectMeta.GetAnnotations()
735+
if val, ok := annotations[AnnotationLastRequestedCNT]; ok && desired.ko.Spec.CacheNodeType != nil {
736+
return val != *desired.ko.Spec.CacheNodeType
737+
}
738+
739+
// This means there is delta and no value in annotation or in Spec
740+
return true
741+
}
742+
743+
// nodeGroupRequiresUpdate retrieves the last applied NumNodeGroups and NodeGroupConfiguration and compares them
744+
// to the current desired NumNodeGroups and NodeGroupConfiguration
745+
func nodeGroupRequiresUpdate(desired *resource) bool {
746+
annotations := desired.ko.ObjectMeta.GetAnnotations()
747+
748+
if val, ok := annotations[AnnotationLastRequestedNNG]; ok && val != "null" {
749+
numNodes, err := strconv.ParseInt(val, 10, 64)
750+
751+
if err != nil {
752+
return false
753+
}
754+
755+
if numNodes != *desired.ko.Spec.NumNodeGroups {
756+
return true
757+
}
758+
759+
return false
760+
}
761+
762+
desiredNodeGroupConfig := desired.ko.Spec.NodeGroupConfiguration
763+
if val, ok := annotations[AnnotationLastRequestedNGC]; ok && val != "null" {
764+
_ = json.Unmarshal([]byte(val), &desiredNodeGroupConfig)
765+
return !reflect.DeepEqual(desiredNodeGroupConfig, val)
766+
}
767+
768+
// This means there is delta and no value in annotation or in Spec
769+
return true
770+
}

pkg/resource/replication_group/custom_update_api_test.go

Lines changed: 74 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"go.uber.org/zap/zapcore"
2626
"path/filepath"
2727
ctrlrtzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
28+
"strconv"
2829
"testing"
2930

3031
"github.com/stretchr/testify/assert"
@@ -290,8 +291,7 @@ func TestCustomModifyReplicationGroup(t *testing.T) {
290291
desired := provideResource()
291292
latest := provideResource()
292293
var delta ackcompare.Delta
293-
var ctx context.Context
294-
res, err := rm.CustomModifyReplicationGroup(ctx, desired, latest, &delta)
294+
res, err := rm.CustomModifyReplicationGroup(context.TODO(), desired, latest, &delta)
295295
assert.Nil(res)
296296
assert.Nil(err)
297297
})
@@ -306,8 +306,7 @@ func TestCustomModifyReplicationGroup_Unavailable(t *testing.T) {
306306
desired := provideResource()
307307
latest := provideResourceWithStatus("modifying")
308308
var delta ackcompare.Delta
309-
var ctx context.Context
310-
res, err := rm.CustomModifyReplicationGroup(ctx, desired, latest, &delta)
309+
res, err := rm.CustomModifyReplicationGroup(context.TODO(), desired, latest, &delta)
311310
assert.Nil(res)
312311
assert.NotNil(err)
313312
var requeueNeededAfter *requeue.RequeueNeededAfter
@@ -329,8 +328,7 @@ func TestCustomModifyReplicationGroup_NodeGroup_Unvailable(t *testing.T) {
329328
nodeGroup.Status = &unavailableStatus
330329
}
331330
var delta ackcompare.Delta
332-
var ctx context.Context
333-
res, err := rm.CustomModifyReplicationGroup(ctx, desired, latest, &delta)
331+
res, err := rm.CustomModifyReplicationGroup(context.TODO(), desired, latest, &delta)
334332
assert.Nil(res)
335333
assert.NotNil(err)
336334
var requeueNeededAfter *requeue.RequeueNeededAfter
@@ -357,9 +355,8 @@ func TestCustomModifyReplicationGroup_NodeGroup_MemberClusters_mismatch(t *testi
357355
nodeGroup.Status = &availableStatus
358356
}
359357
var delta ackcompare.Delta
360-
var ctx context.Context
361358
require.NotNil(latest.ko.Status.MemberClusters)
362-
res, err := rm.CustomModifyReplicationGroup(ctx, desired, latest, &delta)
359+
res, err := rm.CustomModifyReplicationGroup(context.TODO(), desired, latest, &delta)
363360
assert.Nil(res)
364361
assert.NotNil(err) // due to surplus member cluster
365362
var requeueNeededAfter *requeue.RequeueNeededAfter
@@ -384,14 +381,61 @@ func TestCustomModifyReplicationGroup_NodeGroup_available(t *testing.T) {
384381
nodeGroup.Status = &availableStatus
385382
}
386383
var delta ackcompare.Delta
387-
var ctx context.Context
388384
require.NotNil(latest.ko.Status.MemberClusters)
389-
res, err := rm.CustomModifyReplicationGroup(ctx, desired, latest, &delta)
385+
res, err := rm.CustomModifyReplicationGroup(context.TODO(), desired, latest, &delta)
390386
assert.Nil(res)
391387
assert.Nil(err)
392388
})
393389
}
394390

391+
func TestCustomModifyReplicationGroup_Scaling_Async_Rollback(t *testing.T) {
392+
assert := assert.New(t)
393+
t.Run("ScaleDownRollback=Diff", func(t *testing.T) {
394+
desired := provideResource()
395+
latest := provideResource()
396+
rgId := "RGID"
397+
desired.ko.Spec.ReplicationGroupID = &rgId
398+
latest.ko.Spec.ReplicationGroupID = &rgId
399+
desired.ko.ObjectMeta.Annotations = make(map[string]string)
400+
desiredCacheNodeType := "cache.t3.micro"
401+
currentCacheNodeType := "cache.m5.large"
402+
desired.ko.Annotations[AnnotationLastRequestedCNT] = desiredCacheNodeType
403+
desired.ko.Spec.CacheNodeType = &desiredCacheNodeType
404+
405+
rm := provideResourceManager()
406+
407+
var delta ackcompare.Delta
408+
delta.Add("Spec.CacheNodeType", currentCacheNodeType, desiredCacheNodeType)
409+
410+
res, err := rm.CustomModifyReplicationGroup(context.TODO(), desired, latest, &delta)
411+
assert.Nil(res)
412+
assert.NotNil(err)
413+
assert.Equal("InvalidParameterCombination: Cannot update CacheNodeType, Please refer to Events for more details", err.Error())
414+
})
415+
416+
t.Run("ScaleInRollback=Diff", func(t *testing.T) {
417+
desired := provideResource()
418+
latest := provideResource()
419+
rgId := "RGID"
420+
desired.ko.Spec.ReplicationGroupID = &rgId
421+
latest.ko.Spec.ReplicationGroupID = &rgId
422+
desired.ko.ObjectMeta.Annotations = make(map[string]string)
423+
424+
desiredNodeGroup := int64(4)
425+
currentNodeGroup := int64(3)
426+
desired.ko.Annotations[AnnotationLastRequestedNNG] = strconv.Itoa(int(desiredNodeGroup))
427+
desired.ko.Spec.NumNodeGroups = &desiredNodeGroup
428+
rm := provideResourceManager()
429+
430+
var delta ackcompare.Delta
431+
delta.Add("Spec.NumNodeGroups", currentNodeGroup, desiredNodeGroup)
432+
433+
res, err := rm.CustomModifyReplicationGroup(context.TODO(), desired, latest, &delta)
434+
assert.Nil(res)
435+
assert.NotNil(err)
436+
assert.Equal("InvalidParameterCombination: Cannot update NodeGroups, Please refer to Events for more details", err.Error())
437+
})
438+
}
395439
func TestCustomModifyReplicationGroup_ScaleUpAndDown_And_Resharding(t *testing.T) {
396440
assert := assert.New(t)
397441

@@ -402,16 +446,26 @@ func TestCustomModifyReplicationGroup_ScaleUpAndDown_And_Resharding(t *testing.T
402446
rgId := "RGID"
403447
desired.ko.Spec.ReplicationGroupID = &rgId
404448
latest.ko.Spec.ReplicationGroupID = &rgId
449+
desired.ko.ObjectMeta.Annotations = make(map[string]string)
450+
desiredCacheNodeType := "cache.m5.large"
451+
currentCacheNodeType := "cache.t3.small"
452+
desired.ko.Annotations[AnnotationLastRequestedCNT] = currentCacheNodeType
453+
desired.ko.Spec.CacheNodeType = &desiredCacheNodeType
405454

455+
desiredNodeGroup := int64(4)
456+
currentNodeGroup := int64(3)
457+
desired.ko.Annotations[AnnotationLastRequestedNNG] = strconv.Itoa(int(currentNodeGroup))
458+
desired.ko.Spec.NumNodeGroups = &desiredNodeGroup
459+
allowedNodeModifications := []*string{&desiredCacheNodeType}
460+
desired.ko.Status.AllowedScaleUpModifications = allowedNodeModifications
406461
mocksdkapi := &mocksvcsdkapi.ElastiCacheAPI{}
407462
rm := provideResourceManagerWithMockSDKAPI(mocksdkapi)
408463

409464
var delta ackcompare.Delta
410-
delta.Add("Spec.CacheNodeType", "cache.t3.small", "cache.m5.large")
411-
delta.Add("Spec.NumNodeGroups", 3, 4)
465+
delta.Add("Spec.CacheNodeType", currentCacheNodeType, desiredCacheNodeType)
466+
delta.Add("Spec.NumNodeGroups", currentNodeGroup, desiredNodeGroup)
412467

413-
var ctx context.Context
414-
res, err := rm.CustomModifyReplicationGroup(ctx, desired, latest, &delta)
468+
res, err := rm.CustomModifyReplicationGroup(context.TODO(), desired, latest, &delta)
415469
assert.Nil(res)
416470
assert.Nil(err)
417471
assert.Empty(mocksdkapi.Calls)
@@ -429,15 +483,19 @@ func TestCustomModifyReplicationGroup_ScaleUpAndDown_And_Resharding(t *testing.T
429483

430484
var delta ackcompare.Delta
431485
delta.Add("Spec.CacheNodeType", "cache.t3.small", "cache.t3.micro")
486+
desired.ko.ObjectMeta.Annotations = make(map[string]string)
487+
cacheNodeType := "cache.t3.small"
488+
desired.ko.Annotations[AnnotationLastRequestedCNT] = "cache.t3.micro"
489+
desired.ko.Spec.CacheNodeType = &cacheNodeType
432490
oldshardCount := int64(4)
433491
newShardCount := int64(10)
434492
delta.Add("Spec.NumNodeGroups", oldshardCount, newShardCount)
435493
desired.ko.Spec.NumNodeGroups = &newShardCount
436494
latest.ko.Spec.NumNodeGroups = &oldshardCount
495+
desired.ko.Annotations[AnnotationLastRequestedNNG] = strconv.Itoa(int(oldshardCount))
437496
mocksdkapi.On("ModifyReplicationGroupShardConfigurationWithContext", mock.Anything, mock.Anything).Return(nil,
438497
awserr.New("Invalid", "Invalid error", nil))
439-
var ctx context.Context
440-
res, err := rm.CustomModifyReplicationGroup(ctx, desired, latest, &delta)
498+
res, err := rm.CustomModifyReplicationGroup(context.TODO(), desired, latest, &delta)
441499
assert.Nil(res)
442500
assert.NotNil(err)
443501
assert.NotEmpty(mocksdkapi.Calls)

0 commit comments

Comments
 (0)