Skip to content

Commit a1c3ec7

Browse files
authored
Enhance Transactional Methods for Improved Request Queue Handling (#179)
Motivation: The current Transaction Methods implementation lacks consideration for the request queue, resulting in potential undefined behavior and issues. Modifications: Transaction Methods properly respect the request queue. Result: Bug resolved. Enhanced stability and reliability.
1 parent cf3a302 commit a1c3ec7

File tree

2 files changed

+64
-19
lines changed

2 files changed

+64
-19
lines changed

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -184,22 +184,12 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS
184184

185185
@Override
186186
public Mono<Void> beginTransaction() {
187-
return Mono.defer(() -> {
188-
if (isInTransaction()) {
189-
return Mono.empty();
190-
}
191-
192-
return QueryFlow.executeVoid(client, "BEGIN");
193-
});
187+
return beginTransaction(MySqlTransactionDefinition.empty());
194188
}
195189

196190
@Override
197191
public Mono<Void> beginTransaction(TransactionDefinition definition) {
198192
return Mono.defer(() -> {
199-
if (isInTransaction()) {
200-
return Mono.empty();
201-
}
202-
203193
return QueryFlow.beginTransaction(client, this, batchSupported, definition);
204194
});
205195
}
@@ -219,10 +209,6 @@ public Mono<Void> close() {
219209
@Override
220210
public Mono<Void> commitTransaction() {
221211
return Mono.defer(() -> {
222-
if (!isInTransaction()) {
223-
return Mono.empty();
224-
}
225-
226212
return QueryFlow.doneTransaction(client, this, true, lockWaitTimeout, batchSupported);
227213
});
228214
}
@@ -300,10 +286,6 @@ public Mono<Void> releaseSavepoint(String name) {
300286
@Override
301287
public Mono<Void> rollbackTransaction() {
302288
return Mono.defer(() -> {
303-
if (!isInTransaction()) {
304-
return Mono.empty();
305-
}
306-
307289
return QueryFlow.doneTransaction(client, this, false, lockWaitTimeout, batchSupported);
308290
});
309291
}

src/test/java/io/asyncer/r2dbc/mysql/ConnectionIntegrationTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,69 @@ void errorPropagteRequestQueue() {
292292
);
293293
}
294294

295+
@Test
296+
void commitTransactionShouldRespectQueuedMessages() {
297+
final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))";
298+
complete(connection ->
299+
Mono.from(connection.createStatement(tdl).execute())
300+
.flatMap(IntegrationTestSupport::extractRowsUpdated)
301+
.thenMany(Flux.merge(
302+
connection.beginTransaction(),
303+
connection.createStatement("INSERT INTO test VALUES (1, 'test1')")
304+
.execute(),
305+
connection.commitTransaction()
306+
))
307+
.doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse())
308+
.thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute())
309+
.flatMap(result ->
310+
Mono.from(result.map((row, metadata) -> row.get(0, Long.class)))
311+
)
312+
.doOnNext(text -> assertThat(text).isEqualTo(1L))
313+
);
314+
}
315+
316+
@Test
317+
void rollbackTransactionShouldRespectQueuedMessages() {
318+
final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))";
319+
complete(connection ->
320+
Mono.from(connection.createStatement(tdl).execute())
321+
.flatMap(IntegrationTestSupport::extractRowsUpdated)
322+
.thenMany(Flux.merge(
323+
connection.beginTransaction(),
324+
connection.createStatement("INSERT INTO test VALUES (1, 'test1')")
325+
.execute(),
326+
connection.rollbackTransaction()
327+
))
328+
.doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse())
329+
.thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute())
330+
.flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class)))
331+
.doOnNext(count -> assertThat(count).isEqualTo(0L)))
332+
);
333+
}
334+
335+
@Test
336+
void beginTransactionShouldRespectQueuedMessages() {
337+
final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))";
338+
complete(connection ->
339+
Mono.from(connection.createStatement(tdl).execute())
340+
.flatMap(IntegrationTestSupport::extractRowsUpdated)
341+
.then(Mono.from(connection.beginTransaction()))
342+
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue())
343+
.thenMany(Flux.merge(
344+
connection.createStatement("INSERT INTO test VALUES (1, 'test1')").execute(),
345+
connection.commitTransaction(),
346+
connection.beginTransaction()
347+
))
348+
.doOnComplete(() -> assertThat(connection.isInTransaction()).isTrue())
349+
.then(Mono.from(connection.rollbackTransaction()))
350+
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse())
351+
.thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute())
352+
.flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class)))
353+
.doOnNext(count -> assertThat(count).isEqualTo(1L)))
354+
);
355+
356+
}
357+
295358
@Test
296359
void batchCrud() {
297360
// TODO: spilt it to multiple test cases and move it to BatchIntegrationTest

0 commit comments

Comments
 (0)