@@ -26,10 +26,11 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, TestInfo}
2626import java .util .{Base64 , Collections , Properties }
2727import no .nav .security .mock .oauth2 .{MockOAuth2Server , OAuth2Config }
2828import no .nav .security .mock .oauth2 .token .{KeyProvider , OAuth2TokenProvider }
29- import org .apache .kafka .common .KafkaException
29+ import org .apache .kafka .common .{ KafkaException , TopicPartition }
3030import org .apache .kafka .common .config .internals .BrokerSecurityConfigs
31+ import org .apache .kafka .common .errors .SaslAuthenticationException
3132import org .apache .kafka .common .security .auth .SecurityProtocol
32- import org .apache .kafka .common .security .oauthbearer .{OAuthBearerLoginCallbackHandler , OAuthBearerLoginModule , OAuthBearerValidatorCallbackHandler }
33+ import org .apache .kafka .common .security .oauthbearer .{JwtRetriever , OAuthBearerLoginCallbackHandler , OAuthBearerLoginModule , OAuthBearerValidatorCallbackHandler }
3334import org .apache .kafka .common .utils .Utils
3435import org .apache .kafka .test .TestUtils
3536import org .junit .jupiter .api .Assertions .{assertDoesNotThrow , assertThrows }
@@ -244,6 +245,27 @@ class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup {
244245 assertThrows(classOf [ConfigException ], () => createAdminClient(configOverrides = configs))
245246 }
246247
248+ @ ParameterizedTest (name = TestInfoUtils .TestWithParameterizedGroupProtocolNames )
249+ @ MethodSource (Array (" getTestGroupProtocolParametersAll" ))
250+ def testAuthenticationErrorOnTamperedJwt (groupProtocol : String ): Unit = {
251+ val className = classOf [TamperedJwtRetriever ].getName
252+
253+ val configs = defaultOAuthConfigs()
254+ configs.put(SaslConfigs .SASL_OAUTHBEARER_JWT_RETRIEVER_CLASS , className)
255+
256+ val tp = new TopicPartition (" test-topic" , 0 )
257+
258+ val admin = createAdminClient(configOverrides = configs)
259+ TestUtils .assertFutureThrows(classOf [SaslAuthenticationException ], admin.describeCluster().clusterId())
260+
261+ val producer = createProducer(configOverrides = configs)
262+ assertThrows(classOf [SaslAuthenticationException ], () => producer.partitionsFor(tp.topic()))
263+
264+ val consumer = createConsumer(configOverrides = configs)
265+ consumer.assign(Collections .singleton(tp))
266+ assertThrows(classOf [SaslAuthenticationException ], () => consumer.position(tp))
267+ }
268+
247269 def generatePrivateKeyFile (): File = {
248270 val file = File .createTempFile(" private-" , " .key" )
249271 val bytes = Base64 .getEncoder.encode(privateKey.getEncoded)
@@ -258,4 +280,11 @@ class ClientOAuthIntegrationTest extends IntegrationTestHarness with SaslSetup {
258280
259281 file
260282 }
261- }
283+ }
284+
285+ class TamperedJwtRetriever extends JwtRetriever {
286+
287+ override def retrieve (): String = {
288+ " eyJhbGciOiAiSFMyNTYiLCAidHlwIjogIkpXVCJ9.eyJzdWIiOiAiMTIzNDU2Nzg5MCIsICJuYW1lIjogIkpvaG4gRG9lIiwgInJvbGUiOiAiYWRtaW4iLCAiaWF0IjogMTUxNjIzOTAyMiwgImV4cCI6IDE5MTYyMzkwMjJ9.vVT5ylQCGvb0B-wv1YXHjmlMd-DZKCThUt5-enry_sA"
289+ }
290+ }
0 commit comments