3030import io .grpc .InternalLogId ;
3131import io .grpc .Metadata ;
3232import io .grpc .Status ;
33- import io .grpc .Status .Code ;
3433import io .grpc .internal .AbstractServerStream ;
3534import io .grpc .internal .GrpcUtil ;
3635import io .grpc .internal .SerializingExecutor ;
4342import java .util .Collections ;
4443import java .util .HashMap ;
4544import java .util .Map ;
46- import java .util .concurrent .CountDownLatch ;
47- import java .util .concurrent .TimeUnit ;
4845import java .util .function .Supplier ;
4946import java .util .logging .Logger ;
5047import javax .annotation .Nullable ;
@@ -58,12 +55,15 @@ final class ServletServerStream extends AbstractServerStream {
5855
5956 private final ServletTransportState transportState ;
6057 private final Sink sink = new Sink ();
61- private final AsyncContext asyncCtx ;
6258 private final HttpServletResponse resp ;
6359 private final Attributes attributes ;
6460 private final String authority ;
6561 private final InternalLogId logId ;
6662 private final AsyncServletOutputStreamWriter writer ;
63+ /**
64+ * If the async servlet operation has been completed.
65+ */
66+ volatile boolean asyncCompleted = false ;
6767
6868 ServletServerStream (
6969 AsyncContext asyncCtx ,
@@ -78,7 +78,6 @@ final class ServletServerStream extends AbstractServerStream {
7878 this .attributes = attributes ;
7979 this .authority = authority ;
8080 this .logId = logId ;
81- this .asyncCtx = asyncCtx ;
8281 this .resp = (HttpServletResponse ) asyncCtx .getResponse ();
8382 this .writer = new AsyncServletOutputStreamWriter (
8483 asyncCtx , transportState , logId );
@@ -292,24 +291,14 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status)
292291
293292 @ Override
294293 public void cancel (Status status ) {
295- if (resp .isCommitted () && Code .DEADLINE_EXCEEDED == status .getCode ()) {
296- return ; // let the servlet timeout, the container will sent RST_STREAM automatically
297- }
298294 transportState .runOnTransportThread (() -> transportState .transportReportStatus (status ));
299- // There is no way to RST_STREAM with CANCEL code, so write trailers instead
300- close (Status .CANCELLED .withDescription ("Servlet stream cancelled" )
301- .withCause (status .asRuntimeException ()),
302- new Metadata ());
303- CountDownLatch countDownLatch = new CountDownLatch (1 );
304- transportState .runOnTransportThread (() -> {
305- asyncCtx .complete ();
306- countDownLatch .countDown ();
307- });
308- try {
309- countDownLatch .await (5 , TimeUnit .SECONDS );
310- } catch (InterruptedException e ) {
311- Thread .currentThread ().interrupt ();
295+ if (asyncCompleted ) {
296+ logger .fine ("ignore cancel as already completed" );
297+ return ;
312298 }
299+ // There is no way to RST_STREAM with CANCEL code, so write trailers instead
300+ close (status , new Metadata ());
301+ // close() calls writeTrailers(), which calls AsyncContext.complete()
313302 }
314303 }
315304
0 commit comments