1+ using System . Diagnostics ;
12using System . Net . Sockets ;
23using MySqlConnector . Logging ;
34using MySqlConnector . Protocol . Serialization ;
@@ -7,62 +8,71 @@ namespace MySqlConnector.Core;
78
89internal static class CommandExecutor
910{
10- public static async Task < MySqlDataReader > ExecuteReaderAsync ( IReadOnlyList < IMySqlCommand > commands , ICommandPayloadCreator payloadCreator , CommandBehavior behavior , IOBehavior ioBehavior , CancellationToken cancellationToken )
11+ public static async Task < MySqlDataReader > ExecuteReaderAsync ( IReadOnlyList < IMySqlCommand > commands , ICommandPayloadCreator payloadCreator , CommandBehavior behavior , Activity ? activity , IOBehavior ioBehavior , CancellationToken cancellationToken )
1112 {
12- cancellationToken . ThrowIfCancellationRequested ( ) ;
13- var commandListPosition = new CommandListPosition ( commands ) ;
14- var command = commands [ 0 ] ;
13+ try
14+ {
15+ cancellationToken . ThrowIfCancellationRequested ( ) ;
16+ var commandListPosition = new CommandListPosition ( commands ) ;
17+ var command = commands [ 0 ] ;
1518
16- // pre-requisite: Connection is non-null must be checked before calling this method
17- var connection = command . Connection ! ;
19+ // pre-requisite: Connection is non-null must be checked before calling this method
20+ var connection = command . Connection ! ;
1821
19- if ( Log . IsTraceEnabled ( ) )
20- Log . Trace ( "Session{0} ExecuteReader {1} CommandCount: {2}" , connection . Session . Id , ioBehavior , commands . Count ) ;
22+ if ( Log . IsTraceEnabled ( ) )
23+ Log . Trace ( "Session{0} ExecuteReader {1} CommandCount: {2}" , connection . Session . Id , ioBehavior , commands . Count ) ;
2124
22- Dictionary < string , CachedProcedure ? > ? cachedProcedures = null ;
23- foreach ( var command2 in commands )
24- {
25- if ( command2 . CommandType == CommandType . StoredProcedure )
25+ Dictionary < string , CachedProcedure ? > ? cachedProcedures = null ;
26+ foreach ( var command2 in commands )
2627 {
27- cachedProcedures ??= new ( ) ;
28- var commandText = command2 . CommandText ! ;
29- if ( ! cachedProcedures . ContainsKey ( commandText ) )
28+ if ( command2 . CommandType == CommandType . StoredProcedure )
3029 {
31- cachedProcedures . Add ( commandText , await connection . GetCachedProcedure ( commandText , revalidateMissing : false , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ) ;
30+ cachedProcedures ??= new ( ) ;
31+ var commandText = command2 . CommandText ! ;
32+ if ( ! cachedProcedures . ContainsKey ( commandText ) )
33+ {
34+ cachedProcedures . Add ( commandText , await connection . GetCachedProcedure ( commandText , revalidateMissing : false , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ) ;
3235
33- // because the connection was used to execute a MySqlDataReader with the connection's DefaultCommandTimeout,
34- // we need to reapply the command's CommandTimeout (even if some of the time has elapsed)
35- command . CancellableCommand . ResetCommandTimeout ( ) ;
36+ // because the connection was used to execute a MySqlDataReader with the connection's DefaultCommandTimeout,
37+ // we need to reapply the command's CommandTimeout (even if some of the time has elapsed)
38+ command . CancellableCommand . ResetCommandTimeout ( ) ;
39+ }
3640 }
3741 }
38- }
3942
40- var writer = new ByteBufferWriter ( ) ;
41- // cachedProcedures will be non-null if there is a stored procedure, which is also the only time it will be read
42- if ( ! payloadCreator . WriteQueryCommand ( ref commandListPosition , cachedProcedures ! , writer ) )
43- throw new InvalidOperationException ( "ICommandPayloadCreator failed to write query payload" ) ;
43+ var writer = new ByteBufferWriter ( ) ;
44+ // cachedProcedures will be non-null if there is a stored procedure, which is also the only time it will be read
45+ if ( ! payloadCreator . WriteQueryCommand ( ref commandListPosition , cachedProcedures ! , writer ) )
46+ throw new InvalidOperationException ( "ICommandPayloadCreator failed to write query payload" ) ;
4447
45- cancellationToken . ThrowIfCancellationRequested ( ) ;
48+ cancellationToken . ThrowIfCancellationRequested ( ) ;
4649
47- using var payload = writer . ToPayloadData ( ) ;
48- connection . Session . StartQuerying ( command . CancellableCommand ) ;
49- command . SetLastInsertedId ( - 1 ) ;
50- try
51- {
52- await connection . Session . SendAsync ( payload , ioBehavior , CancellationToken . None ) . ConfigureAwait ( false ) ;
53- return await MySqlDataReader . CreateAsync ( commandListPosition , payloadCreator , cachedProcedures , command , behavior , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
54- }
55- catch ( MySqlException ex ) when ( ex . ErrorCode == MySqlErrorCode . QueryInterrupted && cancellationToken . IsCancellationRequested )
56- {
57- Log . Info ( "Session{0} query was interrupted" , connection . Session . Id ) ;
58- throw new OperationCanceledException ( ex . Message , ex , cancellationToken ) ;
50+ using var payload = writer . ToPayloadData ( ) ;
51+ connection . Session . StartQuerying ( command . CancellableCommand ) ;
52+ command . SetLastInsertedId ( - 1 ) ;
53+ try
54+ {
55+ await connection . Session . SendAsync ( payload , ioBehavior , CancellationToken . None ) . ConfigureAwait ( false ) ;
56+ return await MySqlDataReader . CreateAsync ( commandListPosition , payloadCreator , cachedProcedures , command , behavior , activity , ioBehavior , cancellationToken ) . ConfigureAwait ( false ) ;
57+ }
58+ catch ( MySqlException ex ) when ( ex . ErrorCode == MySqlErrorCode . QueryInterrupted && cancellationToken . IsCancellationRequested )
59+ {
60+ Log . Info ( "Session{0} query was interrupted" , connection . Session . Id ) ;
61+ throw new OperationCanceledException ( ex . Message , ex , cancellationToken ) ;
62+ }
63+ catch ( Exception ex ) when ( payload . Span . Length > 4_194_304 && ( ex is SocketException or IOException or MySqlProtocolException ) )
64+ {
65+ // the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
66+ // use "decimal megabytes" (to round up) when creating the exception message
67+ int megabytes = payload . Span . Length / 1_000_000 ;
68+ throw new MySqlException ( "Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB." . FormatInvariant ( megabytes ) , ex ) ;
69+ }
5970 }
60- catch ( Exception ex ) when ( payload . Span . Length > 4_194_304 && ( ex is SocketException or IOException or MySqlProtocolException ) )
71+ catch ( Exception ex ) when ( activity is { IsAllDataRequested : true } )
6172 {
62- // the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
63- // use "decimal megabytes" (to round up) when creating the exception message
64- int megabytes = payload . Span . Length / 1_000_000 ;
65- throw new MySqlException ( "Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB." . FormatInvariant ( megabytes ) , ex ) ;
73+ activity . SetException ( ex ) ;
74+ activity . Stop ( ) ;
75+ throw ;
6676 }
6777 }
6878
0 commit comments