@@ -292,6 +292,69 @@ void errorPropagteRequestQueue() {
292292 );
293293 }
294294
295+ @ Test
296+ void commitTransactionShouldRespectQueuedMessages () {
297+ final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))" ;
298+ complete (connection ->
299+ Mono .from (connection .createStatement (tdl ).execute ())
300+ .flatMap (IntegrationTestSupport ::extractRowsUpdated )
301+ .thenMany (Flux .merge (
302+ connection .beginTransaction (),
303+ connection .createStatement ("INSERT INTO test VALUES (1, 'test1')" )
304+ .execute (),
305+ connection .commitTransaction ()
306+ ))
307+ .doOnComplete (() -> assertThat (connection .isInTransaction ()).isFalse ())
308+ .thenMany (connection .createStatement ("SELECT COUNT(*) FROM test" ).execute ())
309+ .flatMap (result ->
310+ Mono .from (result .map ((row , metadata ) -> row .get (0 , Long .class )))
311+ )
312+ .doOnNext (text -> assertThat (text ).isEqualTo (1L ))
313+ );
314+ }
315+
316+ @ Test
317+ void rollbackTransactionShouldRespectQueuedMessages () {
318+ final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))" ;
319+ complete (connection ->
320+ Mono .from (connection .createStatement (tdl ).execute ())
321+ .flatMap (IntegrationTestSupport ::extractRowsUpdated )
322+ .thenMany (Flux .merge (
323+ connection .beginTransaction (),
324+ connection .createStatement ("INSERT INTO test VALUES (1, 'test1')" )
325+ .execute (),
326+ connection .rollbackTransaction ()
327+ ))
328+ .doOnComplete (() -> assertThat (connection .isInTransaction ()).isFalse ())
329+ .thenMany (connection .createStatement ("SELECT COUNT(*) FROM test" ).execute ())
330+ .flatMap (result -> Mono .from (result .map ((row , metadata ) -> row .get (0 , Long .class )))
331+ .doOnNext (count -> assertThat (count ).isEqualTo (0L )))
332+ );
333+ }
334+
335+ @ Test
336+ void beginTransactionShouldRespectQueuedMessages () {
337+ final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))" ;
338+ complete (connection ->
339+ Mono .from (connection .createStatement (tdl ).execute ())
340+ .flatMap (IntegrationTestSupport ::extractRowsUpdated )
341+ .then (Mono .from (connection .beginTransaction ()))
342+ .doOnSuccess (ignored -> assertThat (connection .isInTransaction ()).isTrue ())
343+ .thenMany (Flux .merge (
344+ connection .createStatement ("INSERT INTO test VALUES (1, 'test1')" ).execute (),
345+ connection .commitTransaction (),
346+ connection .beginTransaction ()
347+ ))
348+ .doOnComplete (() -> assertThat (connection .isInTransaction ()).isTrue ())
349+ .then (Mono .from (connection .rollbackTransaction ()))
350+ .doOnSuccess (ignored -> assertThat (connection .isInTransaction ()).isFalse ())
351+ .thenMany (connection .createStatement ("SELECT COUNT(*) FROM test" ).execute ())
352+ .flatMap (result -> Mono .from (result .map ((row , metadata ) -> row .get (0 , Long .class )))
353+ .doOnNext (count -> assertThat (count ).isEqualTo (1L )))
354+ );
355+
356+ }
357+
295358 @ Test
296359 void batchCrud () {
297360 // TODO: spilt it to multiple test cases and move it to BatchIntegrationTest
0 commit comments