diff --git a/src/main/scala/com/github/matsluni/akkahttpspi/RequestRunner.scala b/src/main/scala/com/github/matsluni/akkahttpspi/RequestRunner.scala index 0b32b41..b573553 100644 --- a/src/main/scala/com/github/matsluni/akkahttpspi/RequestRunner.scala +++ b/src/main/scala/com/github/matsluni/akkahttpspi/RequestRunner.scala @@ -17,16 +17,16 @@ package com.github.matsluni.akkahttpspi import java.util.concurrent.CompletableFuture - import akka.actor.ActorSystem import akka.http.scaladsl.model.{ContentTypes, HttpResponse} import akka.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`} -import akka.stream.Materializer +import akka.stream.{Materializer, StreamTcpException} import akka.stream.scaladsl.{Keep, Sink} import org.slf4j.LoggerFactory import software.amazon.awssdk.http.SdkHttpFullResponse import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler +import java.io.IOException import scala.compat.java8.FutureConverters import scala.concurrent.{ExecutionContext, Future} import scala.jdk.CollectionConverters._ @@ -52,7 +52,7 @@ class RequestRunner()(implicit sys: ActorSystem, ec: ExecutionContext, mat: Mate complete } - result.failed.foreach(handler.onError) + result.failed.foreach(e => handler.onError(RequestRunner.decorateException(e))) FutureConverters.toJava(result.map(_ => null: Void)).toCompletableFuture } @@ -78,3 +78,19 @@ class RequestRunner()(implicit sys: ActorSystem, ec: ExecutionContext, mat: Mate headers ++ contentType ++ contentLength } } + +object RequestRunner { + //Decorate akka-http exceptions with IOException so that AWS SDK retries them automatically (if using the default retry policy) + //This was inspired in NettyUtils.decorateException (https://github.com/aws/aws-sdk-java-v2/blob/13985e0668a9a0b12ad331644e3c4fd1385c2cd7/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java#L67-L80) + private[akkahttpspi] def decorateException(e: Throwable): Throwable = e match { + //StreamTcpException is the exception thrown by the underlying TCP infrastructure (see akka.stream.impl.io.TcpConnectionStage) + case e: StreamTcpException => new IOException(e) + //akka.http.impl.engine.client.pool.SlotState$BusyState$$anon$1: Connection was shutdown is an + //IllegalStateException thrown by akka.http.impl.engine.client.pool.SlotState.BusyState.onConnectionCompleted + case e: IllegalStateException if e.getMessage == "Connection was shutdown." => new IOException(e) + //workaround for akka.http.impl.engine.client.OutgoingConnectionBlueprint.UnexpectedConnectionClosureException being private + //see more details in https://github.com/akka/akka-http/issues/3481 + case e if e.getMessage.startsWith("The http server closed the connection unexpectedly") => new IOException(e) + case e => e + } +} diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala b/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala index a93ad50..29e0df5 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/BaseAwsClientTest.scala @@ -17,8 +17,8 @@ package com.github.matsluni.akkahttpspi import java.net.URI - import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer} +import com.github.dockerjava.api.model.Frame import com.github.matsluni.akkahttpspi.testcontainers.LocalStackReadyLogWaitStrategy import org.scalatest.concurrent.{Eventually, Futures, IntegrationPatience} import org.scalatest.BeforeAndAfter @@ -28,6 +28,7 @@ import software.amazon.awssdk.regions.Region import scala.util.Random import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec +import org.testcontainers.shaded.com.github.dockerjava.core.InvocationBuilder.AsyncResultCallback trait BaseAwsClientTest[C <: SdkClient] extends AnyWordSpec @@ -53,12 +54,20 @@ trait LocalstackBaseAwsClientTest[C <: SdkClient] extends BaseAwsClientTest[C] { lazy val exposedServicePort: Int = 4566 private lazy val containerInstance = new GenericContainer( - dockerImage = "localstack/localstack", + dockerImage = "localstack/localstack:1.4.0", exposedPorts = Seq(exposedServicePort), env = Map("SERVICES" -> service), waitStrategy = Some(LocalStackReadyLogWaitStrategy) ) override val container: GenericContainer = containerInstance + + protected def killLocalstackProcess(): Unit = { + // restarting the docker container is not a solution because the port will change + // https://github.com/testcontainers/testcontainers-java/issues/606 + container.dockerClient.execStartCmd( + container.dockerClient.execCreateCmd(container.containerId).withCmd("pkill", "python").exec().getId + ).exec(new AsyncResultCallback[Frame]()).awaitCompletion().awaitResult() + } } trait ElasticMQSQSBaseAwsClientTest[C <: SdkClient] extends BaseAwsClientTest[C] { diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/RequestRunnerSpec.scala b/src/test/scala/com/github/matsluni/akkahttpspi/RequestRunnerSpec.scala index b79474f..48d4ea2 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/RequestRunnerSpec.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/RequestRunnerSpec.scala @@ -29,6 +29,7 @@ import org.scalatest.wordspec.AnyWordSpec import software.amazon.awssdk.http.SdkHttpResponse import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler +import java.io.IOException import scala.concurrent.Future import scala.jdk.CollectionConverters._ @@ -47,6 +48,30 @@ class RequestRunnerSpec extends AnyWordSpec with Matchers with OptionValues { handler.responseHeaders.headers().asScala.get("Content-Length").value.asScala.headOption.value shouldBe "2" } + "decorate UnexpectedConnectionClosureException" in { + //instantiate akka.http.impl.engine.client.OutgoingConnectionBlueprint.UnexpectedConnectionClosureException using reflection + val clazz = Class.forName("akka.http.impl.engine.client.OutgoingConnectionBlueprint$UnexpectedConnectionClosureException") + val e = clazz.getDeclaredConstructor(classOf[Int]).newInstance(Integer.valueOf(1)).asInstanceOf[Throwable] + val ioexception = RequestRunner.decorateException(e) + ioexception shouldBe a[IOException] + ioexception.getMessage shouldBe "akka.http.impl.engine.client.OutgoingConnectionBlueprint$UnexpectedConnectionClosureException: The http server closed the connection unexpectedly before delivering responses for 1 outstanding requests" + } + + "decorate StreamTcpException" in { + val e = new akka.stream.StreamTcpException("The connection closed with error: Connection reset") + val ioexception = RequestRunner.decorateException(e) + ioexception shouldBe a[IOException] + ioexception.getMessage shouldBe "akka.stream.StreamTcpException: The connection closed with error: Connection reset" + } + "decorate IllegalStateException with 'Connection was shutdown' reason" in { + val e = new IllegalStateException("Connection was shutdown.") + val ioexception = RequestRunner.decorateException(e) + ioexception shouldBe a[IOException] + ioexception.getMessage shouldBe "java.lang.IllegalStateException: Connection was shutdown." + } + + + class MyHeaderHandler() extends SdkAsyncHttpResponseHandler { private val headers = new AtomicReference[SdkHttpResponse](null) def responseHeaders = headers.get() diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/s3/TestS3.scala b/src/test/scala/com/github/matsluni/akkahttpspi/s3/TestS3.scala index 26df999..05e6410 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/s3/TestS3.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/s3/TestS3.scala @@ -126,7 +126,7 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] { private lazy val containerInstance = new GenericContainer( dockerImage = "adobe/s3mock:2.13.0", exposedPorts = Seq(exposedServicePort), - waitStrategy = Some(TimeoutWaitStrategy(10 seconds)) + waitStrategy = Some(TimeoutWaitStrategy(15 seconds)) ) override val container: GenericContainer = containerInstance } diff --git a/src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala b/src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala index f45fb36..dacd86e 100644 --- a/src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala +++ b/src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala @@ -16,10 +16,18 @@ package com.github.matsluni.akkahttpspi.sns +import com.github.dockerjava.api.model.Frame import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, LocalstackBaseAwsClientTest} +import org.testcontainers.shaded.com.github.dockerjava.core.InvocationBuilder.AsyncResultCallback import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider} -import software.amazon.awssdk.services.sns.SnsAsyncClient +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration +import software.amazon.awssdk.core.retry.RetryPolicy +import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy import software.amazon.awssdk.services.sns.model.{CreateTopicRequest, PublishRequest} +import software.amazon.awssdk.services.sns.{SnsAsyncClient, SnsAsyncClientBuilder} + +import java.time.Duration +import scala.concurrent.duration.DurationInt class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] { @@ -32,17 +40,36 @@ class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] { } } - def withClient(testCode: SnsAsyncClient => Any): Any = { + "Retryable SNS client" should { + "retry" in withLongRetriesClient { client => + //the localstack process wil take a few seconds to restart + killLocalstackProcess() + + val startTime = System.currentTimeMillis() + val listTopicsFuture = client.listTopics() + + //If debug logs on "software.amazon" are enabled, one should see the retry messages + //"Retryable error detected. Will retry in 1000ms. Request attempt number 2" + listTopicsFuture.join() + + val duration = System.currentTimeMillis() - startTime + duration shouldBe > (3.seconds.toMillis) + } + } + + def withClient(testCode: SnsAsyncClient => Any): Any = withCustomClient(identity)(testCode) + def withCustomClient(builderFn: SnsAsyncClientBuilder => SnsAsyncClientBuilder)(testCode: SnsAsyncClient => Any): Any = { val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build() - val client = SnsAsyncClient + val builder = SnsAsyncClient .builder() .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x"))) .httpClient(akkaClient) .region(defaultRegion) .endpointOverride(endpoint) - .build() + + val client = builderFn(builder).build() try { testCode(client) @@ -53,5 +80,17 @@ class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] { } } + /** Uses an SnsAsyncClient will retry more times than the default retry policy (3 retries) and with a bigger delay between retries */ + private def withLongRetriesClient(testCode: SnsAsyncClient => Any) = withCustomClient(b => + b.overrideConfiguration { (b: ClientOverrideConfiguration.Builder) => + b.retryPolicy(RetryPolicy.builder() + .numRetries(10) + .backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofSeconds(1))) + .build() + ) + () + } + )(testCode) + override def service: String = "sns" }