2626import graphql .ExecutionResult ;
2727import graphql .GraphQLError ;
2828import graphql .GraphqlErrorBuilder ;
29+ import org .apache .commons .logging .Log ;
2930import org .reactivestreams .Publisher ;
3031import reactor .core .publisher .BaseSubscriber ;
3132import reactor .core .publisher .Flux ;
@@ -125,8 +126,8 @@ protected ServerResponse prepareResponse(
125126 });
126127
127128 return ((this .timeout != null ) ?
128- ServerResponse .sse (SseSubscriber .connect (resultFlux , this .keepAliveDuration ), this .timeout ) :
129- ServerResponse .sse (SseSubscriber .connect (resultFlux , this .keepAliveDuration )));
129+ ServerResponse .sse (SseSubscriber .connect (resultFlux , this .logger , this . keepAliveDuration ), this .timeout ) :
130+ ServerResponse .sse (SseSubscriber .connect (resultFlux , this .logger , this . keepAliveDuration )));
130131 }
131132
132133
@@ -137,9 +138,12 @@ private static final class SseSubscriber extends BaseSubscriber<Map<String, Obje
137138
138139 private final ServerResponse .SseBuilder sseBuilder ;
139140
140- private SseSubscriber (ServerResponse .SseBuilder sseBuilder ) {
141+ private final Log logger ;
142+
143+ private SseSubscriber (ServerResponse .SseBuilder sseBuilder , Log logger ) {
141144 this .sseBuilder = sseBuilder ;
142145 this .sseBuilder .onTimeout (() -> cancelWithError (new AsyncRequestTimeoutException ()));
146+ this .logger = logger ;
143147 }
144148
145149 @ Override
@@ -180,20 +184,24 @@ private void cancelWithError(Throwable ex) {
180184
181185 @ Override
182186 protected void hookOnError (Throwable ex ) {
183- sendNext (exceptionToResultMap (ex ));
187+ Map <String , Object > errorMap ;
188+ if (ex instanceof SubscriptionPublisherException spe ) {
189+ errorMap = spe .toMap ();
190+ }
191+ else {
192+ if (this .logger .isErrorEnabled ()) {
193+ this .logger .error ("Unresolved " + ex .getClass ().getSimpleName (), ex );
194+ }
195+ errorMap = GraphqlErrorBuilder .newError ()
196+ .message ("Subscription error" )
197+ .errorType (org .springframework .graphql .execution .ErrorType .INTERNAL_ERROR )
198+ .build ()
199+ .toSpecification ();
200+ }
201+ sendNext (errorMap );
184202 sendComplete ();
185203 }
186204
187- private static Map <String , Object > exceptionToResultMap (Throwable ex ) {
188- return ((ex instanceof SubscriptionPublisherException spe ) ?
189- spe .toMap () :
190- GraphqlErrorBuilder .newError ()
191- .message ("Subscription error" )
192- .errorType (org .springframework .graphql .execution .ErrorType .INTERNAL_ERROR )
193- .build ()
194- .toSpecification ());
195- }
196-
197205 private void sendComplete () {
198206 try {
199207 this .sseBuilder .event ("complete" ).data ("" );
@@ -210,10 +218,10 @@ protected void hookOnComplete() {
210218 }
211219
212220 static Consumer <ServerResponse .SseBuilder > connect (
213- Flux <Map <String , Object >> resultFlux , @ Nullable Duration keepAliveDuration ) {
221+ Flux <Map <String , Object >> resultFlux , Log logger , @ Nullable Duration keepAliveDuration ) {
214222
215223 return (sseBuilder ) -> {
216- SseSubscriber subscriber = new SseSubscriber (sseBuilder );
224+ SseSubscriber subscriber = new SseSubscriber (sseBuilder , logger );
217225 if (keepAliveDuration != null ) {
218226 KeepAliveHandler handler = new KeepAliveHandler (keepAliveDuration );
219227 handler .compose (resultFlux ).subscribe (subscriber );
0 commit comments