Skip to content
22 changes: 19 additions & 3 deletions src/main/scala/com/github/matsluni/akkahttpspi/RequestRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@
package com.github.matsluni.akkahttpspi

import java.util.concurrent.CompletableFuture

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpResponse}
import akka.http.scaladsl.model.headers.{`Content-Length`, `Content-Type`}
import akka.stream.Materializer
import akka.stream.{Materializer, StreamTcpException}
import akka.stream.scaladsl.{Keep, Sink}
import org.slf4j.LoggerFactory
import software.amazon.awssdk.http.SdkHttpFullResponse
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler

import java.io.IOException
import scala.compat.java8.FutureConverters
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.CollectionConverters._
Expand All @@ -52,7 +52,7 @@ class RequestRunner()(implicit sys: ActorSystem, ec: ExecutionContext, mat: Mate
complete
}

result.failed.foreach(handler.onError)
result.failed.foreach(e => handler.onError(RequestRunner.decorateException(e)))
FutureConverters.toJava(result.map(_ => null: Void)).toCompletableFuture
}

Expand All @@ -78,3 +78,19 @@ class RequestRunner()(implicit sys: ActorSystem, ec: ExecutionContext, mat: Mate
headers ++ contentType ++ contentLength
}
}

object RequestRunner {
//Decorate akka-http exceptions with IOException so that AWS SDK retries them automatically (if using the default retry policy)
//This was inspired in NettyUtils.decorateException (https://github.com/aws/aws-sdk-java-v2/blob/13985e0668a9a0b12ad331644e3c4fd1385c2cd7/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java#L67-L80)
private[akkahttpspi] def decorateException(e: Throwable): Throwable = e match {
//StreamTcpException is the exception thrown by the underlying TCP infrastructure (see akka.stream.impl.io.TcpConnectionStage)
case e: StreamTcpException => new IOException(e)
//akka.http.impl.engine.client.pool.SlotState$BusyState$$anon$1: Connection was shutdown is an
//IllegalStateException thrown by akka.http.impl.engine.client.pool.SlotState.BusyState.onConnectionCompleted
case e: IllegalStateException if e.getMessage == "Connection was shutdown." => new IOException(e)
//workaround for akka.http.impl.engine.client.OutgoingConnectionBlueprint.UnexpectedConnectionClosureException being private
//see more details in https://github.com/akka/akka-http/issues/3481
case e if e.getMessage.startsWith("The http server closed the connection unexpectedly") => new IOException(e)
case e => e
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package com.github.matsluni.akkahttpspi

import java.net.URI

import com.dimafeng.testcontainers.{ForAllTestContainer, GenericContainer}
import com.github.dockerjava.api.model.Frame
import com.github.matsluni.akkahttpspi.testcontainers.LocalStackReadyLogWaitStrategy
import org.scalatest.concurrent.{Eventually, Futures, IntegrationPatience}
import org.scalatest.BeforeAndAfter
Expand All @@ -28,6 +28,7 @@ import software.amazon.awssdk.regions.Region
import scala.util.Random
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.testcontainers.shaded.com.github.dockerjava.core.InvocationBuilder.AsyncResultCallback

trait BaseAwsClientTest[C <: SdkClient]
extends AnyWordSpec
Expand All @@ -53,12 +54,20 @@ trait LocalstackBaseAwsClientTest[C <: SdkClient] extends BaseAwsClientTest[C] {
lazy val exposedServicePort: Int = 4566

private lazy val containerInstance = new GenericContainer(
dockerImage = "localstack/localstack",
dockerImage = "localstack/localstack:1.4.0",
exposedPorts = Seq(exposedServicePort),
env = Map("SERVICES" -> service),
waitStrategy = Some(LocalStackReadyLogWaitStrategy)
)
override val container: GenericContainer = containerInstance

protected def killLocalstackProcess(): Unit = {
// restarting the docker container is not a solution because the port will change
// https://github.com/testcontainers/testcontainers-java/issues/606
container.dockerClient.execStartCmd(
container.dockerClient.execCreateCmd(container.containerId).withCmd("pkill", "python").exec().getId
).exec(new AsyncResultCallback[Frame]()).awaitCompletion().awaitResult()
}
}

trait ElasticMQSQSBaseAwsClientTest[C <: SdkClient] extends BaseAwsClientTest[C] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.scalatest.wordspec.AnyWordSpec
import software.amazon.awssdk.http.SdkHttpResponse
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler

import java.io.IOException
import scala.concurrent.Future
import scala.jdk.CollectionConverters._

Expand All @@ -47,6 +48,30 @@ class RequestRunnerSpec extends AnyWordSpec with Matchers with OptionValues {
handler.responseHeaders.headers().asScala.get("Content-Length").value.asScala.headOption.value shouldBe "2"
}

"decorate UnexpectedConnectionClosureException" in {
//instantiate akka.http.impl.engine.client.OutgoingConnectionBlueprint.UnexpectedConnectionClosureException using reflection
val clazz = Class.forName("akka.http.impl.engine.client.OutgoingConnectionBlueprint$UnexpectedConnectionClosureException")
val e = clazz.getDeclaredConstructor(classOf[Int]).newInstance(Integer.valueOf(1)).asInstanceOf[Throwable]
val ioexception = RequestRunner.decorateException(e)
ioexception shouldBe a[IOException]
ioexception.getMessage shouldBe "akka.http.impl.engine.client.OutgoingConnectionBlueprint$UnexpectedConnectionClosureException: The http server closed the connection unexpectedly before delivering responses for 1 outstanding requests"
}

"decorate StreamTcpException" in {
val e = new akka.stream.StreamTcpException("The connection closed with error: Connection reset")
val ioexception = RequestRunner.decorateException(e)
ioexception shouldBe a[IOException]
ioexception.getMessage shouldBe "akka.stream.StreamTcpException: The connection closed with error: Connection reset"
}
"decorate IllegalStateException with 'Connection was shutdown' reason" in {
val e = new IllegalStateException("Connection was shutdown.")
val ioexception = RequestRunner.decorateException(e)
ioexception shouldBe a[IOException]
ioexception.getMessage shouldBe "java.lang.IllegalStateException: Connection was shutdown."
}



class MyHeaderHandler() extends SdkAsyncHttpResponseHandler {
private val headers = new AtomicReference[SdkHttpResponse](null)
def responseHeaders = headers.get()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class TestS3 extends BaseAwsClientTest[S3AsyncClient] {
private lazy val containerInstance = new GenericContainer(
dockerImage = "adobe/s3mock:2.13.0",
exposedPorts = Seq(exposedServicePort),
waitStrategy = Some(TimeoutWaitStrategy(10 seconds))
waitStrategy = Some(TimeoutWaitStrategy(15 seconds))
)
override val container: GenericContainer = containerInstance
}
47 changes: 43 additions & 4 deletions src/test/scala/com/github/matsluni/akkahttpspi/sns/TestSNS.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@

package com.github.matsluni.akkahttpspi.sns

import com.github.dockerjava.api.model.Frame
import com.github.matsluni.akkahttpspi.{AkkaHttpAsyncHttpService, LocalstackBaseAwsClientTest}
import org.testcontainers.shaded.com.github.dockerjava.core.InvocationBuilder.AsyncResultCallback
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.services.sns.SnsAsyncClient
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.core.retry.RetryPolicy
import software.amazon.awssdk.core.retry.backoff.FixedDelayBackoffStrategy
import software.amazon.awssdk.services.sns.model.{CreateTopicRequest, PublishRequest}
import software.amazon.awssdk.services.sns.{SnsAsyncClient, SnsAsyncClientBuilder}

import java.time.Duration
import scala.concurrent.duration.DurationInt

class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] {

Expand All @@ -32,17 +40,36 @@ class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] {
}
}

def withClient(testCode: SnsAsyncClient => Any): Any = {
"Retryable SNS client" should {
"retry" in withLongRetriesClient { client =>
//the localstack process wil take a few seconds to restart
killLocalstackProcess()

val startTime = System.currentTimeMillis()
val listTopicsFuture = client.listTopics()

//If debug logs on "software.amazon" are enabled, one should see the retry messages
//"Retryable error detected. Will retry in 1000ms. Request attempt number 2"
listTopicsFuture.join()

val duration = System.currentTimeMillis() - startTime
duration shouldBe > (3.seconds.toMillis)
}
}

def withClient(testCode: SnsAsyncClient => Any): Any = withCustomClient(identity)(testCode)
def withCustomClient(builderFn: SnsAsyncClientBuilder => SnsAsyncClientBuilder)(testCode: SnsAsyncClient => Any): Any = {

val akkaClient = new AkkaHttpAsyncHttpService().createAsyncHttpClientFactory().build()

val client = SnsAsyncClient
val builder = SnsAsyncClient
.builder()
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
.httpClient(akkaClient)
.region(defaultRegion)
.endpointOverride(endpoint)
.build()

val client = builderFn(builder).build()

try {
testCode(client)
Expand All @@ -53,5 +80,17 @@ class TestSNS extends LocalstackBaseAwsClientTest[SnsAsyncClient] {
}
}

/** Uses an SnsAsyncClient will retry more times than the default retry policy (3 retries) and with a bigger delay between retries */
private def withLongRetriesClient(testCode: SnsAsyncClient => Any) = withCustomClient(b =>
b.overrideConfiguration { (b: ClientOverrideConfiguration.Builder) =>
b.retryPolicy(RetryPolicy.builder()
.numRetries(10)
.backoffStrategy(FixedDelayBackoffStrategy.create(Duration.ofSeconds(1)))
.build()
)
()
}
)(testCode)

override def service: String = "sns"
}