Skip to content

Commit 659d379

Browse files
kirktrueomkreddy
authored andcommitted
MINOR: Code cleanup and additional tests for DefaultJwtValidator
1 parent c36f1cb commit 659d379

File tree

3 files changed

+74
-4
lines changed

3 files changed

+74
-4
lines changed

clients/src/main/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidator.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.kafka.common.security.oauthbearer;
1919

20+
import org.apache.kafka.common.config.SaslConfigs;
2021
import org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
22+
import org.apache.kafka.common.security.oauthbearer.internals.secured.ConfigurationUtils;
2123
import org.apache.kafka.common.utils.Utils;
2224

2325
import org.jose4j.keys.resolvers.VerificationKeyResolver;
@@ -54,7 +56,13 @@ public void configure(Map<String, ?> configs, String saslMechanism, List<AppConf
5456
if (verificationKeyResolver.isPresent()) {
5557
delegate = new BrokerJwtValidator(verificationKeyResolver.get());
5658
} else {
57-
delegate = new ClientJwtValidator();
59+
ConfigurationUtils cu = new ConfigurationUtils(configs, saslMechanism);
60+
61+
if (cu.containsKey(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL)) {
62+
delegate = new BrokerJwtValidator();
63+
} else {
64+
delegate = new ClientJwtValidator();
65+
}
5866
}
5967

6068
delegate.configure(configs, saslMechanism, jaasConfigEntries);

clients/src/test/java/org/apache/kafka/common/security/oauthbearer/DefaultJwtValidatorTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,34 @@
1717

1818
package org.apache.kafka.common.security.oauthbearer;
1919

20+
import org.apache.kafka.common.config.SaslConfigs;
21+
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
2022
import org.apache.kafka.common.security.oauthbearer.internals.secured.AccessTokenBuilder;
2123
import org.apache.kafka.common.security.oauthbearer.internals.secured.CloseableVerificationKeyResolver;
2224
import org.apache.kafka.common.security.oauthbearer.internals.secured.OAuthBearerTest;
2325

26+
import org.jose4j.jwk.JsonWebKey;
27+
import org.jose4j.jwk.JsonWebKeySet;
28+
import org.jose4j.jwk.PublicJsonWebKey;
2429
import org.jose4j.jws.AlgorithmIdentifiers;
30+
import org.junit.jupiter.api.AfterEach;
2531
import org.junit.jupiter.api.Test;
2632

2733
import java.util.Map;
2834

35+
import static org.apache.kafka.common.config.internals.BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG;
2936
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
37+
import static org.apache.kafka.test.TestUtils.tempFile;
3038
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3139
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
3240

3341
public class DefaultJwtValidatorTest extends OAuthBearerTest {
3442

43+
@AfterEach
44+
public void tearDown() {
45+
System.clearProperty(BrokerSecurityConfigs.ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG);
46+
}
47+
3548
@Test
3649
public void testConfigureWithVerificationKeyResolver() {
3750
AccessTokenBuilder builder = new AccessTokenBuilder()
@@ -51,6 +64,26 @@ public void testConfigureWithoutVerificationKeyResolver() {
5164
assertInstanceOf(ClientJwtValidator.class, jwtValidator.delegate());
5265
}
5366

67+
@Test
68+
public void testConfigureWithJwksUrl() throws Exception {
69+
PublicJsonWebKey jwk = createRsaJwk();
70+
AccessTokenBuilder builder = new AccessTokenBuilder()
71+
.jwk(jwk)
72+
.alg(AlgorithmIdentifiers.RSA_USING_SHA256);
73+
String accessToken = builder.build();
74+
75+
JsonWebKeySet jwks = new JsonWebKeySet(jwk);
76+
String jwksJson = jwks.toJson(JsonWebKey.OutputControlLevel.PUBLIC_ONLY);
77+
String fileUrl = tempFile(jwksJson).toURI().toString();
78+
System.setProperty(ALLOWED_SASL_OAUTHBEARER_URLS_CONFIG, fileUrl);
79+
Map<String, ?> configs = getSaslConfigs(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_URL, fileUrl);
80+
81+
DefaultJwtValidator jwtValidator = new DefaultJwtValidator();
82+
assertDoesNotThrow(() -> jwtValidator.configure(configs, OAUTHBEARER_MECHANISM, getJaasConfigEntries()));
83+
assertInstanceOf(BrokerJwtValidator.class, jwtValidator.delegate());
84+
assertDoesNotThrow(() -> jwtValidator.validate(accessToken));
85+
}
86+
5487
private CloseableVerificationKeyResolver createVerificationKeyResolver(AccessTokenBuilder builder) {
5588
return (jws, nestingContext) -> builder.jwk().getPublicKey();
5689
}

core/src/test/scala/integration/kafka/api/ClientOAuthIntegrationTest.scala

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
2626
import java.util.{Base64, Collections, Properties}
2727
import no.nav.security.mock.oauth2.{MockOAuth2Server, OAuth2Config}
2828
import no.nav.security.mock.oauth2.token.{KeyProvider, OAuth2TokenProvider}
29-
import org.apache.kafka.common.KafkaException
29+
import org.apache.kafka.common.{KafkaException, TopicPartition}
3030
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
31+
import org.apache.kafka.common.errors.SaslAuthenticationException
3132
import org.apache.kafka.common.security.auth.SecurityProtocol
32-
import org.apache.kafka.common.security.oauthbearer.{OAuthBearerLoginCallbackHandler, OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
33+
import org.apache.kafka.common.security.oauthbearer.{JwtRetriever, OAuthBearerLoginCallbackHandler, OAuthBearerLoginModule, OAuthBearerValidatorCallbackHandler}
3334
import org.apache.kafka.common.utils.Utils
3435
import org.apache.kafka.test.TestUtils
3536
import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertThrows}
@@ -244,6 +245,27 @@ class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup {
244245
assertThrows(classOf[ConfigException], () => createAdminClient(configOverrides = configs))
245246
}
246247

248+
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
249+
@MethodSource(Array("getTestGroupProtocolParametersAll"))
250+
def testAuthenticationErrorOnTamperedJwt(groupProtocol: String): Unit = {
251+
val className = classOf[TamperedJwtRetriever].getName
252+
253+
val configs = defaultOAuthConfigs()
254+
configs.put(SaslConfigs.SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS, className)
255+
256+
val tp = new TopicPartition("test-topic", 0)
257+
258+
val admin = createAdminClient(configOverrides = configs)
259+
TestUtils.assertFutureThrows(classOf[SaslAuthenticationException], admin.describeCluster().clusterId())
260+
261+
val producer = createProducer(configOverrides = configs)
262+
assertThrows(classOf[SaslAuthenticationException], () => producer.partitionsFor(tp.topic()))
263+
264+
val consumer = createConsumer(configOverrides = configs)
265+
consumer.assign(Collections.singleton(tp))
266+
assertThrows(classOf[SaslAuthenticationException], () => consumer.position(tp))
267+
}
268+
247269
def generatePrivateKeyFile(): File = {
248270
val file = File.createTempFile("private-", ".key")
249271
val bytes = Base64.getEncoder.encode(privateKey.getEncoded)
@@ -258,4 +280,11 @@ class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup {
258280

259281
file
260282
}
261-
}
283+
}
284+
285+
class TamperedJwtRetriever extends JwtRetriever {
286+
287+
override def retrieve(): String = {
288+
"eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJzdWIiOiAiMTIzNDU2Nzg5MCIsICJuYW1lIjogIkpvaG4gRG9lIiwgInJvbGUiOiAiYWRtaW4iLCAiaWF0IjogMTUxNjIzOTAyMiwgImV4cCI6IDE5MTYyMzkwMjJ9.vVT5ylQCGvb0B-wv1YXHjmlMd-DZKCThUt5-enry_sA"
289+
}
290+
}

0 commit comments

Comments
 (0)