Skip to content

Commit ed3647c

Browse files
committed
HTTP client proxy
1 parent 3ee0be4 commit ed3647c

File tree

4 files changed

+99
-1
lines changed

4 files changed

+99
-1
lines changed

README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,26 @@ Uses a [OkHttp](https://square.github.io/okhttp/) client.
274274
> Time to live for the connection
275275
> * Type: `Long`
276276
> * Default: `300000`
277+
>
278+
> ##### `http.client.proxy.host`
279+
> Hostname of the HTTP Proxy
280+
> * Type: `String`
281+
> * Default: ``
282+
>
283+
> ##### `http.client.proxy.port`
284+
> Port of the HTTP Proxy
285+
> * Type: `Integer`
286+
> * Default: `3128`
287+
>
288+
> ##### `http.client.proxy.username`
289+
> Username of the HTTP Proxy
290+
> * Type: `String`
291+
> * Default: ``
292+
>
293+
> ##### `http.client.proxy.password`
294+
> Password of the HTTP Proxy
295+
> * Type: `String`
296+
> * Default: ``
277297
---
278298
<a name="auth"/>
279299

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClient.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.github.castorm.kafka.connect.http.model.HttpResponse;
2727
import lombok.SneakyThrows;
2828
import lombok.extern.slf4j.Slf4j;
29+
import okhttp3.Authenticator;
2930
import okhttp3.Call;
3031
import okhttp3.ConnectionPool;
3132
import okhttp3.HttpUrl;
@@ -35,18 +36,24 @@
3536
import okhttp3.logging.HttpLoggingInterceptor;
3637

3738
import java.io.IOException;
39+
import java.net.InetSocketAddress;
40+
import java.net.Proxy;
3841
import java.util.List;
3942
import java.util.Map;
4043
import java.util.Optional;
4144

45+
import static java.net.Proxy.NO_PROXY;
46+
import static java.net.Proxy.Type.HTTP;
4247
import static java.util.Optional.empty;
4348
import static java.util.concurrent.TimeUnit.MILLISECONDS;
4449
import static javax.ws.rs.core.HttpHeaders.AUTHORIZATION;
50+
import static okhttp3.Credentials.basic;
4551
import static okhttp3.HttpUrl.parse;
4652
import static okhttp3.RequestBody.create;
4753
import static okhttp3.logging.HttpLoggingInterceptor.Level.BASIC;
4854
import static okhttp3.logging.HttpLoggingInterceptor.Level.BODY;
4955
import static okhttp3.logging.HttpLoggingInterceptor.Level.NONE;
56+
import static org.apache.commons.lang.StringUtils.isEmpty;
5057

5158
@Slf4j
5259
public class OkHttpClient implements HttpClient {
@@ -69,6 +76,8 @@ public void configure(Map<String, ?> configs) {
6976
.addInterceptor(createLoggingInterceptor())
7077
.addInterceptor(chain -> chain.proceed(authorize(chain.request())))
7178
.authenticator((route, response) -> authorize(response.request()))
79+
.proxy(resolveProxy(config.getProxyHost(), config.getProxyPort()))
80+
.proxyAuthenticator(resolveProxyAuthenticator(config.getProxyUsername(), config.getProxyPassword()))
7281
.build();
7382
}
7483

@@ -88,6 +97,17 @@ private static HttpLoggingInterceptor createLoggingInterceptor() {
8897
}
8998
}
9099

100+
private static Proxy resolveProxy(String host, Integer port) {
101+
return isEmpty(host) ? NO_PROXY : new Proxy(HTTP, new InetSocketAddress(host, port));
102+
}
103+
104+
private static Authenticator resolveProxyAuthenticator(String username, String password) {
105+
return isEmpty(username) ? Authenticator.NONE :
106+
(route, response) -> response.request().newBuilder()
107+
.header("Proxy-Authorization", basic(username, password))
108+
.build();
109+
}
110+
91111
@Override
92112
@SneakyThrows(IOException.class)
93113
public HttpResponse execute(HttpRequest httpRequest) {

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfig.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import static org.apache.kafka.common.config.ConfigDef.Type.CLASS;
3434
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
3535
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
36+
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
3637

3738
@Getter
3839
public class OkHttpClientConfig extends AbstractConfig {
@@ -42,12 +43,20 @@ public class OkHttpClientConfig extends AbstractConfig {
4243
private static final String CONNECTION_KEEP_ALIVE_DURATION_MILLIS = "http.client.ttl.millis";
4344
private static final String CONNECTION_MAX_IDLE = "http.client.max-idle";
4445
private static final String AUTHENTICATOR = "http.auth";
46+
private static final String PROXY_HOST = "http.client.proxy.host";
47+
private static final String PROXY_PORT = "http.client.proxy.port";
48+
private static final String PROXY_USERNAME = "http.client.proxy.username";
49+
private static final String PROXY_PASSWORD = "http.client.proxy.password";
4550

4651
private final Long connectionTimeoutMillis;
4752
private final Long readTimeoutMillis;
4853
private final Long keepAliveDuration;
4954
private final Integer maxIdleConnections;
5055
private final HttpAuthenticator authenticator;
56+
private final String proxyHost;
57+
private final Integer proxyPort;
58+
private final String proxyUsername;
59+
private final String proxyPassword;
5160

5261
OkHttpClientConfig(Map<String, ?> originals) {
5362
super(config(), originals);
@@ -56,6 +65,10 @@ public class OkHttpClientConfig extends AbstractConfig {
5665
keepAliveDuration = getLong(CONNECTION_KEEP_ALIVE_DURATION_MILLIS);
5766
maxIdleConnections = getInt(CONNECTION_MAX_IDLE);
5867
authenticator = getConfiguredInstance(AUTHENTICATOR, HttpAuthenticator.class);
68+
proxyHost = getString(PROXY_HOST);
69+
proxyPort = getInt(PROXY_PORT);
70+
proxyUsername = getString(PROXY_USERNAME);
71+
proxyPassword = getString(PROXY_PASSWORD);
5972
}
6073

6174
public static ConfigDef config() {
@@ -64,6 +77,11 @@ public static ConfigDef config() {
6477
.define(READ_TIMEOUT_MILLIS, LONG, 2000, HIGH, "Read Timeout Millis")
6578
.define(CONNECTION_KEEP_ALIVE_DURATION_MILLIS, LONG, 300000, HIGH, "Keep Alive Duration Millis")
6679
.define(CONNECTION_MAX_IDLE, INT, 1, HIGH, "Max Idle Connections")
67-
.define(AUTHENTICATOR, CLASS, ConfigurableHttpAuthenticator.class, MEDIUM, "Custom Authenticator");
80+
.define(AUTHENTICATOR, CLASS, ConfigurableHttpAuthenticator.class, MEDIUM, "Custom Authenticator")
81+
.define(PROXY_HOST, STRING, "", MEDIUM, "Proxy host")
82+
.define(PROXY_PORT, INT, 3128, MEDIUM, "Proxy port")
83+
.define(PROXY_USERNAME, STRING, "", MEDIUM, "Proxy username")
84+
.define(PROXY_PASSWORD, STRING, "", MEDIUM, "Proxy password")
85+
;
6886
}
6987
}

kafka-connect-http/src/test/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfigTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,46 @@ void whenAuthenticator_thenInitialized() {
8282
assertThat(config(ImmutableMap.of("http.auth", "com.github.castorm.kafka.connect.http.auth.BasicHttpAuthenticator")).getAuthenticator()).isInstanceOf(BasicHttpAuthenticator.class);
8383
}
8484

85+
@Test
86+
void whenNoProxyHost_thenDefault() {
87+
assertThat(config(emptyMap()).getProxyHost()).isEqualTo("");
88+
}
89+
90+
@Test
91+
void whenProxyHost_thenInitialized() {
92+
assertThat(config(ImmutableMap.of("http.client.proxy.host", "host")).getProxyHost()).isEqualTo("host");
93+
}
94+
95+
@Test
96+
void whenNoProxyPort_thenDefault() {
97+
assertThat(config(emptyMap()).getProxyPort()).isEqualTo(3128);
98+
}
99+
100+
@Test
101+
void whenProxyPort_thenInitialized() {
102+
assertThat(config(ImmutableMap.of("http.client.proxy.port", "8080")).getProxyPort()).isEqualTo(8080);
103+
}
104+
105+
@Test
106+
void whenNoProxyUsername_thenDefault() {
107+
assertThat(config(emptyMap()).getProxyUsername()).isEqualTo("");
108+
}
109+
110+
@Test
111+
void whenProxyUsername_thenInitialized() {
112+
assertThat(config(ImmutableMap.of("http.client.proxy.username", "user")).getProxyUsername()).isEqualTo("user");
113+
}
114+
115+
@Test
116+
void whenNoProxyPassword_thenDefault() {
117+
assertThat(config(emptyMap()).getProxyPassword()).isEqualTo("");
118+
}
119+
120+
@Test
121+
void whenProxyPassword_thenInitialized() {
122+
assertThat(config(ImmutableMap.of("http.client.proxy.password", "pass")).getProxyPassword()).isEqualTo("pass");
123+
}
124+
85125
private static OkHttpClientConfig config(Map<String, String> config) {
86126
return new OkHttpClientConfig(config);
87127
}

0 commit comments

Comments
 (0)