HTTP-182 HTTP sink retries #181
base: main
Changes from 7 commits
c86109d
387c37c
356e224
f984210
392e1f8
16a90ee
e1131a6
a8e4821
d0ddbea
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -451,17 +451,25 @@ is provided. | |
|
|
||
| ## HTTP status code handler | ||
| ### Sink table | ||
| You can configure a list of HTTP status codes that should be treated as errors for HTTP sink table. | ||
|
||
| By default, all 4XX and 5XX response codes are interpreted as error codes. | ||
|
|
||
| This behavior can be changed by using the properties below in the table definition (DDL) or by passing them via the `setProperty` method of the Sink's builder. The property names are: | ||
| - `gid.connector.http.sink.error.code` used to define an HTTP status code value that should be treated as an error, for example 404. | ||
| Many status codes can be defined in one value, where each code should be separated with comma, for example: | ||
| `401, 402, 403`. User can use this property also to define a type code mask. In that case, all codes from given HTTP response type will be treated as errors. | ||
| An example of such a mask would be `3XX, 4XX, 5XX`. In this case, all 300s, 400s and 500s status codes will be treated as errors. | ||
| - `gid.connector.http.sink.error.code.exclude` used to exclude an HTTP code from the error list. | ||
| Many status codes can be defined in one value, where each code should be separated with comma, for example: | ||
| `401, 402, 403`. In this example, codes 401, 402 and 403 would not be interpreted as error codes. | ||
| You can configure HTTP status code handling for HTTP sink table and enable automatic retries with delivery guarantees. | ||
|
|
||
| #### Retries and delivery guarantee | ||
| HTTP Sink supports automatic retries when `sink.delivery-guarantee` is set to `at-least-once`. Failed requests will be automatically retried based on the configured status codes. | ||
|
||
| - When `sink.delivery-guarantee` is `at-least-once` (default): Failed requests are retried automatically using the AIMD (Additive Increase Multiplicative Decrease) rate-limiting strategy. | ||
| - When `sink.delivery-guarantee` is `none`: Failed requests are logged but not retried. | ||
|
|
||
| The sink categorizes HTTP responses into groups: | ||
| - Success codes (`gid.connector.http.sink.success-codes`): Expected successful responses. | ||
| - Retry codes (`gid.connector.http.sink.retry-codes`): Transient errors that trigger automatic retries when using `at-least-once` delivery guarantee. | ||
| - Ignored responses (`gid.connector.http.sink.ignored-response-codes`): Responses whose content is ignored but treated as successful. | ||
| - Error codes: Any response code not classified in the above groups. | ||
|
|
||
| Parameters support whitelisting and blacklisting: `2XX,404,!203` means all codes from 200-299, plus 404, except 203. | ||
|
|
||
| #### Legacy error code configuration | ||
| For backward compatibility, you can use the legacy properties: | ||
| - `gid.connector.http.sink.error.code` - HTTP status codes treated as errors (supports masks like `3XX, 4XX, 5XX`). | ||
| - `gid.connector.http.sink.error.code.exclude` - HTTP codes to exclude from the error list. | ||
|
|
||
| ### Source table | ||
| The source table categorizes HTTP responses into three groups based on status codes: | ||
|
|
@@ -612,13 +620,17 @@ be requested if the current time is later than the cached token expiry time minu | |
| | url | required | The base URL that should be used for HTTP requests. For example _http://localhost:8080/client_. | |
| | insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. | | ||
| | sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. | | ||
| | sink.delivery-guarantee | optional | Defines the delivery semantic for the HTTP sink. Accepted enumerations are 'at-least-once', and 'none' (actually 'none' is the same as 'at-most-once'). 'exactly-once' semantic is not supported. | | ||
| | sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. | | ||
| | sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. | | ||
| | sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. | | ||
| | sink.flush-buffer.timeout | optional | Threshold time in milliseconds for an element to be in a buffer before being flushed. | | ||
| | gid.connector.http.sink.request-callback | optional | Specify which `HttpPostRequestCallback` implementation to use. By default, it is set to `slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`. | | ||
| | gid.connector.http.sink.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. | | ||
| | gid.connector.http.sink.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. | | ||
| | gid.connector.http.sink.error.code `DEPRECATED` | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. | | ||
| | gid.connector.http.sink.error.code.exclude `DEPRECATED` | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. | | ||
| | gid.connector.http.sink.success-codes | optional | Comma-separated HTTP codes considered a successful response. Use [1-5]XX for groups and the '!' character for exclusion. Defaults to `1XX,2XX,3XX`. | |
| | gid.connector.http.sink.retry-codes | optional | Comma-separated HTTP codes considered transient errors that trigger retries. Use [1-5]XX for groups and the '!' character for exclusion. Only used when `sink.delivery-guarantee` is set to `at-least-once`. Defaults to `500,503,504`. | |
| | gid.connector.http.sink.ignored-response-codes | optional | Comma-separated HTTP codes. Content for these responses will be ignored. Use [1-5]XX for groups and the '!' character for exclusion. | |
| | gid.connector.http.security.cert.server | optional | Path to a trusted HTTP server certificate that should be added to the connector's key store. More than one path can be specified using `,` as the path delimiter. | |
| | gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | | ||
| | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | | ||
|
|
@@ -736,9 +748,6 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json | |
| ### HTTP TableLookup Source | ||
| - Check other `//TODO`'s. | ||
|
|
||
| ### HTTP Sink | ||
| - Make `HttpSink` retry the failed requests. Currently, it does not retry those at all, only adds their count to the `numRecordsSendErrors` metric. It should be thoroughly thought over how to do it efficiently and then implemented. | ||
|
|
||
| ### | ||
| [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/joins/#lookup-join | ||
| </br> | ||
|
|
||
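
Reviewer sketch for the whitelist/blacklist syntax documented in this README change (`2XX,404,!203`). It is only an illustration of the described semantics, not the connector's actual parser: the class `StatusCodeExpression` and its methods are hypothetical names introduced here, and the rule that an exclusion always wins over an inclusion is an assumption based on the README example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

/** Hypothetical helper, NOT part of the connector: evaluates expressions like "2XX,404,!203". */
final class StatusCodeExpression {
    private final List<String> included = new ArrayList<>();
    private final List<String> excluded = new ArrayList<>();

    StatusCodeExpression(String expression) {
        for (String raw : expression.split(",")) {
            String token = raw.trim().toUpperCase(Locale.ROOT);
            if (token.isEmpty()) {
                continue;
            }
            if (token.startsWith("!")) {
                excluded.add(token.substring(1)); // blacklist entry
            } else {
                included.add(token);              // whitelist entry
            }
        }
    }

    /** A token is either a group mask such as "2XX" or a single code such as "404". */
    private static boolean tokenMatches(String token, int code) {
        if (token.matches("[1-5]XX")) {
            return code / 100 == token.charAt(0) - '0';
        }
        return token.equals(Integer.toString(code));
    }

    /** Assumption: a code matches when it is whitelisted and not blacklisted. */
    boolean matches(int code) {
        boolean in = included.stream().anyMatch(t -> tokenMatches(t, code));
        boolean out = excluded.stream().anyMatch(t -> tokenMatches(t, code));
        return in && !out;
    }
}

final class StatusCodeExpressionDemo {
    public static void main(String[] args) {
        StatusCodeExpression expr = new StatusCodeExpression("2XX,404,!203");
        System.out.println(expr.matches(200)); // true
        System.out.println(expr.matches(404)); // true
        System.out.println(expr.matches(203)); // false
        System.out.println(expr.matches(500)); // false
    }
}
```
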
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| package com.getindata.connectors.http; | ||
|
|
||
| import java.util.List; | ||
|
|
||
| import lombok.Getter; | ||
|
|
||
| import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; | ||
|
|
||
| @Getter | ||
| public class BatchHttpStatusCodeValidationFailedException extends Exception { | ||
| private final List<HttpRequest> failedRequests; | ||
|
|
||
| public BatchHttpStatusCodeValidationFailedException(String message, List<HttpRequest> failedRequests) { | ||
| super(message); | ||
| this.failedRequests = failedRequests; | ||
| } | ||
| } |
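
Reviewer sketch of how a caller might raise and handle the new exception. This diff only shows the exception class itself, so the throwing site below is an assumption: `BatchValidator`, its `validate` method, the `>= 400` check, and the simplified `HttpRequest` stand-in are all hypothetical, and the Lombok `@Getter` is written out by hand to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the connector's HttpRequest (hypothetical shape). */
final class HttpRequest {
    final String url;

    HttpRequest(String url) {
        this.url = url;
    }
}

/** Mirrors the class added in this diff, with the Lombok getter written out. */
class BatchHttpStatusCodeValidationFailedException extends Exception {
    private final List<HttpRequest> failedRequests;

    BatchHttpStatusCodeValidationFailedException(String message, List<HttpRequest> failedRequests) {
        super(message);
        this.failedRequests = failedRequests;
    }

    List<HttpRequest> getFailedRequests() {
        return failedRequests;
    }
}

/** Hypothetical caller: collects every request whose status code fails a check. */
final class BatchValidator {
    static void validate(List<HttpRequest> requests, List<Integer> statusCodes)
            throws BatchHttpStatusCodeValidationFailedException {
        List<HttpRequest> failed = new ArrayList<>();
        for (int i = 0; i < requests.size(); i++) {
            if (statusCodes.get(i) >= 400) { // assumption: 4XX/5XX count as failures
                failed.add(requests.get(i));
            }
        }
        if (!failed.isEmpty()) {
            throw new BatchHttpStatusCodeValidationFailedException(
                failed.size() + " request(s) failed status code validation", failed);
        }
    }
}
```

Carrying the failed requests on the exception lets the handler decide per request what to do next, instead of treating the whole batch as lost.
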
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,31 +1,57 @@ | ||
| package com.getindata.connectors.http.internal; | ||
|
|
||
| import java.util.List; | ||
| import java.util.stream.Collectors; | ||
|
|
||
| import lombok.Data; | ||
| import lombok.NonNull; | ||
| import lombok.ToString; | ||
|
|
||
| import com.getindata.connectors.http.internal.config.ResponseItemStatus; | ||
| import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry; | ||
| import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest; | ||
|
|
||
| /** | ||
| * Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted | ||
| * to write, divided into two lists — successful and failed ones. | ||
| * to write. | ||
| */ | ||
| @Data | ||
| @ToString | ||
| public class SinkHttpClientResponse { | ||
|
|
||
| /** | ||
| * A list of successfully written requests. | ||
| * A list of requests along with write status. | ||
| */ | ||
| @NonNull | ||
| private final List<HttpRequest> successfulRequests; | ||
| private final List<ResponseItem> requests; | ||
|
|
||
| /** | ||
| * A list of requests that {@link SinkHttpClient} failed to write. | ||
| */ | ||
| @NonNull | ||
| private final List<HttpRequest> failedRequests; | ||
| private List<HttpRequest> getRequestByStatus(final ResponseItemStatus status) { | ||
| return requests.stream() | ||
| .filter(r -> r.getStatus().equals(status)) | ||
| .map(ResponseItem::getRequest) | ||
| .collect(Collectors.toList()); | ||
| } | ||
|
|
||
| public List<HttpRequest> getSuccessfulRequests() { | ||
| return getRequestByStatus(ResponseItemStatus.SUCCESS); | ||
| } | ||
|
|
||
| public List<HttpRequest> getFailedRequests() { | ||
| return getRequestByStatus(ResponseItemStatus.FAILURE); | ||
| } | ||
|
|
||
| public List<HttpRequest> getTemporalRequests() { | ||
| return getRequestByStatus(ResponseItemStatus.TEMPORAL); | ||
| } | ||
|
|
||
| public List<HttpRequest> getIgnoredRequests() { | ||
| return getRequestByStatus(ResponseItemStatus.IGNORE); | ||
| } | ||
|
|
||
| @Data | ||
| @ToString | ||
| public static class ResponseItem { | ||
| private final HttpRequest request; | ||
| private final ResponseItemStatus status; | ||
| } | ||
| } |
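
Reviewer sketch: each getter in `SinkHttpClientResponse` streams the full `requests` list again. If a caller needs all four groups at once, a single-pass grouping is one possible alternative. The types below are simplified stand-ins (a `String` replaces `HttpRequest`, and the enum omits its string field), and `ResponseGrouping.byStatus` is a hypothetical helper, not something this PR contains.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Stand-in for the enum added in this PR (string field omitted). */
enum ResponseItemStatus { SUCCESS, TEMPORAL, IGNORE, FAILURE }

/** Stand-in for SinkHttpClientResponse.ResponseItem; a String replaces HttpRequest. */
final class ResponseItem {
    final String request;
    final ResponseItemStatus status;

    ResponseItem(String request, ResponseItemStatus status) {
        this.request = request;
        this.status = status;
    }
}

/** Hypothetical helper: groups all items by status in one pass over the list. */
final class ResponseGrouping {
    static Map<ResponseItemStatus, List<String>> byStatus(List<ResponseItem> items) {
        return items.stream().collect(Collectors.groupingBy(
            item -> item.status,
            () -> new EnumMap<>(ResponseItemStatus.class),
            Collectors.mapping(item -> item.request, Collectors.toList())));
    }
}
```

Statuses with no matching items are absent from the resulting map, so callers would need `getOrDefault` where the PR's getters return an empty list.
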
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package com.getindata.connectors.http.internal.config; | ||
|
|
||
| public enum ResponseItemStatus { | ||
| SUCCESS("success"), | ||
| TEMPORAL("temporal"), | ||
| IGNORE("ignore"), | ||
| FAILURE("failure"); | ||
|
|
||
| private final String status; | ||
|
|
||
| ResponseItemStatus(String status) { | ||
| this.status = status; | ||
| } | ||
|
|
||
| public String getStatus() { | ||
| return status; | ||
| } | ||
| } |
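
Reviewer sketch of how the four statuses could map onto the response groups described in the README part of this PR. The classifier that actually does this is not shown in this diff, so everything here is an assumption: `StatusClassifier` is a hypothetical name, the order in which the groups are checked is a guess, and treating retry codes as `FAILURE` when the delivery guarantee is `none` is one reading of "logged but not retried".

```java
import java.util.function.IntPredicate;

/** Stand-in for the enum added in this PR (string field omitted). */
enum ResponseItemStatus { SUCCESS, TEMPORAL, IGNORE, FAILURE }

/** Hypothetical classifier; the precedence between groups is an assumption. */
final class StatusClassifier {
    private final IntPredicate successCodes;
    private final IntPredicate retryCodes;
    private final IntPredicate ignoredCodes;
    private final boolean atLeastOnce;

    StatusClassifier(IntPredicate successCodes, IntPredicate retryCodes,
                     IntPredicate ignoredCodes, boolean atLeastOnce) {
        this.successCodes = successCodes;
        this.retryCodes = retryCodes;
        this.ignoredCodes = ignoredCodes;
        this.atLeastOnce = atLeastOnce;
    }

    ResponseItemStatus classify(int code) {
        if (ignoredCodes.test(code)) {
            return ResponseItemStatus.IGNORE;
        }
        if (successCodes.test(code)) {
            return ResponseItemStatus.SUCCESS;
        }
        // Retry codes only lead to a retry under the at-least-once guarantee.
        if (atLeastOnce && retryCodes.test(code)) {
            return ResponseItemStatus.TEMPORAL;
        }
        return ResponseItemStatus.FAILURE; // anything not classified above
    }
}

final class StatusClassifierDemo {
    public static void main(String[] args) {
        // Documented defaults: success = 1XX,2XX,3XX and retry = 500,503,504.
        StatusClassifier classifier = new StatusClassifier(
            code -> code >= 100 && code < 400,
            code -> code == 500 || code == 503 || code == 504,
            code -> false,
            true);
        System.out.println(classifier.classify(200)); // SUCCESS
        System.out.println(classifier.classify(503)); // TEMPORAL
        System.out.println(classifier.classify(404)); // FAILURE
    }
}
```
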