Skip to content

Commit 653e008

Browse files
committed
Make Scala interception more reliable for existing connection pools
Previously the tests would frequently fail because the pool wasn't properly reset when interception began. This seems to be a race condition if there's a request currently in progress, and it's something that would likely resolve in practice _eventually_ but it's better to detect cases and fix it properly per pool, rather than relying on shutdownAll working reliably with concurrent requests. Still more work to do here to make Akka reliable with older versions, and check a couple of other edge cases.
1 parent 8326dc5 commit 653e008

File tree

7 files changed

+113
-47
lines changed

7 files changed

+113
-47
lines changed

src/main/java/tech/httptoolkit/javaagent/advice/akka/ResetAllConnectionPoolsAdvice.java

Lines changed: 0 additions & 26 deletions
This file was deleted.
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package tech.httptoolkit.javaagent.advice.akka;
2+
3+
import akka.http.impl.engine.client.PoolId;
4+
import akka.http.scaladsl.ClientTransport;
5+
import net.bytebuddy.asm.Advice;
6+
import scala.concurrent.Await;
7+
import scala.concurrent.Future;
8+
import scala.concurrent.duration.Duration;
9+
10+
import java.lang.reflect.Method;
11+
import java.util.Collections;
12+
import java.util.Set;
13+
import java.util.WeakHashMap;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.TimeoutException;
16+
17+
public class ResetOldPoolsAdvice {
18+
19+
public static Set<PoolId> resetPoolIds = Collections.newSetFromMap(new WeakHashMap<>());
20+
21+
@Advice.OnMethodEnter
22+
public static void beforeDispatchRequest(
23+
@Advice.This Object thisPoolMaster,
24+
@Advice.Argument(value = 0) PoolId poolId
25+
) throws Exception {
26+
// If a pool config has been changed to use our proxy already, then we're perfect
27+
ClientTransport transport = poolId.hcps().setup().settings().transport();
28+
boolean alreadyIntercepted = transport == ResetPoolSetupAdvice.interceptedProxyTransport;
29+
// If not, it's still OK, as long as we've previously reset the pool to ensure the connection was
30+
// re-established (we hook connection setup too, so all new conns are intercepted, even with old config)
31+
boolean alreadyReset = resetPoolIds.contains(poolId);
32+
33+
if (alreadyIntercepted || alreadyReset) return;
34+
35+
// Otherwise this is a request to use a pre-existing connection pool which probably has connections open that
36+
// aren't using our proxy. We shutdown the pool before the request. It'll be restarted automatically when
37+
// the request does go through, but this ensures we re-establish connections (so it definitely gets intercepted)
38+
Method shutdownMethod = thisPoolMaster.getClass()
39+
.getDeclaredMethod("shutdown", PoolId.class);
40+
41+
Future<?> shutdownFuture = (Future<?>) shutdownMethod.invoke(thisPoolMaster, poolId);
42+
43+
// We wait a little, just to ensure the shutdown is definitely started before this request is dispatched.
44+
try {
45+
Await.result(shutdownFuture, Duration.apply(10, TimeUnit.MILLISECONDS));
46+
} catch (TimeoutException ignored) {}
47+
48+
// Lastly, we remember this pool id, so that we don't unnecessarily reset it again in future:
49+
resetPoolIds.add(poolId);
50+
}
51+
52+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package tech.httptoolkit.javaagent.advice.akka;
2+
3+
import akka.http.scaladsl.ClientTransport;
4+
import akka.http.scaladsl.ConnectionContext;
5+
import akka.http.scaladsl.settings.ConnectionPoolSettings;
6+
import net.bytebuddy.asm.Advice;
7+
import tech.httptoolkit.javaagent.HttpProxyAgent;
8+
9+
import java.net.InetSocketAddress;
10+
11+
public class ResetPoolSetupAdvice {
12+
13+
// We use this to avoid re-instantiating the proxy endlessly, but also to recognize intercepted
14+
// and pre-existing settings configurations when they're used.
15+
public static ClientTransport interceptedProxyTransport = ClientTransport.httpsProxy(
16+
new InetSocketAddress(
17+
HttpProxyAgent.getAgentProxyHost(),
18+
HttpProxyAgent.getAgentProxyPort()
19+
)
20+
);
21+
22+
@Advice.OnMethodExit
23+
public static void afterConstructor(
24+
@Advice.FieldValue(value = "settings", readOnly = false) ConnectionPoolSettings settings,
25+
@Advice.FieldValue(value = "connectionContext", readOnly = false) ConnectionContext connContext
26+
) {
27+
// Change all new outgoing connections to use the proxy:
28+
settings = settings.withTransport(interceptedProxyTransport);
29+
30+
// Change all new outgoing connections to trust our certificate:
31+
connContext = ConnectionContext.httpsClient(HttpProxyAgent.getInterceptedSslContext());
32+
}
33+
}

src/main/kotlin/tech/httptoolkit/javaagent/AgentMain.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ fun interceptAllHttps(config: Config, instrumentation: Instrumentation) {
105105
KtorCioEngineTransformer(logger),
106106
KtorClientTlsTransformer(logger),
107107
AkkaHttpTransformer(logger),
108+
AkkaPoolSettingsTransformer(logger),
108109
AkkaPoolTransformer(logger)
109110
).forEach { matchingAgentTransformer ->
110111
agentBuilder = matchingAgentTransformer.register(agentBuilder)

src/main/kotlin/tech/httptoolkit/javaagent/AkkaClientTransformers.kt

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,14 @@ package tech.httptoolkit.javaagent
33
import net.bytebuddy.agent.builder.AgentBuilder
44
import net.bytebuddy.asm.Advice
55
import net.bytebuddy.dynamic.DynamicType
6-
import net.bytebuddy.matcher.ElementMatchers.hasMethodName
7-
import net.bytebuddy.matcher.ElementMatchers.named
8-
import tech.httptoolkit.javaagent.advice.akka.OverrideHttpSettingsAdvice
9-
import tech.httptoolkit.javaagent.advice.akka.ResetAllConnectionPoolsAdvice
6+
import net.bytebuddy.matcher.ElementMatchers.*
7+
import tech.httptoolkit.javaagent.advice.akka.*
108

119
// First, we hook outgoing connection creation, and ensure that new connections always go via the proxy & trust us:
1210
class AkkaHttpTransformer(logger: TransformationLogger) : MatchingAgentTransformer(logger) {
1311
override fun register(builder: AgentBuilder): AgentBuilder {
1412
return builder
15-
.type(named("akka.http.scaladsl.HttpExt")) // Scala compiles Http()s methods here for some reason
13+
.type(named("akka.http.scaladsl.HttpExt")) // Scala compiles Http()s methods as 'Ext' for some reason
1614
.transform(this)
1715
}
1816

@@ -25,9 +23,27 @@ class AkkaHttpTransformer(logger: TransformationLogger) : MatchingAgentTransform
2523
}
2624
}
2725

28-
// Then, to ensure that any existing connections trust us too, we do a one-off connection pool reset,
29-
// triggered by the new PoolMaster.dispatchRequest (seems to happen for every request). This seems to
30-
// affects shared pools and individual pools too.
26+
// Second, when a connection pool setup is created (part of creating any connection pool, but also for
27+
// sending any individual request) we change its configuration. This isn't strictly necessary given the above,
28+
// but helps generally, and makes the 3rd step possible.
29+
class AkkaPoolSettingsTransformer(logger: TransformationLogger) : MatchingAgentTransformer(logger) {
30+
override fun register(builder: AgentBuilder): AgentBuilder {
31+
return builder
32+
.type(named("akka.http.impl.settings.ConnectionPoolSetup"))
33+
.transform(this)
34+
}
35+
36+
override fun transform(builder: DynamicType.Builder<*>): DynamicType.Builder<*> {
37+
return builder
38+
.visit(
39+
Advice.to(ResetPoolSetupAdvice::class.java)
40+
.on(isConstructor())
41+
)
42+
}
43+
}
44+
45+
// Then, to ensure that any existing connections trust us too, we monitor all calls to dispatchRequest, and reset
46+
// any pools that don't have intercepted configuration (so preare -existing), just once per pool id.
3147
class AkkaPoolTransformer(logger: TransformationLogger) : MatchingAgentTransformer(logger) {
3248
override fun register(builder: AgentBuilder): AgentBuilder {
3349
return builder
@@ -38,7 +54,7 @@ class AkkaPoolTransformer(logger: TransformationLogger) : MatchingAgentTransform
3854
override fun transform(builder: DynamicType.Builder<*>): DynamicType.Builder<*> {
3955
return builder
4056
.visit(
41-
Advice.to(ResetAllConnectionPoolsAdvice::class.java)
57+
Advice.to(ResetOldPoolsAdvice::class.java)
4258
.on(hasMethodName("dispatchRequest"))
4359
)
4460
}

src/main/kotlin/tech/httptoolkit/javaagent/KtorCioTransformers.kt

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,4 @@ class KtorCioEngineTransformer(logger: TransformationLogger) : MatchingAgentTran
6262
.visit(Advice.to(KtorResetProxyFieldAdvice::class.java)
6363
.on(hasMethodName("execute")))
6464
}
65-
}
66-
67-
/**
68-
*
69-
*
70-
* Proxy settings live in HttpClientEngineConfig. Stored on CIOEngine at creation time,
71-
* then used in execute() only. Reset then?
72-
*
73-
* Yes: change getProxy settings (changes all new engines immediately, though others shouldn't
74-
* need it) and reset on execute()
75-
*
76-
*/
65+
}

test-app/src/main/java/tech/httptoolkit/testapp/cases/AkkaRequestClientCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public int test(String url, Http client) throws URISyntaxException, ExecutionExc
2424
.singleRequest(HttpRequest.create(url))
2525
.toCompletableFuture();
2626
HttpResponse response = responseFuture.get();
27+
response.discardEntityBytes(system);
2728
return response.status().intValue();
2829
}
2930
}

0 commit comments

Comments
 (0)