Skip to content

Commit 96eae80

Browse files
hackeddcastorm
authored andcommitted
Add HTTP Client keystore for mutual TLS
1 parent a9af8f0 commit 96eae80

File tree

2 files changed

+82
-3
lines changed

2 files changed

+82
-3
lines changed

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClient.java

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,23 @@
3535
import okhttp3.Response;
3636
import okhttp3.logging.HttpLoggingInterceptor;
3737

38+
import javax.net.ssl.KeyManagerFactory;
39+
import javax.net.ssl.SSLContext;
40+
import javax.net.ssl.TrustManager;
41+
import javax.net.ssl.TrustManagerFactory;
42+
import javax.net.ssl.X509TrustManager;
43+
import java.io.FileInputStream;
3844
import java.io.IOException;
45+
import java.io.InputStream;
3946
import java.net.InetSocketAddress;
4047
import java.net.Proxy;
48+
import java.security.KeyManagementException;
49+
import java.security.KeyStore;
50+
import java.security.KeyStoreException;
51+
import java.security.NoSuchAlgorithmException;
52+
import java.security.UnrecoverableKeyException;
53+
import java.security.cert.CertificateException;
54+
import java.util.Arrays;
4155
import java.util.List;
4256
import java.util.Map;
4357
import java.util.Optional;
@@ -68,7 +82,8 @@ public void configure(Map<String, ?> configs) {
6882
OkHttpClientConfig config = new OkHttpClientConfig(configs);
6983

7084
authenticator = config.getAuthenticator();
71-
client = new okhttp3.OkHttpClient.Builder()
85+
86+
okhttp3.OkHttpClient.Builder builder = new okhttp3.OkHttpClient.Builder()
7287
.connectionPool(new ConnectionPool(config.getMaxIdleConnections(), config.getKeepAliveDuration(), MILLISECONDS))
7388
.connectTimeout(config.getConnectionTimeoutMillis(), MILLISECONDS)
7489
.readTimeout(config.getReadTimeoutMillis(), MILLISECONDS)
@@ -77,8 +92,11 @@ public void configure(Map<String, ?> configs) {
7792
.addInterceptor(chain -> chain.proceed(authorize(chain.request())))
7893
.authenticator((route, response) -> authorize(response.request()))
7994
.proxy(resolveProxy(config.getProxyHost(), config.getProxyPort()))
80-
.proxyAuthenticator(resolveProxyAuthenticator(config.getProxyUsername(), config.getProxyPassword()))
81-
.build();
95+
.proxyAuthenticator(resolveProxyAuthenticator(config.getProxyUsername(), config.getProxyPassword()));
96+
97+
resolveSslSocketFactory(builder, config.getKeyStore(), config.getKeyStorePassword().value());
98+
99+
client = builder.build();
82100
}
83101

84102
private Request authorize(Request request) {
@@ -108,6 +126,57 @@ private static Authenticator resolveProxyAuthenticator(String username, String p
108126
.build();
109127
}
110128

129+
private static void resolveSslSocketFactory(okhttp3.OkHttpClient.Builder builder, String keyStorePath, String keyStorePassword) {
130+
if (keyStorePath.isEmpty()) {
131+
return;
132+
}
133+
134+
KeyStore keyStore;
135+
try {
136+
keyStore = KeyStore.getInstance("PKCS12");
137+
} catch (KeyStoreException e) {
138+
throw new IllegalStateException("Unable to create keystore", e);
139+
}
140+
141+
try (InputStream is = new FileInputStream(keyStorePath)) {
142+
keyStore.load(is, keyStorePassword.toCharArray());
143+
}
144+
catch (CertificateException | IOException | NoSuchAlgorithmException e) {
145+
throw new IllegalStateException(String.format("Unable to load keystore '%s'", keyStorePath), e);
146+
}
147+
148+
KeyManagerFactory kmf;
149+
try {
150+
kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
151+
kmf.init(keyStore, keyStorePassword.toCharArray());
152+
} catch (UnrecoverableKeyException | NoSuchAlgorithmException | KeyStoreException e) {
153+
throw new IllegalStateException("Unable to initialize key manager", e);
154+
}
155+
156+
SSLContext sslContext;
157+
try {
158+
sslContext = SSLContext.getInstance("TLS");
159+
sslContext.init(kmf.getKeyManagers(), null, null);
160+
} catch (NoSuchAlgorithmException | KeyManagementException e) {
161+
throw new IllegalStateException("Unable to initialize SSL context", e);
162+
}
163+
164+
TrustManagerFactory trustManagerFactory;
165+
try {
166+
trustManagerFactory = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
167+
trustManagerFactory.init((KeyStore) null);
168+
} catch (NoSuchAlgorithmException | KeyStoreException e) {
169+
throw new IllegalStateException("Unable to initialize trust manager", e);
170+
}
171+
172+
TrustManager[] trustManagers = trustManagerFactory.getTrustManagers();
173+
if (trustManagers.length != 1 || !(trustManagers[0] instanceof X509TrustManager)) {
174+
throw new IllegalStateException("Unexpected default trust managers:" + Arrays.toString(trustManagers));
175+
}
176+
177+
builder.sslSocketFactory(sslContext.getSocketFactory(), (X509TrustManager) trustManagers[0]);
178+
}
179+
111180
@Override
112181
@SneakyThrows(IOException.class)
113182
public HttpResponse execute(HttpRequest httpRequest) {

kafka-connect-http/src/main/java/com/github/castorm/kafka/connect/http/client/okhttp/OkHttpClientConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import lombok.Getter;
2626
import org.apache.kafka.common.config.AbstractConfig;
2727
import org.apache.kafka.common.config.ConfigDef;
28+
import org.apache.kafka.common.config.types.Password;
2829

2930
import java.util.Map;
3031

@@ -34,6 +35,7 @@
3435
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
3536
import static org.apache.kafka.common.config.ConfigDef.Type.LONG;
3637
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
38+
import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD;
3739

3840
@Getter
3941
public class OkHttpClientConfig extends AbstractConfig {
@@ -47,6 +49,8 @@ public class OkHttpClientConfig extends AbstractConfig {
4749
private static final String PROXY_PORT = "http.client.proxy.port";
4850
private static final String PROXY_USERNAME = "http.client.proxy.username";
4951
private static final String PROXY_PASSWORD = "http.client.proxy.password";
52+
private static final String KEYSTORE = "http.client.keystore";
53+
private static final String KEYSTORE_PASSWORD = "http.client.keystore.password";
5054

5155
private final Long connectionTimeoutMillis;
5256
private final Long readTimeoutMillis;
@@ -57,6 +61,8 @@ public class OkHttpClientConfig extends AbstractConfig {
5761
private final Integer proxyPort;
5862
private final String proxyUsername;
5963
private final String proxyPassword;
64+
private final String keyStore;
65+
private final Password keyStorePassword;
6066

6167
OkHttpClientConfig(Map<String, ?> originals) {
6268
super(config(), originals);
@@ -69,6 +75,8 @@ public class OkHttpClientConfig extends AbstractConfig {
6975
proxyPort = getInt(PROXY_PORT);
7076
proxyUsername = getString(PROXY_USERNAME);
7177
proxyPassword = getString(PROXY_PASSWORD);
78+
keyStore = getString(KEYSTORE);
79+
keyStorePassword = getPassword(KEYSTORE_PASSWORD);
7280
}
7381

7482
public static ConfigDef config() {
@@ -82,6 +90,8 @@ public static ConfigDef config() {
8290
.define(PROXY_PORT, INT, 3128, MEDIUM, "Proxy port")
8391
.define(PROXY_USERNAME, STRING, "", MEDIUM, "Proxy username")
8492
.define(PROXY_PASSWORD, STRING, "", MEDIUM, "Proxy password")
93+
.define(KEYSTORE, STRING, "", MEDIUM, "Keystore")
94+
.define(KEYSTORE_PASSWORD, PASSWORD, "", MEDIUM, "Keystore password")
8595
;
8696
}
8797
}

0 commit comments

Comments
 (0)