Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions .changelog/1763060740.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
applies_to:
- client
authors:
- annahay
references: []
breaking: false
new_feature: true
bug_fix: false
---
Add support for configurable token bucket success reward and fractional token management
2 changes: 1 addition & 1 deletion .github/workflows/manual-pull-request-bot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
contents: read
needs:
- get-pr-info
runs-on: ubuntu-latest
runs-on: smithy_ubuntu-latest_8-core
steps:
- uses: GitHubSecurityLab/actions-permissions/monitor@v1
- uses: actions/checkout@v4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class RetryPartitionTest {
"RetryPartition" to RuntimeType.smithyRuntime(ctx.runtimeConfig).resolve("client::retries::RetryPartition"),
"RuntimeComponents" to RuntimeType.runtimeComponents(ctx.runtimeConfig),
"TokenBucket" to RuntimeType.smithyRuntime(ctx.runtimeConfig).resolve("client::retries::TokenBucket"),
"MAXIMUM_CAPACITY" to RuntimeType.smithyRuntime(ctx.runtimeConfig).resolve("client::retries::token_bucket::MAXIMUM_CAPACITY"),
)
crate.integrationTest("custom_retry_partition") {
tokioTest("test_custom_token_bucket") {
Expand All @@ -139,7 +140,8 @@ class RetryPartitionTest {
) -> Result<(), #{BoxError}> {
self.called.fetch_add(1, Ordering::Relaxed);
let token_bucket = cfg.load::<#{TokenBucket}>().unwrap();
let expected = format!("permits: {}", tokio::sync::Semaphore::MAX_PERMITS);
let max_capacity = #{MAXIMUM_CAPACITY};
let expected = format!("permits: {}", max_capacity);
assert!(
format!("{token_bucket:?}").contains(&expected),
"Expected debug output to contain `{expected}`, but got: {token_bucket:?}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,28 @@ impl StandardRetryStrategy {
Default::default()
}

fn release_retry_permit(&self) -> ReleaseResult {
fn release_retry_permit(&self, token_bucket: &TokenBucket) -> ReleaseResult {
let mut retry_permit = self.retry_permit.lock().unwrap();
match retry_permit.take() {
Some(p) => {
drop(p);
// Retry succeeded: reward success and forget permit if configured, otherwise release permit back
if token_bucket.success_reward() > 0.0 {
token_bucket.reward_success();
p.forget();
} else {
drop(p); // Original behavior - release back to bucket
}
APermitWasReleased
}
None => NoPermitWasReleased,
None => {
// First-attempt success: reward success or regenerate token
if token_bucket.success_reward() > 0.0 {
token_bucket.reward_success();
} else {
token_bucket.regenerate_a_token();
}
NoPermitWasReleased
}
}
}

Expand Down Expand Up @@ -210,15 +224,9 @@ impl RetryStrategy for StandardRetryStrategy {
.unwrap_or(false);
update_rate_limiter_if_exists(runtime_components, cfg, is_throttling_error);

// on success release any retry quota held by previous attempts
// on success release any retry quota held by previous attempts, reward success when indicated
if !ctx.is_failed() {
if let NoPermitWasReleased = self.release_retry_permit() {
// In the event that there was no retry permit to release, we generate new
// permits from nothing. We do this to make up for permits we had to "forget".
// Otherwise, repeated retries would empty the bucket and nothing could fill it
// back up again.
token_bucket.regenerate_a_token();
}
self.release_retry_permit(token_bucket);
}
// end bookkeeping

Expand Down
Loading
Loading