@@ -208,10 +208,26 @@ func (rm *resourceManager) customUpdateTable(
208208 }
209209 }
210210
211+ addedGSIs , updatedGSIs , removedGSIs := computeGlobalSecondaryIndexDelta (
212+ latest .ko .Spec .GlobalSecondaryIndexes ,
213+ desired .ko .Spec .GlobalSecondaryIndexes ,
214+ )
215+
216+ // Delete GSIs that have been removed first to avoid errors when updating table properties
217+ // where required values have not been set for removed GSIs.
218+ if delta .DifferentAt ("Spec.GlobalSecondaryIndexes" ) && len (removedGSIs ) > 0 {
219+ err = rm .deleteGSIs (ctx , desired , latest , removedGSIs )
220+ if err != nil {
221+ return nil , err
222+ }
223+ }
224+
225+ // If billing mode changing from PAY_PER_REQUEST to PROVISIONED, need to include all GSI updates. Otherwise,
226+ // need to perform GSI updates one at a time afterwards.
211227 if delta .DifferentAt ("Spec.BillingMode" ) ||
212228 delta .DifferentAt ("Spec.TableClass" ) || delta .DifferentAt ("Spec.DeletionProtectionEnabled" ) {
213- if err := rm .syncTable (ctx , desired , delta ); err != nil {
214- return nil , fmt . Errorf ( "cannot update table %v" , err )
229+ if err := rm .syncTable (ctx , desired , latest , delta ); err != nil {
230+ return nil , err
215231 }
216232 }
217233
@@ -229,25 +245,29 @@ func (rm *resourceManager) customUpdateTable(
229245 }
230246 }
231247
248+ // Update any GSIs that have been modified.
249+ if delta .DifferentAt ("Spec.GlobalSecondaryIndexes" ) && len (updatedGSIs ) > 0 {
250+ if err := rm .updateGSIs (ctx , desired , latest , updatedGSIs ); err != nil {
251+ return nil , err
252+ }
253+ }
254+
232255 // We want to update fast fields first
233256 // Then attributes
234257 // then GSI
235258 if delta .DifferentExcept ("Spec.Tags" , "Spec.TimeToLive" ) {
236259 switch {
237260 case delta .DifferentAt ("Spec.StreamSpecification" ):
238- if err := rm .syncTable (ctx , desired , delta ); err != nil {
261+ if err := rm .syncTable (ctx , desired , latest , delta ); err != nil {
239262 return nil , err
240263 }
241264 case delta .DifferentAt ("Spec.ProvisionedThroughput" ):
242265 if err := rm .syncTableProvisionedThroughput (ctx , desired ); err != nil {
243266 return nil , err
244267 }
245- case delta .DifferentAt ("Spec.GlobalSecondaryIndexes" ):
246- if err := rm .syncTableGlobalSecondaryIndexes (ctx , latest , desired ); err != nil {
247- if awsErr , ok := ackerr .AWSError (err ); ok &&
248- awsErr .ErrorCode () == "LimitExceededException" {
249- return nil , requeueWaitGSIReady
250- }
268+ // Create any new GSIs once all existing GSI have been updated.
269+ case delta .DifferentAt ("Spec.GlobalSecondaryIndexes" ) && len (addedGSIs ) > 0 :
270+ if err := rm .addGSIs (ctx , desired , latest , addedGSIs ); err != nil {
251271 return nil , err
252272 }
253273 case delta .DifferentAt ("Spec.TableReplicas" ):
@@ -271,74 +291,105 @@ func (rm *resourceManager) customUpdateTable(
271291// or SSE specification.
272292func (rm * resourceManager ) syncTable (
273293 ctx context.Context ,
274- r * resource ,
294+ desired * resource ,
295+ latest * resource ,
275296 delta * ackcompare.Delta ,
276297) (err error ) {
277298 rlog := ackrtlog .FromContext (ctx )
278299 exit := rlog .Trace ("rm.syncTable" )
279300 defer exit (err )
280301
281- input , err := rm .newUpdateTablePayload (ctx , r , delta )
302+ input , err := rm .newUpdateTablePayload (ctx , desired , latest , delta )
282303 if err != nil {
283304 return err
284305 }
285306 _ , err = rm .sdkapi .UpdateTable (ctx , input )
286307 rm .metrics .RecordAPICall ("UPDATE" , "UpdateTable" , err )
287308 if err != nil {
288- return err
309+ return fmt . Errorf ( "cannot update table %v" , err )
289310 }
311+ // If GSI update were included in the table update we need to requeue.
312+ if len (input .GlobalSecondaryIndexUpdates ) > 0 {
313+ return requeueWaitGSIReady
314+ }
315+
290316 return nil
291317}
292318
293319// newUpdateTablePayload constructs the updateTableInput object.
294320func (rm * resourceManager ) newUpdateTablePayload (
295321 ctx context.Context ,
296- r * resource ,
322+ desired * resource ,
323+ latest * resource ,
297324 delta * ackcompare.Delta ,
298325) (* svcsdk.UpdateTableInput , error ) {
299326 input := & svcsdk.UpdateTableInput {
300- TableName : aws .String (* r .ko .Spec .TableName ),
327+ TableName : aws .String (* desired .ko .Spec .TableName ),
301328 }
302-
329+ latestBillingMode := svcsdktypes . BillingMode ( * latest . ko . Spec . BillingMode )
303330 if delta .DifferentAt ("Spec.BillingMode" ) {
304- if r .ko .Spec .BillingMode != nil {
305- input .BillingMode = svcsdktypes .BillingMode (* r .ko .Spec .BillingMode )
331+ if desired .ko .Spec .BillingMode != nil {
332+ input .BillingMode = svcsdktypes .BillingMode (* desired .ko .Spec .BillingMode )
306333 } else {
307334 // set biling mode to the default value `PROVISIONED`
308335 input .BillingMode = svcsdktypes .BillingModeProvisioned
309336 }
337+
310338 if input .BillingMode == svcsdktypes .BillingModeProvisioned {
311339 input .ProvisionedThroughput = & svcsdktypes.ProvisionedThroughput {}
312- if r .ko .Spec .ProvisionedThroughput != nil {
313- if r .ko .Spec .ProvisionedThroughput .ReadCapacityUnits != nil {
340+ if desired .ko .Spec .ProvisionedThroughput != nil {
341+ if desired .ko .Spec .ProvisionedThroughput .ReadCapacityUnits != nil {
314342 input .ProvisionedThroughput .ReadCapacityUnits = aws .Int64 (
315- * r .ko .Spec .ProvisionedThroughput .ReadCapacityUnits ,
343+ * desired .ko .Spec .ProvisionedThroughput .ReadCapacityUnits ,
316344 )
317345 } else {
318346 input .ProvisionedThroughput .ReadCapacityUnits = aws .Int64 (0 )
319347 }
320348
321- if r .ko .Spec .ProvisionedThroughput .WriteCapacityUnits != nil {
349+ if desired .ko .Spec .ProvisionedThroughput .WriteCapacityUnits != nil {
322350 input .ProvisionedThroughput .WriteCapacityUnits = aws .Int64 (
323- * r .ko .Spec .ProvisionedThroughput .WriteCapacityUnits ,
351+ * desired .ko .Spec .ProvisionedThroughput .WriteCapacityUnits ,
324352 )
325353 } else {
326354 input .ProvisionedThroughput .WriteCapacityUnits = aws .Int64 (0 )
327355 }
328356 }
329357 }
358+
359+ // If billing mode is changing from PAY_PER_REQUEST to PROVISIONED we need to include all GSI updates
360+ if latestBillingMode == svcsdktypes .BillingModePayPerRequest && input .BillingMode == svcsdktypes .BillingModeProvisioned {
361+ _ , updatedGSIs , _ := computeGlobalSecondaryIndexDelta (
362+ latest .ko .Spec .GlobalSecondaryIndexes ,
363+ desired .ko .Spec .GlobalSecondaryIndexes ,
364+ )
365+
366+ // DynamoDB API fails if GSI updates are empty. Only set GlobalSecondaryIndexUpdates
367+ // if there are GSIs to update.
368+ if len (updatedGSIs ) > 0 {
369+ input .GlobalSecondaryIndexUpdates = []svcsdktypes.GlobalSecondaryIndexUpdate {}
370+ for _ , updatedGSI := range updatedGSIs {
371+ update := svcsdktypes.GlobalSecondaryIndexUpdate {
372+ Update : & svcsdktypes.UpdateGlobalSecondaryIndexAction {
373+ IndexName : aws .String (* updatedGSI .IndexName ),
374+ ProvisionedThroughput : newSDKProvisionedThroughput (updatedGSI .ProvisionedThroughput ),
375+ },
376+ }
377+ input .GlobalSecondaryIndexUpdates = append (input .GlobalSecondaryIndexUpdates , update )
378+ }
379+ }
380+ }
330381 }
331382 if delta .DifferentAt ("Spec.StreamSpecification" ) {
332- if r .ko .Spec .StreamSpecification != nil {
333- if r .ko .Spec .StreamSpecification .StreamEnabled != nil {
383+ if desired .ko .Spec .StreamSpecification != nil {
384+ if desired .ko .Spec .StreamSpecification .StreamEnabled != nil {
334385 input .StreamSpecification = & svcsdktypes.StreamSpecification {
335- StreamEnabled : aws .Bool (* r .ko .Spec .StreamSpecification .StreamEnabled ),
386+ StreamEnabled : aws .Bool (* desired .ko .Spec .StreamSpecification .StreamEnabled ),
336387 }
337388 // Only set streamViewType when streamSpefication is enabled and streamViewType is non-nil.
338- if * r .ko .Spec .StreamSpecification .StreamEnabled &&
339- r .ko .Spec .StreamSpecification .StreamViewType != nil {
389+ if * desired .ko .Spec .StreamSpecification .StreamEnabled &&
390+ desired .ko .Spec .StreamSpecification .StreamViewType != nil {
340391 input .StreamSpecification .StreamViewType = svcsdktypes .StreamViewType (
341- * r .ko .Spec .StreamSpecification .StreamViewType ,
392+ * desired .ko .Spec .StreamSpecification .StreamViewType ,
342393 )
343394 }
344395 } else {
@@ -349,13 +400,13 @@ func (rm *resourceManager) newUpdateTablePayload(
349400 }
350401 }
351402 if delta .DifferentAt ("Spec.TableClass" ) {
352- if r .ko .Spec .TableClass != nil {
353- input .TableClass = svcsdktypes .TableClass (* r .ko .Spec .TableClass )
403+ if desired .ko .Spec .TableClass != nil {
404+ input .TableClass = svcsdktypes .TableClass (* desired .ko .Spec .TableClass )
354405 }
355406 }
356407
357408 if delta .DifferentAt ("Spec.DeletionProtectionEnabled" ) {
358- input .DeletionProtectionEnabled = aws .Bool (* r .ko .Spec .DeletionProtectionEnabled )
409+ input .DeletionProtectionEnabled = aws .Bool (* desired .ko .Spec .DeletionProtectionEnabled )
359410 }
360411
361412 return input , nil
0 commit comments