1414package replication_group
1515
1616import (
17+ "context"
1718 "errors"
1819
20+ ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare"
1921 ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue"
22+ ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log"
23+ svcsdk "github.com/aws/aws-sdk-go/service/elasticache"
24+
25+ svcapitypes "github.com/aws-controllers-k8s/elasticache-controller/apis/v1alpha1"
2026)
2127
2228var (
29+ condMsgCurrentlyCreating string = "replication group currently being created."
2330 condMsgCurrentlyDeleting string = "replication group currently being deleted."
2431 condMsgNoDeleteWhileModifying string = "replication group currently being modified. cannot delete."
2532 condMsgTerminalCreateFailed string = "replication group in create-failed status."
2633)
2734
35+ const (
36+ statusDeleting string = "deleting"
37+ statusModifying string = "modifying"
38+ statusCreating string = "creating"
39+ statusCreateFailed string = "create-failed"
40+ )
41+
2842var (
2943 requeueWaitWhileDeleting = ackrequeue .NeededAfter (
3044 errors .New ("Delete is in progress." ),
3448 errors .New ("Modify is in progress." ),
3549 ackrequeue .DefaultRequeueAfterDuration ,
3650 )
51+ requeueWaitWhileTagUpdated = ackrequeue .NeededAfter (
52+ errors .New ("tags Update is in progress" ),
53+ ackrequeue .DefaultRequeueAfterDuration ,
54+ )
3755)
3856
3957// isDeleting returns true if supplied replication group resource state is 'deleting'
@@ -42,7 +60,7 @@ func isDeleting(r *resource) bool {
4260 return false
4361 }
4462 status := * r .ko .Status .Status
45- return status == "deleting"
63+ return status == statusDeleting
4664}
4765
4866// isModifying returns true if supplied replication group resource state is 'modifying'
@@ -51,7 +69,16 @@ func isModifying(r *resource) bool {
5169 return false
5270 }
5371 status := * r .ko .Status .Status
54- return status == "modifying"
72+ return status == statusModifying
73+ }
74+
75+ // isCreating returns true if supplied replication group resource state is 'modifying'
76+ func isCreating (r * resource ) bool {
77+ if r == nil || r .ko .Status .Status == nil {
78+ return false
79+ }
80+ status := * r .ko .Status .Status
81+ return status == statusCreating
5582}
5683
5784// isCreateFailed returns true if supplied replication group resource state is
@@ -61,5 +88,122 @@ func isCreateFailed(r *resource) bool {
6188 return false
6289 }
6390 status := * r .ko .Status .Status
64- return status == "create-failed"
91+ return status == statusCreateFailed
92+ }
93+
94+ // getTags retrieves the resource's associated tags
95+ func (rm * resourceManager ) getTags (
96+ ctx context.Context ,
97+ resourceARN string ,
98+ ) ([]* svcapitypes.Tag , error ) {
99+ resp , err := rm .sdkapi .ListTagsForResourceWithContext (
100+ ctx ,
101+ & svcsdk.ListTagsForResourceInput {
102+ ResourceName : & resourceARN ,
103+ },
104+ )
105+ rm .metrics .RecordAPICall ("GET" , "ListTagsForResource" , err )
106+ if err != nil {
107+ return nil , err
108+ }
109+ tags := make ([]* svcapitypes.Tag , 0 , len (resp .TagList ))
110+ for _ , tag := range resp .TagList {
111+ tags = append (tags , & svcapitypes.Tag {
112+ Key : tag .Key ,
113+ Value : tag .Value ,
114+ })
115+ }
116+ return tags , nil
117+ }
118+
119+ // syncTags keeps the resource's tags in sync
120+ //
121+ // NOTE(jaypipes): Elasticache's Tagging APIs differ from other AWS APIs in the
122+ // following ways:
123+ //
124+ // 1. The names of the tagging API operations are different. Other APIs use the
125+ // Tagris `ListTagsForResource`, `TagResource` and `UntagResource` API
126+ // calls. RDS uses `ListTagsForResource`, `AddTagsToResource` and
127+ // `RemoveTagsFromResource`.
128+ //
129+ // 2. Even though the name of the `ListTagsForResource` API call is the same,
130+ // the structure of the input and the output are different from other APIs.
131+ // For the input, instead of a `ResourceArn` field, Elasticache names the field
132+ // `ResourceName`, but actually expects an ARN, not the replication group
133+ // name. This is the same for the `AddTagsToResource` and
134+ // `RemoveTagsFromResource` input shapes. For the output shape, the field is
135+ // called `TagList` instead of `Tags` but is otherwise the same struct with
136+ // a `Key` and `Value` member field.
137+ func (rm * resourceManager ) syncTags (
138+ ctx context.Context ,
139+ desired * resource ,
140+ latest * resource ,
141+ ) (err error ) {
142+ rlog := ackrtlog .FromContext (ctx )
143+ exit := rlog .Trace ("rm.syncTags" )
144+ defer func () { exit (err ) }()
145+
146+ arn := (* string )(latest .ko .Status .ACKResourceMetadata .ARN )
147+
148+ from := ToACKTags (latest .ko .Spec .Tags )
149+ to := ToACKTags (desired .ko .Spec .Tags )
150+
151+ added , _ , removed := ackcompare .GetTagsDifference (from , to )
152+
153+ // NOTE(jaypipes): According to the elasticache API documentation, adding a tag
154+ // with a new value overwrites any existing tag with the same key. So, we
155+ // don't need to do anything to "update" a Tag. Simply including it in the
156+ // AddTagsToResource call is enough.
157+ for key := range removed {
158+ if _ , ok := added [key ]; ok {
159+ delete (removed , key )
160+ }
161+ }
162+
163+ // Modify tags causing the replication group to be modified and become unavailable temporarily
164+ // so after adding or removing tags, we have to wait for the replication group to be available again
165+ // process: add tags -> requeue -> remove tags -> requeue -> other update
166+ if len (added ) > 0 {
167+ toAdd := make ([]* svcsdk.Tag , 0 , len (added ))
168+ for key , val := range added {
169+ key , val := key , val
170+ toAdd = append (toAdd , & svcsdk.Tag {
171+ Key : & key ,
172+ Value : & val ,
173+ })
174+ }
175+
176+ rlog .Debug ("adding tags to replication group" , "tags" , added )
177+ _ , err = rm .sdkapi .AddTagsToResourceWithContext (
178+ ctx ,
179+ & svcsdk.AddTagsToResourceInput {
180+ ResourceName : arn ,
181+ Tags : toAdd ,
182+ },
183+ )
184+ rm .metrics .RecordAPICall ("UPDATE" , "AddTagsToResource" , err )
185+ if err != nil {
186+ return err
187+ }
188+ } else if len (removed ) > 0 {
189+ toRemove := make ([]* string , 0 , len (removed ))
190+ for key := range removed {
191+ key := key
192+ toRemove = append (toRemove , & key )
193+ }
194+ rlog .Debug ("removing tags from replication group" , "tags" , removed )
195+ _ , err = rm .sdkapi .RemoveTagsFromResourceWithContext (
196+ ctx ,
197+ & svcsdk.RemoveTagsFromResourceInput {
198+ ResourceName : arn ,
199+ TagKeys : toRemove ,
200+ },
201+ )
202+ rm .metrics .RecordAPICall ("UPDATE" , "RemoveTagsFromResource" , err )
203+ if err != nil {
204+ return err
205+ }
206+ }
207+
208+ return requeueWaitWhileTagUpdated
65209}
0 commit comments