diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 87cf57955b..39e7dfc480 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -210,6 +210,23 @@ public static class Collector {
* How long grpc client will timeout in sending data to upstream.
*/
public static int GRPC_UPSTREAM_TIMEOUT = 30;
+ /**
+ * The interval in seconds to send a keepalive ping to the backend.
+ * If this is less than or equal to 0, the keepalive is disabled.
+ *
+ *
+ * Note: The minimum safe value is 10 seconds. Values below this may be rejected by the gRPC server.
+ *
+ * This maps to `collector.grpc_keepalive_time` in agent.config.
+ */
+ public static long GRPC_KEEPALIVE_TIME = 120L;
+ /**
+ * The timeout in seconds to wait for a keepalive ack from the backend.
+ * If the ack is not received within this time, the connection is considered dead.
+ *
+ * This maps to `collector.grpc_keepalive_timeout` in agent.config.
+ */
+ public static long GRPC_KEEPALIVE_TIMEOUT = 30L;
/**
* Get profile task list interval
*/
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
index d7806ad674..b752cd62f6 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
@@ -27,6 +27,8 @@
import io.grpc.netty.NettyChannelBuilder;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.agent.core.conf.Config;
public class GRPCChannel {
/**
@@ -39,6 +41,12 @@ private GRPCChannel(String host, int port, List channelBuilders,
List decorators) throws Exception {
ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port);
+ if (Config.Collector.GRPC_KEEPALIVE_TIME > 0) {
+ channelBuilder.keepAliveTime(Config.Collector.GRPC_KEEPALIVE_TIME, TimeUnit.SECONDS)
+ .keepAliveTimeout(Config.Collector.GRPC_KEEPALIVE_TIMEOUT, TimeUnit.SECONDS)
+ .keepAliveWithoutCalls(true);
+ }
+
NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
for (ChannelBuilder builder : channelBuilders) {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
index 9744398fd1..846c343bbd 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
@@ -57,6 +57,7 @@ public class GRPCChannelManager implements BootService, Runnable {
private volatile List grpcServers;
private volatile int selectedIdx = -1;
private volatile int reconnectCount = 0;
+ private final Object statusLock = new Object();
@Override
public void prepare() {
@@ -99,7 +100,10 @@ public void shutdown() {
@Override
public void run() {
- LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
+ if (reconnect) {
+ LOGGER.warn("Selected collector grpc service running, reconnect:{}.", reconnect);
+ }
+
if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
grpcServers = Arrays.stream(Config.Collector.BACKEND_SERVICE.split(","))
.filter(StringUtil::isNotBlank)
@@ -130,32 +134,28 @@ public void run() {
String server = "";
try {
int index = Math.abs(random.nextInt()) % grpcServers.size();
+ server = grpcServers.get(index);
+ String[] ipAndPort = server.split(":");
+
if (index != selectedIdx) {
selectedIdx = index;
+ LOGGER.debug("Connecting to different gRPC server {}. Shutting down existing channel if any.", server);
+ createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
+ } else {
+ // Same server, increment reconnectCount
+ reconnectCount++;
- server = grpcServers.get(index);
- String[] ipAndPort = server.split(":");
-
- if (managedChannel != null) {
- managedChannel.shutdownNow();
+ if (reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD) {
+ // Reconnect attempts exceeded threshold, force rebuild channel
+ LOGGER.warn("Reconnect attempts to {} exceeded threshold ({}), forcing channel rebuild",
+ server, Config.Agent.FORCE_RECONNECTION_PERIOD);
+ createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
+ } else if (managedChannel.isConnected(false)) {
+ // Channel appears connected, trust it but keep reconnectCount for monitoring
+ LOGGER.debug("Channel to {} appears connected (reconnect attempt: {})", server, reconnectCount);
+ notifyConnected();
}
-
- managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
- .addManagedChannelBuilder(new StandardChannelBuilder())
- .addManagedChannelBuilder(new TLSChannelBuilder())
- .addChannelDecorator(new AgentIDDecorator())
- .addChannelDecorator(new AuthenticationDecorator())
- .build();
- reconnectCount = 0;
- reconnect = false;
- notify(GRPCChannelStatus.CONNECTED);
- } else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
- // Reconnect to the same server is automatically done by GRPC,
- // therefore we are responsible to check the connectivity and
- // set the state and notify listeners
- reconnectCount = 0;
- reconnect = false;
- notify(GRPCChannelStatus.CONNECTED);
+ // else: Channel is disconnected and under threshold, wait for next retry
}
return;
@@ -184,8 +184,7 @@ public Channel getChannel() {
*/
public void reportError(Throwable throwable) {
if (isNetworkError(throwable)) {
- reconnect = true;
- notify(GRPCChannelStatus.DISCONNECT);
+ triggerReconnect();
}
}
@@ -199,6 +198,49 @@ private void notify(GRPCChannelStatus status) {
}
}
+ /**
+ * Create a new gRPC channel to the specified server and reset connection state.
+ */
+ private void createNewChannel(String host, int port) throws Exception {
+ if (managedChannel != null) {
+ managedChannel.shutdownNow();
+ }
+
+ managedChannel = GRPCChannel.newBuilder(host, port)
+ .addManagedChannelBuilder(new StandardChannelBuilder())
+ .addManagedChannelBuilder(new TLSChannelBuilder())
+ .addChannelDecorator(new AgentIDDecorator())
+ .addChannelDecorator(new AuthenticationDecorator())
+ .build();
+
+ // Reset reconnectCount after actually rebuilding the channel
+ reconnectCount = 0;
+ notifyConnected();
+ }
+
+ /**
+ * Trigger reconnection by setting reconnect flag and notifying listeners.
+ */
+ private void triggerReconnect() {
+ synchronized (statusLock) {
+ reconnect = true;
+ notify(GRPCChannelStatus.DISCONNECT);
+ }
+ }
+
+ /**
+ * Notify listeners that connection is established without resetting reconnectCount.
+ * This is used when the channel appears connected but we want to keep monitoring
+ * reconnect attempts in case it's a false positive (half-open connection).
+ */
+ private void notifyConnected() {
+ synchronized (statusLock) {
+ // Don't reset reconnectCount - connection might still be half-open
+ reconnect = false;
+ notify(GRPCChannelStatus.CONNECTED);
+ }
+ }
+
private boolean isNetworkError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) throwable;
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index 9056993626..c44eebed2a 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -100,6 +100,13 @@ collector.properties_report_period_factor=${SW_AGENT_COLLECTOR_PROPERTIES_REPORT
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
# How long grpc client will timeout in sending data to upstream. Unit is second.
collector.grpc_upstream_timeout=${SW_AGENT_COLLECTOR_GRPC_UPSTREAM_TIMEOUT:30}
+# The interval in seconds to send a keepalive ping to the backend.
+# If this is less than or equal to 0, the keepalive is disabled.
+# Note: The minimum safe value is 10 seconds. Values below this may be rejected by the gRPC server.
+#collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:120}
+# The timeout in seconds to wait for a keepalive ack from the backend.
+# If the ack is not received within this time, the connection is considered dead.
+#collector.grpc_keepalive_timeout=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIMEOUT:30}
# Sniffer get profile task list interval.
collector.get_profile_task_interval=${SW_AGENT_COLLECTOR_GET_PROFILE_TASK_INTERVAL:20}
# Sniffer get agent dynamic config interval.