diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 87cf57955b..39e7dfc480 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -210,6 +210,24 @@ public static class Collector {
          * How long grpc client will timeout in sending data to upstream.
          */
         public static int GRPC_UPSTREAM_TIMEOUT = 30;
+        /**
+         * The interval in seconds to send a keepalive ping to the backend.
+         * If this is less than or equal to 0, the keepalive is disabled.
+         *
+         * <p>
+         * Note: The minimum safe value is 10 seconds. gRPC may increase an unreasonably small value, and the backend may reject pings that arrive more often than it permits.
+         *
+         * This maps to `collector.grpc_keepalive_time` in agent.config.
+         */
+        public static long GRPC_KEEPALIVE_TIME = 120L;
+        /**
+         * The timeout in seconds to wait for a keepalive ack from the backend.
+         * If the ack is not received within this time, the connection is considered dead.
+         * If this is less than or equal to 0, it is not applied and the gRPC default keepalive timeout is used.
+         *
+         * This maps to `collector.grpc_keepalive_timeout` in agent.config.
+         */
+        public static long GRPC_KEEPALIVE_TIMEOUT = 30L;
         /**
          * Get profile task list interval
          */
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
index d7806ad674..b752cd62f6 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
@@ -27,6 +27,8 @@
 import io.grpc.netty.NettyChannelBuilder;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.skywalking.apm.agent.core.conf.Config;
 
 public class GRPCChannel {
     /**
@@ -39,6 +41,17 @@ private GRPCChannel(String host, int port, List channelBuilders,
                         List decorators) throws Exception {
         ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port);
 
+        // Keepalive pings are enabled only for a positive interval; otherwise the channel is left untouched.
+        if (Config.Collector.GRPC_KEEPALIVE_TIME > 0) {
+            channelBuilder.keepAliveTime(Config.Collector.GRPC_KEEPALIVE_TIME, TimeUnit.SECONDS)
+                          .keepAliveWithoutCalls(true);
+            // gRPC requires a positive keepalive timeout and throws on anything else, which would
+            // abort channel creation. Apply the configured value only when valid; else keep the gRPC default.
+            if (Config.Collector.GRPC_KEEPALIVE_TIMEOUT > 0) {
+                channelBuilder.keepAliveTimeout(Config.Collector.GRPC_KEEPALIVE_TIMEOUT, TimeUnit.SECONDS);
+            }
+        }
+
         NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());
 
         for (ChannelBuilder builder : channelBuilders) {
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
index 9744398fd1..846c343bbd 100755
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
@@ -57,6 +57,7 @@ public class GRPCChannelManager implements BootService, Runnable {
     private volatile List grpcServers;
     private volatile int selectedIdx = -1;
     private volatile int reconnectCount = 0;
+    private final Object statusLock = new Object();
 
     @Override
     public void prepare() {
@@ -99,7 +100,10 @@ public void shutdown() {
 
     @Override
     public void run() {
-        LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
+        if (reconnect) {
+            LOGGER.warn("Selected collector grpc service running, reconnect:{}.", reconnect);
+        }
+
         if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
             grpcServers = Arrays.stream(Config.Collector.BACKEND_SERVICE.split(","))
                                 .filter(StringUtil::isNotBlank)
@@ -130,32 +134,33 @@ public void run() {
                 String server = "";
                 try {
-                    int index = Math.abs(random.nextInt()) % grpcServers.size();
+                    // floorMod never yields a negative index, unlike Math.abs(Integer.MIN_VALUE) % size.
+                    int index = Math.floorMod(random.nextInt(), grpcServers.size());
+                    server = grpcServers.get(index);
+                    String[] ipAndPort = server.split(":");
+
                     if (index != selectedIdx) {
+                        // Drop the current selection until the new channel exists, so that a failure
+                        // while parsing the address or building the channel leads to a rebuild on the
+                        // next run instead of probing a channel that is missing or already shut down.
+                        selectedIdx = -1;
+                        LOGGER.debug("Connecting to different gRPC server {}. Shutting down existing channel if any.", server);
+                        createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
                         selectedIdx = index;
+                    } else {
+                        // Same server, increment reconnectCount
+                        reconnectCount++;
 
-                        server = grpcServers.get(index);
-                        String[] ipAndPort = server.split(":");
-
-                        if (managedChannel != null) {
-                            managedChannel.shutdownNow();
+                        if (reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD) {
+                            // Reconnect attempts exceeded threshold, force rebuild channel
+                            LOGGER.warn("Reconnect attempts to {} exceeded threshold ({}), forcing channel rebuild",
+                                server, Config.Agent.FORCE_RECONNECTION_PERIOD);
+                            createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
+                        } else if (managedChannel.isConnected(false)) {
+                            // Channel appears connected, trust it but keep reconnectCount for monitoring
+                            LOGGER.debug("Channel to {} appears connected (reconnect attempt: {})", server, reconnectCount);
+                            notifyConnected();
                         }
-
-                        managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
-                                                    .addManagedChannelBuilder(new StandardChannelBuilder())
-                                                    .addManagedChannelBuilder(new TLSChannelBuilder())
-                                                    .addChannelDecorator(new AgentIDDecorator())
-                                                    .addChannelDecorator(new AuthenticationDecorator())
-                                                    .build();
-                        reconnectCount = 0;
-                        reconnect = false;
-                        notify(GRPCChannelStatus.CONNECTED);
-                    } else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
-                        // Reconnect to the same server is automatically done by GRPC,
-                        // therefore we are responsible to check the connectivity and
-                        // set the state and notify listeners
-                        reconnectCount = 0;
-                        reconnect = false;
-                        notify(GRPCChannelStatus.CONNECTED);
+                        // else: Channel is disconnected and under threshold, wait for next retry
                     }
 
                     return;
@@ -184,8 +189,7 @@ public Channel getChannel() {
      */
     public void reportError(Throwable throwable) {
         if (isNetworkError(throwable)) {
-            reconnect = true;
-            notify(GRPCChannelStatus.DISCONNECT);
+            triggerReconnect();
         }
     }
 
@@ -199,6 +203,55 @@ private void notify(GRPCChannelStatus status) {
         }
     }
 
+    /**
+     * Create a new gRPC channel to the specified server and reset connection state.
+     * Any existing channel is shut down first; once the new channel is built the reconnect
+     * counter is cleared and listeners are notified that the connection is established.
+     *
+     * @param host host of the backend gRPC server
+     * @param port port of the backend gRPC server
+     * @throws Exception propagated from building the channel
+     */
+    private void createNewChannel(String host, int port) throws Exception {
+        if (managedChannel != null) {
+            managedChannel.shutdownNow();
+        }
+
+        managedChannel = GRPCChannel.newBuilder(host, port)
+                                    .addManagedChannelBuilder(new StandardChannelBuilder())
+                                    .addManagedChannelBuilder(new TLSChannelBuilder())
+                                    .addChannelDecorator(new AgentIDDecorator())
+                                    .addChannelDecorator(new AuthenticationDecorator())
+                                    .build();
+
+        // Reset reconnectCount after actually rebuilding the channel
+        reconnectCount = 0;
+        notifyConnected();
+    }
+
+    /**
+     * Trigger reconnection by setting reconnect flag and notifying listeners.
+     */
+    private void triggerReconnect() {
+        synchronized (statusLock) {
+            reconnect = true;
+            notify(GRPCChannelStatus.DISCONNECT);
+        }
+    }
+
+    /**
+     * Notify listeners that connection is established without resetting reconnectCount.
+     * This is used when the channel appears connected but we want to keep monitoring
+     * reconnect attempts in case it's a false positive (half-open connection).
+     */
+    private void notifyConnected() {
+        synchronized (statusLock) {
+            // Don't reset reconnectCount - connection might still be half-open
+            reconnect = false;
+            notify(GRPCChannelStatus.CONNECTED);
+        }
+    }
+
     private boolean isNetworkError(Throwable throwable) {
         if (throwable instanceof StatusRuntimeException) {
             StatusRuntimeException statusRuntimeException = (StatusRuntimeException) throwable;
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index 9056993626..c44eebed2a 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -100,6 +100,13 @@ collector.properties_report_period_factor=${SW_AGENT_COLLECTOR_PROPERTIES_REPORT
 collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
 # How long grpc client will timeout in sending data to upstream. Unit is second.
 collector.grpc_upstream_timeout=${SW_AGENT_COLLECTOR_GRPC_UPSTREAM_TIMEOUT:30}
+# The interval in seconds to send a keepalive ping to the backend.
+# If this is less than or equal to 0, the keepalive is disabled.
+# Note: The minimum safe value is 10 seconds. Values below this may be rejected by the gRPC server.
+#collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:120}
+# The timeout in seconds to wait for a keepalive ack from the backend.
+# If the ack is not received within this time, the connection is considered dead.
+#collector.grpc_keepalive_timeout=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIMEOUT:30}
 # Sniffer get profile task list interval.
 collector.get_profile_task_interval=${SW_AGENT_COLLECTOR_GET_PROFILE_TASK_INTERVAL:20}
 # Sniffer get agent dynamic config interval.