-
Notifications
You must be signed in to change notification settings - Fork 335
Add timeout and retry logic to Azure token fetch #3113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 4 commits
b79d930
b0a2714
3ea3d64
25a70b2
13f1c04
cf3f6c0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -438,4 +438,46 @@ public static void enforceFeatureEnabledOrThrow( | |
| "If set to true (default), allow credential vending for external catalogs. Note this requires ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING to be true first.") | ||
| .defaultValue(true) | ||
| .buildFeatureConfiguration(); | ||
|
|
||
| /** | ||
| * Timeout, in milliseconds, for a single cloud provider API request; defaults to 15000. | ||
| * | ||
| * <p>The Azure storage integration applies this value to each access-token fetch attempt | ||
| * (the timeout sits upstream of the retry operator), so a request that is retried can take | ||
| * longer than this value in total. | ||
| */ | ||
| public static final FeatureConfiguration<Integer> CLOUD_API_TIMEOUT_MILLIS = | ||
| PolarisConfiguration.<Integer>builder() | ||
| .key("CLOUD_API_TIMEOUT_MILLIS") | ||
| .description( | ||
| "Timeout in milliseconds for cloud provider API requests. " | ||
| + "Prevents indefinite blocking when cloud provider endpoints are slow or unresponsive. " | ||
| + "Used internally by storage integrations for credential vending and other cloud operations. " | ||
| + "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).") | ||
| .defaultValue(15000) | ||
| .buildFeatureConfiguration(); | ||
|
|
||
| /** | ||
| * Maximum number of retries for a failed cloud provider API request; defaults to 3. | ||
| * | ||
| * <p>The Azure access-token fetch passes this value to {@code Retry.backoff} as the retry | ||
| * limit, so it counts retries performed after the initial attempt. | ||
| */ | ||
| public static final FeatureConfiguration<Integer> CLOUD_API_RETRY_COUNT = | ||
| PolarisConfiguration.<Integer>builder() | ||
| .key("CLOUD_API_RETRY_COUNT") | ||
| .description( | ||
| "Number of retry attempts for cloud provider API requests. " | ||
| + "Uses exponential backoff with jitter to handle transient failures. " | ||
| + "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).") | ||
| .defaultValue(3) | ||
| .buildFeatureConfiguration(); | ||
|
|
||
| /** | ||
| * Initial retry delay, in milliseconds; defaults to 2000. | ||
| * | ||
| * <p>The Azure access-token fetch passes this value to {@code Retry.backoff} as the minimum | ||
| * backoff duration. | ||
| */ | ||
| public static final FeatureConfiguration<Integer> CLOUD_API_RETRY_DELAY_MILLIS = | ||
| PolarisConfiguration.<Integer>builder() | ||
| .key("CLOUD_API_RETRY_DELAY_MILLIS") | ||
| .description( | ||
| "Initial delay in milliseconds before first retry for cloud provider API requests. " | ||
| + "Delay doubles with each retry (exponential backoff). " | ||
| + "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).") | ||
| .defaultValue(2000) | ||
| .buildFeatureConfiguration(); | ||
|
|
||
| /** | ||
| * Jitter setting for retry delays; defaults to 500. | ||
| * | ||
| * <p>NOTE(review): the Azure access-token fetch divides this value by 1000 and hands the result | ||
| * to Reactor's {@code jitter(...)}, which takes a factor of the computed delay rather than a | ||
| * duration. The description below (jitter "between 0 and this value" milliseconds) therefore | ||
| * may not match the behavior, and values above 1000 would yield a factor greater than 1 - | ||
| * confirm and align the name, description and usage. | ||
| */ | ||
| public static final FeatureConfiguration<Integer> CLOUD_API_RETRY_JITTER_MILLIS = | ||
|
||
| PolarisConfiguration.<Integer>builder() | ||
| .key("CLOUD_API_RETRY_JITTER_MILLIS") | ||
| .description( | ||
| "Maximum jitter in milliseconds added to retry delays for cloud provider API requests. " | ||
| + "Helps prevent thundering herd when multiple requests fail simultaneously. " | ||
| + "Actual jitter is random between 0 and this value. " | ||
| + "Currently only used by Azure storage integration (not yet implemented for AWS S3 or GCP).") | ||
| .defaultValue(500) | ||
| .buildFeatureConfiguration(); | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,10 @@ | |
| */ | ||
| package org.apache.polaris.core.storage.azure; | ||
|
|
||
| import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_RETRY_DELAY_MILLIS; | ||
| import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_RETRY_JITTER_MILLIS; | ||
| import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_RETRY_COUNT; | ||
| import static org.apache.polaris.core.config.FeatureConfiguration.CLOUD_API_TIMEOUT_MILLIS; | ||
| import static org.apache.polaris.core.config.FeatureConfiguration.STORAGE_CREDENTIAL_DURATION_SECONDS; | ||
|
|
||
| import com.azure.core.credential.AccessToken; | ||
|
|
@@ -39,6 +43,7 @@ | |
| import com.azure.storage.file.datalake.sas.PathSasPermission; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
| import jakarta.annotation.Nonnull; | ||
| import java.time.Duration; | ||
| import java.time.Instant; | ||
| import java.time.OffsetDateTime; | ||
| import java.time.Period; | ||
|
|
@@ -55,6 +60,7 @@ | |
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
| import reactor.core.publisher.Mono; | ||
| import reactor.util.retry.Retry; | ||
|
|
||
| /** Azure credential vendor that supports generating SAS token */ | ||
| public class AzureCredentialsStorageIntegration | ||
|
|
@@ -120,7 +126,7 @@ public StorageAccessConfig getSubscopedCreds( | |
| OffsetDateTime.ofInstant( | ||
| start.plusSeconds(3600), ZoneOffset.UTC); // 1 hr to sync with AWS and GCP Access token | ||
|
|
||
| AccessToken accessToken = getAccessToken(config().getTenantId()); | ||
| AccessToken accessToken = getAccessToken(realmConfig, config().getTenantId()); | ||
| // Get user delegation key. | ||
| // Set the new generated user delegation key expiry to 7 days and minute 1 min | ||
| // Azure strictly requires the end time to be <= 7 days from the current time, -1 min to avoid | ||
|
|
@@ -312,16 +318,103 @@ private void validateAccountAndContainer( | |
| }); | ||
| } | ||
|
|
||
| private AccessToken getAccessToken(String tenantId) { | ||
| /** | ||
| * Fetches an Azure AD access token with timeout and retry logic to handle transient failures. | ||
| * | ||
| * <p>This access token is used internally to obtain a user delegation key from Azure Storage, | ||
| * which is then used to generate SAS tokens for client credential vending. | ||
| * | ||
| * <p>This method implements a defensive strategy against slow or failing cloud provider requests: | ||
| * | ||
| * <ul> | ||
| * <li>Per-attempt timeout (configurable via CLOUD_API_TIMEOUT_MILLIS, default 15000ms) | ||
| * <li>Exponential backoff retry (configurable count and initial delay via CLOUD_API_RETRY_COUNT | ||
| * and CLOUD_API_RETRY_DELAY_MILLIS, defaults: 3 retries starting at 2000ms) | ||
| * <li>Jitter to prevent thundering herd (derived from CLOUD_API_RETRY_JITTER_MILLIS, default | ||
| * 500: the value is divided by 1000 and clamped to the 0.0-1.0 factor range Reactor accepts) | ||
| * </ul> | ||
| * | ||
| * <p>Only failures accepted by {@code isRetriableAzureException} are retried; any other failure | ||
| * is propagated immediately. | ||
| * | ||
| * @param realmConfig the realm configuration to get timeout and retry settings | ||
| * @param tenantId the Azure tenant ID | ||
| * @return the access token | ||
| * @throws RuntimeException if token fetch fails after all retries or times out | ||
| */ | ||
| private AccessToken getAccessToken(RealmConfig realmConfig, String tenantId) { | ||
| int timeoutMillis = realmConfig.getConfig(CLOUD_API_TIMEOUT_MILLIS); | ||
| int retryCount = realmConfig.getConfig(CLOUD_API_RETRY_COUNT); | ||
| int initialDelayMillis = realmConfig.getConfig(CLOUD_API_RETRY_DELAY_MILLIS); | ||
| int jitterMillis = realmConfig.getConfig(CLOUD_API_RETRY_JITTER_MILLIS); | ||
| // Reactor's jitter(...) takes a factor of the computed delay and rejects values outside | ||
| // 0.0-1.0, so clamp the converted setting instead of letting a large value fail the request. | ||
| double jitterFactor = Math.min(1.0, Math.max(0.0, jitterMillis / 1000.0)); | ||
|
||
|
|
||
| String scope = "https://storage.azure.com/.default"; | ||
| AccessToken accessToken = | ||
| defaultAzureCredential | ||
| .getToken(new TokenRequestContext().addScopes(scope).setTenantId(tenantId)) | ||
| // Placed before retryWhen so that the timeout applies to every attempt separately. | ||
| .timeout(Duration.ofMillis(timeoutMillis)) | ||
| .doOnError( | ||
| error -> | ||
| // Pass the throwable as the last argument so SLF4J logs the full stack trace. | ||
| LOGGER.warn( | ||
| "Error fetching Azure access token for tenant {}", | ||
| tenantId, | ||
| error)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we log the full stack trace instead of |
||
| .retryWhen( | ||
| Retry.backoff(retryCount, Duration.ofMillis(initialDelayMillis)) | ||
| .jitter(jitterFactor) | ||
| .filter(this::isRetriableAzureException) | ||
| .doBeforeRetry( | ||
| retrySignal -> | ||
| // totalRetries() is zero-based here, so +1 gives the retry about to run. | ||
| LOGGER.info( | ||
| "Retrying Azure token fetch for tenant {} (retry {}/{})", | ||
| tenantId, | ||
| retrySignal.totalRetries() + 1, | ||
| retryCount)) | ||
|
Comment on lines
+368
to
+369
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add 1 to retryCount? Sot it matches the word "attempt" here? The total attempt number would be 1 + retryCount: |
||
| .onRetryExhaustedThrow( | ||
| (retryBackoffSpec, retrySignal) -> | ||
| // Attempts made = the initial attempt plus every retry performed. | ||
| new RuntimeException( | ||
| String.format( | ||
| "Azure token fetch exhausted after %d attempts for tenant %s", | ||
| retrySignal.totalRetries() + 1, tenantId), | ||
| retrySignal.failure()))) | ||
| .blockOptional() | ||
| .orElse(null); | ||
|
|
||
| if (accessToken == null) { | ||
| throw new RuntimeException("No access token fetched!"); | ||
| throw new RuntimeException( | ||
| String.format("Failed to fetch Azure access token for tenant %s", tenantId)); | ||
| } | ||
| return accessToken; | ||
| } | ||
|
|
||
| /** | ||
| * Determines if an exception is retriable for Azure token requests. | ||
| * | ||
| * <p>Retries are attempted for: | ||
| * | ||
| * <ul> | ||
| * <li>TimeoutException - per-attempt timeout exceeded | ||
| * <li>AADSTS50058 - Token endpoint timeout | ||
| * <li>AADSTS50078 - Service temporarily unavailable | ||
| * <li>AADSTS700084 - Token refresh required | ||
| * <li>503 - Service unavailable | ||
| * <li>429 - Too many requests (rate limited) | ||
| * </ul> | ||
| * | ||
| * @param throwable the exception to check | ||
| * @return true if the exception should trigger a retry | ||
| */ | ||
| private boolean isRetriableAzureException(Throwable throwable) { | ||
| // Retry on timeout exceptions | ||
| if (throwable instanceof java.util.concurrent.TimeoutException) { | ||
| return true; | ||
| } | ||
| // Retry on common transient Azure credential exceptions | ||
| String message = throwable.getMessage(); | ||
| if (message != null) { | ||
| return message.contains("AADSTS50058") // Token endpoint timeout | ||
| || message.contains("AADSTS50078") // Service temporarily unavailable | ||
| || message.contains("AADSTS700084") // Token refresh required | ||
| || message.contains("503") // Service unavailable | ||
| || message.contains("429"); // Too many requests | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`CLOUD_API_TIMEOUT_MILLIS` may over-communicate its scope. Would a more precise name like `STORAGE_API_TIMEOUT_MILLIS` or even `AZURE_STORAGE_API_TIMEOUT_MILLIS` be clearer? If S3/GCS/MinIO don't have a similar configuration, or the same timeout cannot be applied to them, I'd suggest adding the prefix
`AZURE_` to make it clear.