diff --git a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java index 54d65936fab..fae22848f7c 100644 --- a/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java +++ b/binder/src/main/java/io/grpc/binder/internal/BinderClientTransport.java @@ -25,6 +25,8 @@ import android.os.IBinder; import android.os.Parcel; import android.os.Process; + +import com.google.common.base.Preconditions; import com.google.common.base.Ticker; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -57,7 +59,9 @@ import io.grpc.internal.StatsTraceContext; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -75,6 +79,12 @@ public final class BinderClientTransport extends BinderTransport /** Number of ongoing calls which keep this transport "in-use". */ private final AtomicInteger numInUseStreams; + /** Last in-use state that was reported to the listener */ + private final AtomicBoolean listenerInUse; + + /** Synchronizes transport listener callbacks */ + private final Object listenerNotifyLock; + private final long readyTimeoutMillis; private final PingTracker pingTracker; private final boolean preAuthorizeServer; @@ -114,7 +124,10 @@ public BinderClientTransport( Boolean preAuthServerOverride = options.getEagAttributes().get(PRE_AUTH_SERVER_OVERRIDE); this.preAuthorizeServer = preAuthServerOverride != null ? preAuthServerOverride : factory.preAuthorizeServers; - numInUseStreams = new AtomicInteger(); + this.numInUseStreams = new AtomicInteger(); + this.listenerInUse = new AtomicBoolean(); + this.listenerNotifyLock = new Object(); + pingTracker = new PingTracker(Ticker.systemTicker(), (id) -> sendPing(id)); serviceBinding = @@ -259,9 +272,7 @@ public synchronized ClientStream newStream( return newFailingClientStream(failure, attributes, headers, tracers); } - if (inbound.countsForInUse() && numInUseStreams.getAndIncrement() == 0) { - clientTransportListener.transportInUse(true); - } + updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), 1); Outbound.ClientOutbound outbound = new Outbound.ClientOutbound(this, callId, method, headers, statsTraceContext); if (method.getType().clientSendsOneMessage()) { @@ -273,9 +284,7 @@ public synchronized ClientStream newStream( @Override protected void unregisterInbound(Inbound inbound) { - if (inbound.countsForInUse() && numInUseStreams.decrementAndGet() == 0) { - clientTransportListener.transportInUse(false); - } + updateInUseStreamsCountIfNeeded(inbound.countsForInUse(), -1); super.unregisterInbound(inbound); } @@ -306,6 +315,7 @@ void notifyShutdown(Status status) { @GuardedBy("this") void notifyTerminated() { if (numInUseStreams.getAndSet(0) > 0) { + listenerInUse.set(false); clientTransportListener.transportInUse(false); } if (readyTimeoutFuture != null) { @@ -391,6 +401,63 @@ private synchronized void handleAuthResult(Throwable t) { Status.INTERNAL.withDescription("Could not evaluate SecurityPolicy").withCause(t), true); } + /** + * Updates in-use stream count and notifies listener only on transitions between 0 and >0, without + * acquiring the transport lock. + */ + private void updateInUseStreamsCountIfNeeded(boolean countsForInUse, int delta) { + Preconditions.checkArgument(delta == -1 || delta == 1, "stream count delta must be -1 or +1"); + if (!countsForInUse) { + return; + } + int prev, next; + + if (delta > 0) { + next = numInUseStreams.incrementAndGet(); + prev = next - 1; + } else { + prev = numInUseStreams.get(); + int updated; + + while (true) { + int current = prev; + int newValue = current > 0 ? current - 1 : 0; + if (numInUseStreams.compareAndSet(current, newValue)) { + updated = newValue; + break; + } + prev = numInUseStreams.get(); + } + next = updated; + } + + boolean prevInUse = prev > 0; + boolean nextInUse = next > 0; + + if (prevInUse != nextInUse) { + if (listenerInUse.compareAndSet(prevInUse, nextInUse)) { + scheduleTransportInUseNotification(nextInUse); + } + } + } + + private void scheduleTransportInUseNotification(final boolean inUse) { + getScheduledExecutorService() + .execute( + new Runnable() { + @Override + public void run() { + // Provide external synchronization as required by Listener contract, + // without taking the transport lock to avoid potential deadlocks. + synchronized (listenerNotifyLock) { + if (listenerInUse.get() == inUse) { + clientTransportListener.transportInUse(inUse); + } + } + } + }); + } + @GuardedBy("this") @Override protected void handlePingResponse(Parcel parcel) {