9797import static com .mongodb .internal .connection .ProtocolHelper .isCommandOk ;
9898import static com .mongodb .internal .logging .LogMessage .Level .DEBUG ;
9999import static com .mongodb .internal .thread .InterruptionUtil .translateInterruptedException ;
100+ import static com .mongodb .internal .tracing .Tags .CLIENT_CONNECTION_ID ;
101+ import static com .mongodb .internal .tracing .Tags .CURSOR_ID ;
102+ import static com .mongodb .internal .tracing .Tags .NAMESPACE ;
103+ import static com .mongodb .internal .tracing .Tags .QUERY_OPCODE ;
104+ import static com .mongodb .internal .tracing .Tags .QUERY_SUMMARY ;
105+ import static com .mongodb .internal .tracing .Tags .QUERY_TEXT ;
106+ import static com .mongodb .internal .tracing .Tags .SERVER_ADDRESS ;
107+ import static com .mongodb .internal .tracing .Tags .SERVER_CONNECTION_ID ;
108+ import static com .mongodb .internal .tracing .Tags .SERVER_PORT ;
109+ import static com .mongodb .internal .tracing .Tags .SERVER_TYPE ;
110+ import static com .mongodb .internal .tracing .Tags .SESSION_ID ;
111+ import static com .mongodb .internal .tracing .Tags .SYSTEM ;
112+ import static com .mongodb .internal .tracing .Tags .TRANSACTION_NUMBER ;
100113import static java .util .Arrays .asList ;
101114
102115/**
@@ -377,24 +390,13 @@ public boolean isClosed() {
377390 public <T > T sendAndReceive (final CommandMessage message , final Decoder <T > decoder , final OperationContext operationContext ) {
378391 Supplier <T > sendAndReceiveInternal = () -> sendAndReceiveInternal (
379392 message , decoder , operationContext );
380-
381- Span tracingSpan = createTracingSpan (message , operationContext );
382-
383393 try {
384394 return sendAndReceiveInternal .get ();
385- } catch (MongoCommandException e ) {
386- if (tracingSpan != null ) {
387- tracingSpan .error (e );
388- }
389-
395+ } catch (Throwable e ) {
390396 if (reauthenticationIsTriggered (e )) {
391397 return reauthenticateAndRetry (sendAndReceiveInternal , operationContext );
392398 }
393399 throw e ;
394- } finally {
395- if (tracingSpan != null ) {
396- tracingSpan .end ();
397- }
398400 }
399401 }
400402
@@ -406,9 +408,7 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
406408 AsyncSupplier <T > sendAndReceiveAsyncInternal = c -> sendAndReceiveAsyncInternal (
407409 message , decoder , operationContext , c );
408410
409- beginAsync ().<T >thenSupply (c -> {
410- sendAndReceiveAsyncInternal .getAsync (c );
411- }).onErrorIf (e -> reauthenticationIsTriggered (e ), (t , c ) -> {
411+ beginAsync ().thenSupply (sendAndReceiveAsyncInternal ::getAsync ).onErrorIf (this ::reauthenticationIsTriggered , (t , c ) -> {
412412 reauthenticateAndRetryAsync (sendAndReceiveAsyncInternal , operationContext , c );
413413 }).finish (callback );
414414 }
@@ -447,15 +447,31 @@ public boolean reauthenticationIsTriggered(@Nullable final Throwable t) {
447447 private <T > T sendAndReceiveInternal (final CommandMessage message , final Decoder <T > decoder ,
448448 final OperationContext operationContext ) {
449449 CommandEventSender commandEventSender ;
450+ Span tracingSpan = createTracingSpan (message , operationContext );
451+
450452 try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput (this )) {
451453 message .encode (bsonOutput , operationContext );
452- commandEventSender = createCommandEventSender (message , bsonOutput , operationContext );
454+ BsonDocument commandDocument = message .getCommandDocument (bsonOutput );
455+
456+ commandEventSender = createCommandEventSender (message , commandDocument , operationContext );
453457 commandEventSender .sendStartedEvent ();
458+
459+ if (tracingSpan != null && operationContext .getTracingManager ().isCommandPayloadEnabled ()) {
460+ tracingSpan .tag (QUERY_TEXT , commandDocument .toJson ());
461+ }
462+
454463 try {
455464 sendCommandMessage (message , bsonOutput , operationContext );
456465 } catch (Exception e ) {
466+ if (tracingSpan != null ) {
467+ tracingSpan .error (e );
468+ }
457469 commandEventSender .sendFailedEvent (e );
458470 throw e ;
471+ } finally {
472+ if (tracingSpan != null ) {
473+ tracingSpan .end ();
474+ }
459475 }
460476 }
461477
@@ -568,7 +584,8 @@ private <T> void sendAndReceiveAsyncInternal(final CommandMessage message, final
568584
569585 try {
570586 message .encode (bsonOutput , operationContext );
571- CommandEventSender commandEventSender = createCommandEventSender (message , bsonOutput , operationContext );
587+ BsonDocument commandDocument = message .getCommandDocument (bsonOutput );
588+ CommandEventSender commandEventSender = createCommandEventSender (message , commandDocument , operationContext );
572589 commandEventSender .sendStartedEvent ();
573590 Compressor localSendCompressor = sendCompressor ;
574591 if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS .contains (message .getCommandDocument (bsonOutput ).getFirstKey ())) {
@@ -887,42 +904,6 @@ public ByteBuf getBuffer(final int size) {
887904 return stream .getBuffer (size );
888905 }
889906
890- @ Nullable
891- private Span createTracingSpan (final CommandMessage message , final OperationContext operationContext ) {
892- TracingManager tracingManager = operationContext .getTracingManager ();
893- Span span ;
894- if (tracingManager .isEnabled ()) {
895- BsonDocument command = message .getCommand ();
896- TraceContext parentContext = null ;
897- long cursorId = -1 ;
898- if (command .containsKey ("getMore" )) {
899- cursorId = command .getInt64 ("getMore" ).longValue ();
900- parentContext = tracingManager .getCursorParentContext (cursorId );
901- } else {
902- parentContext = tracingManager .getParentContext (operationContext .getId ());
903- }
904-
905- span = tracingManager .addSpan ("Command " + command .getFirstKey (), parentContext );
906- span .tag ("db.system" , "mongodb" );
907- span .tag ("db.namespace" , message .getNamespace ().getFullName ());
908- span .tag ("db.query.summary" , command .getFirstKey ());
909- span .tag ("db.query.opcode" , String .valueOf (message .getOpCode ()));
910- span .tag ("db.query.text" , command .toString ());
911- if (cursorId != -1 ) {
912- span .tag ("db.mongodb.cursor_id" , String .valueOf (cursorId ));
913- }
914- span .tag ("server.address" , serverId .getAddress ().getHost ());
915- span .tag ("server.port" , String .valueOf (serverId .getAddress ().getPort ()));
916- span .tag ("server.type" , message .getSettings ().getServerType ().name ());
917-
918- span .tag ("db.mongodb.server_connection_id" , this .description .getConnectionId ().toString ());
919- } else {
920- span = null ;
921- }
922-
923- return span ;
924- }
925-
926907 private class MessageHeaderCallback implements SingleResultCallback <ByteBuf > {
927908 private final OperationContext operationContext ;
928909 private final SingleResultCallback <ResponseBuffers > callback ;
@@ -1003,19 +984,88 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t
1003984
1004985 private static final StructuredLogger COMMAND_PROTOCOL_LOGGER = new StructuredLogger ("protocol.command" );
1005986
1006- private CommandEventSender createCommandEventSender (final CommandMessage message , final ByteBufferBsonOutput bsonOutput ,
987+ private CommandEventSender createCommandEventSender (final CommandMessage message , final BsonDocument commandDocument ,
1007988 final OperationContext operationContext ) {
1008989 boolean listensOrLogs = commandListener != null || COMMAND_PROTOCOL_LOGGER .isRequired (DEBUG , getClusterId ());
1009990 if (!recordEverything && (isMonitoringConnection || !opened () || !authenticated .get () || !listensOrLogs )) {
1010991 return new NoOpCommandEventSender ();
1011992 }
1012993 return new LoggingCommandEventSender (
1013994 SECURITY_SENSITIVE_COMMANDS , SECURITY_SENSITIVE_HELLO_COMMANDS , description , commandListener ,
1014- operationContext , message , bsonOutput ,
995+ operationContext , message , commandDocument ,
1015996 COMMAND_PROTOCOL_LOGGER , loggerSettings );
1016997 }
1017998
1018999 private ClusterId getClusterId () {
10191000 return description .getConnectionId ().getServerId ().getClusterId ();
10201001 }
1002+
1003+ /**
1004+ * Creates a tracing span for the given command message.
1005+ * <p>
1006+ * The span is only created if tracing is enabled and the command is not security-sensitive.
1007+ * It attaches various tags to the span, such as database system, namespace, query summary, opcode,
1008+ * server address, port, server type, client and server connection IDs, and, if applicable,
1009+ * transaction number and session ID. For cursor fetching commands, the parent context is retrieved using the cursor ID.
1010+ * If command payload tracing is enabled, the command document is also attached as a tag.
1011+ *
1012+ * @param message the command message to trace
1013+ * @param operationContext the operation context containing tracing and session information
1014+ * @return the created {@link Span}, or {@code null} if tracing is not enabled or the command is security-sensitive
1015+ */
1016+ @ Nullable
1017+ private Span createTracingSpan (final CommandMessage message , final OperationContext operationContext ) {
1018+ TracingManager tracingManager = operationContext .getTracingManager ();
1019+ BsonDocument command = message .getCommand ();
1020+ String commandName = command .getFirstKey ();
1021+ if (!tracingManager .isEnabled ()
1022+ || SECURITY_SENSITIVE_COMMANDS .contains (commandName )
1023+ || SECURITY_SENSITIVE_HELLO_COMMANDS .contains (commandName )) {
1024+ return null ;
1025+ }
1026+
1027+ // Retrieving the appropriate parent context for the span.
1028+ TraceContext parentContext ;
1029+ long cursorId = -1 ;
1030+ if (command .containsKey ("getMore" )) {
1031+ cursorId = command .getInt64 ("getMore" ).longValue ();
1032+ parentContext = tracingManager .getCursorParentContext (cursorId );
1033+ } else {
1034+ parentContext = tracingManager .getParentContext (operationContext .getId ());
1035+ }
1036+
1037+ Span span = tracingManager
1038+ .addSpan ("Command " + commandName , parentContext )
1039+ .tag (SYSTEM , "mongodb" )
1040+ .tag (NAMESPACE , message .getNamespace ().getDatabaseName ())
1041+ .tag (QUERY_SUMMARY , command .toString ())
1042+ .tag (QUERY_OPCODE , String .valueOf (message .getOpCode ()));
1043+
1044+ if (cursorId != -1 ) {
1045+ span .tag (CURSOR_ID , cursorId );
1046+ }
1047+
1048+ tagServerAndConnectionInfo (span , message );
1049+ tagSessionAndTransactionInfo (span , operationContext );
1050+
1051+ return span ;
1052+ }
1053+
1054+ private void tagServerAndConnectionInfo (final Span span , final CommandMessage message ) {
1055+ span .tag (SERVER_ADDRESS , serverId .getAddress ().getHost ())
1056+ .tag (SERVER_PORT , String .valueOf (serverId .getAddress ().getPort ()))
1057+ .tag (SERVER_TYPE , message .getSettings ().getServerType ().name ())
1058+ .tag (CLIENT_CONNECTION_ID , this .description .getConnectionId ().toString ())
1059+ .tag (SERVER_CONNECTION_ID , String .valueOf (this .description .getConnectionId ().getServerValue ()));
1060+ }
1061+
1062+ private void tagSessionAndTransactionInfo (final Span span , final OperationContext operationContext ) {
1063+ SessionContext sessionContext = operationContext .getSessionContext ();
1064+ if (sessionContext .hasSession () && !sessionContext .isImplicitSession ()) {
1065+ span .tag (TRANSACTION_NUMBER , String .valueOf (sessionContext .getTransactionNumber ()))
1066+ .tag (SESSION_ID , String .valueOf (sessionContext .getSessionId ()
1067+ .get (sessionContext .getSessionId ().getFirstKey ())
1068+ .asBinary ().asUuid ()));
1069+ }
1070+ }
10211071}
0 commit comments