Skip to content

Commit 5c1883e

Browse files
committed
Add LOCAL INFILE support
- Improve envelope codec for `SubsequenceClientMessage` - Add `LocalInfileRequest`/`LocalInfileResponse` for text protocol
1 parent df0a358 commit 5c1883e

27 files changed

+850
-78
lines changed

src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,10 @@ public ZeroDateOption getZeroDateOption() {
119119
return zeroDateOption;
120120
}
121121

122+
public int getLocalInfileBufferSize() {
123+
return 64 * 1024;
124+
}
125+
122126
/**
123127
* Get the bitmap of server statuses.
124128
*

src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java

Lines changed: 84 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import io.asyncer.r2dbc.mysql.client.FluxExchangeable;
2323
import io.asyncer.r2dbc.mysql.constant.ServerStatuses;
2424
import io.asyncer.r2dbc.mysql.constant.SslMode;
25-
import io.asyncer.r2dbc.mysql.internal.util.InternalArrays;
2625
import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
2726
import io.asyncer.r2dbc.mysql.message.client.AuthResponse;
2827
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
2928
import io.asyncer.r2dbc.mysql.message.client.HandshakeResponse;
30-
import io.asyncer.r2dbc.mysql.message.client.LoginClientMessage;
29+
import io.asyncer.r2dbc.mysql.message.client.LocalInfileResponse;
30+
import io.asyncer.r2dbc.mysql.message.client.SubsequenceClientMessage;
3131
import io.asyncer.r2dbc.mysql.message.client.PingMessage;
3232
import io.asyncer.r2dbc.mysql.message.client.PrepareQueryMessage;
3333
import io.asyncer.r2dbc.mysql.message.client.PreparedCloseMessage;
@@ -44,6 +44,7 @@
4444
import io.asyncer.r2dbc.mysql.message.server.ErrorMessage;
4545
import io.asyncer.r2dbc.mysql.message.server.HandshakeHeader;
4646
import io.asyncer.r2dbc.mysql.message.server.HandshakeRequest;
47+
import io.asyncer.r2dbc.mysql.message.server.LocalInfileRequest;
4748
import io.asyncer.r2dbc.mysql.message.server.OkMessage;
4849
import io.asyncer.r2dbc.mysql.message.server.PreparedOkMessage;
4950
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
@@ -74,6 +75,7 @@
7475
import java.util.List;
7576
import java.util.Map;
7677
import java.util.concurrent.atomic.AtomicBoolean;
78+
import java.util.concurrent.atomic.AtomicInteger;
7779
import java.util.function.Consumer;
7880
import java.util.function.Predicate;
7981

@@ -209,36 +211,26 @@ static Mono<Client> login(Client client, SslMode sslMode, String database, Strin
209211
* terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. The {@link ErrorMessage}
210212
* will emit an exception. The exchange will be completed by {@link CompleteMessage} after receive the
211213
* last result for the last binding.
214+
* <p>
215+
* Note: this method does not support {@code LOCAL INFILE} due to it should be used for excepted queries.
212216
*
213217
* @param client the {@link Client} to exchange messages with.
214218
* @param sql the query to execute, can be contains multi-statements.
215219
* @return receives complete signal.
216220
*/
217221
static Mono<Void> executeVoid(Client client, String sql) {
218-
return Mono.defer(() -> execute0(client, sql).doOnNext(EXECUTE_VOID).then());
219-
}
222+
return Mono.defer(() -> client.<ServerMessage>exchange(new TextQueryMessage(sql), (message, sink) -> {
223+
if (message instanceof ErrorMessage) {
224+
sink.next(((ErrorMessage) message).offendedBy(sql));
225+
sink.complete();
226+
} else {
227+
sink.next(message);
220228

221-
/**
222-
* Execute multiple simple queries with one-by-one and return a {@link Mono} for the complete signal or
223-
* error. Query execution terminates with the last {@link CompleteMessage} or a {@link ErrorMessage}. The
224-
* {@link ErrorMessage} will emit an exception and cancel subsequent statements execution. The exchange
225-
* will be completed by {@link CompleteMessage} after receive the last result for the last binding.
226-
*
227-
* @param client the {@link Client} to exchange messages with.
228-
* @param statements the queries to execute, each element can be contains multi-statements.
229-
* @return receives complete signal.
230-
*/
231-
static Mono<Void> executeVoid(Client client, String... statements) {
232-
switch (statements.length) {
233-
case 0:
234-
return Mono.empty();
235-
case 1:
236-
return executeVoid(client, statements[0]);
237-
default:
238-
return client.exchange(new MultiQueryExchangeable(InternalArrays.asIterator(statements)))
239-
.doOnNext(EXECUTE_VOID)
240-
.then();
241-
}
229+
if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
230+
sink.complete();
231+
}
232+
}
233+
}).doOnSubscribe(ignored -> QueryLogger.log(sql)).doOnNext(EXECUTE_VOID).then());
242234
}
243235

244236
/**
@@ -303,18 +295,7 @@ static Mono<Void> createSavepoint(Client client, ConnectionState state, String n
303295
* @return the messages received in response to this exchange.
304296
*/
305297
private static Flux<ServerMessage> execute0(Client client, String sql) {
306-
return client.<ServerMessage>exchange(new TextQueryMessage(sql), (message, sink) -> {
307-
if (message instanceof ErrorMessage) {
308-
sink.next(((ErrorMessage) message).offendedBy(sql));
309-
sink.complete();
310-
} else {
311-
sink.next(message);
312-
313-
if (message instanceof CompleteMessage && ((CompleteMessage) message).isDone()) {
314-
sink.complete();
315-
}
316-
}
317-
}).doOnSubscribe(ignored -> QueryLogger.log(sql));
298+
return client.exchange(new SimpleQueryExchangeable(sql));
318299
}
319300

320301
private QueryFlow() { }
@@ -339,6 +320,16 @@ public final void accept(ServerMessage message, SynchronousSink<ServerMessage> s
339320
if (message instanceof ErrorMessage) {
340321
sink.next(((ErrorMessage) message).offendedBy(offendingSql()));
341322
sink.complete();
323+
} else if (message instanceof LocalInfileRequest) {
324+
LocalInfileRequest request = (LocalInfileRequest) message;
325+
String path = request.getPath();
326+
327+
QueryLogger.logLocalInfile(path);
328+
329+
requests.emitNext(
330+
new LocalInfileResponse(request.getEnvelopeId() + 1, path, sink),
331+
Sinks.EmitFailureHandler.FAIL_FAST
332+
);
342333
} else {
343334
sink.next(message);
344335

@@ -353,6 +344,59 @@ public final void accept(ServerMessage message, SynchronousSink<ServerMessage> s
353344
abstract protected String offendingSql();
354345
}
355346

347+
final class SimpleQueryExchangeable extends BaseFluxExchangeable {
348+
349+
private static final int INIT = 0;
350+
351+
private static final int EXECUTE = 1;
352+
353+
private static final int DISPOSE = 2;
354+
355+
private final AtomicInteger state = new AtomicInteger(INIT);
356+
357+
private final String sql;
358+
359+
SimpleQueryExchangeable(String sql) {
360+
this.sql = sql;
361+
}
362+
363+
@Override
364+
public void dispose() {
365+
if (state.getAndSet(DISPOSE) != DISPOSE) {
366+
requests.tryEmitComplete();
367+
}
368+
}
369+
370+
@Override
371+
public boolean isDisposed() {
372+
return state.get() == DISPOSE;
373+
}
374+
375+
@Override
376+
protected void tryNextOrComplete(@Nullable SynchronousSink<ServerMessage> sink) {
377+
if (state.compareAndSet(INIT, EXECUTE)) {
378+
QueryLogger.log(sql);
379+
380+
Sinks.EmitResult result = requests.tryEmitNext(new TextQueryMessage(sql));
381+
382+
if (result == Sinks.EmitResult.OK) {
383+
return;
384+
}
385+
386+
QueryFlow.logger.error("Emit request failed due to {}", result);
387+
}
388+
389+
if (sink != null) {
390+
sink.complete();
391+
}
392+
}
393+
394+
@Override
395+
protected String offendingSql() {
396+
return sql;
397+
}
398+
}
399+
356400
/**
357401
* An implementation of {@link FluxExchangeable} that considers client-preparing requests.
358402
*/
@@ -770,8 +814,8 @@ final class LoginExchangeable extends FluxExchangeable<Void> {
770814

771815
private static final int HANDSHAKE_VERSION = 10;
772816

773-
private final Sinks.Many<LoginClientMessage> requests = Sinks.many().unicast()
774-
.onBackpressureBuffer(Queues.<LoginClientMessage>one().get());
817+
private final Sinks.Many<SubsequenceClientMessage> requests = Sinks.many().unicast()
818+
.onBackpressureBuffer(Queues.<SubsequenceClientMessage>one().get());
775819

776820
private final Client client;
777821

@@ -879,7 +923,7 @@ public void dispose() {
879923
this.requests.tryEmitComplete();
880924
}
881925

882-
private void emitNext(LoginClientMessage message, SynchronousSink<Void> sink) {
926+
private void emitNext(SubsequenceClientMessage message, SynchronousSink<Void> sink) {
883927
Sinks.EmitResult result = requests.tryEmitNext(message);
884928

885929
if (result != Sinks.EmitResult.OK) {
@@ -903,8 +947,6 @@ private Capability clientCapability(Capability serverCapability) {
903947

904948
builder.disableDatabasePinned();
905949
builder.disableCompression();
906-
// TODO: support LOAD DATA LOCAL INFILE
907-
builder.disableLoadDataInfile();
908950
builder.disableIgnoreAmbiguitySpace();
909951
builder.disableInteractiveTimeout();
910952

src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,5 +47,9 @@ static void log(int statementId, MySqlParameter[] values) {
4747
logger.debug("Executing prepared statement {} with {}", statementId, values);
4848
}
4949

50+
static void logLocalInfile(String path) {
51+
logger.debug("Loading data from: {}", path);
52+
}
53+
5054
private QueryLogger() { }
5155
}

src/main/java/io/asyncer/r2dbc/mysql/TextParametrizedStatement.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ final class TextParametrizedStatement extends ParametrizedStatementSupport {
3333

3434
@Override
3535
protected Flux<MySqlResult> execute(List<Binding> bindings) {
36-
return Flux.defer(() -> QueryFlow.execute(client, query, returningIdentifiers(), bindings))
36+
return Flux.defer(() -> QueryFlow.execute(client, query, returningIdentifiers(),
37+
bindings))
3738
.map(messages -> MySqlResult.toResult(false, codecs, context, syntheticKeyName(), messages));
3839
}
3940
}

src/main/java/io/asyncer/r2dbc/mysql/TextSimpleStatement.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ final class TextSimpleStatement extends SimpleStatementSupport {
3434
public Flux<MySqlResult> execute() {
3535
return Flux.defer(() -> QueryFlow.execute(
3636
client,
37-
StringUtils.extendReturning(sql, returningIdentifiers()))
38-
).map(messages -> MySqlResult.toResult(false, codecs, context, syntheticKeyName(), messages));
37+
StringUtils.extendReturning(sql, returningIdentifiers())
38+
).map(messages -> MySqlResult.toResult(false, codecs, context, syntheticKeyName(), messages)));
3939
}
4040
}

src/main/java/io/asyncer/r2dbc/mysql/client/MessageDuplexCodec.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.asyncer.r2dbc.mysql.ConnectionContext;
2020
import io.asyncer.r2dbc.mysql.internal.util.OperatorUtils;
2121
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
22-
import io.asyncer.r2dbc.mysql.message.client.LoginClientMessage;
22+
import io.asyncer.r2dbc.mysql.message.client.SubsequenceClientMessage;
2323
import io.asyncer.r2dbc.mysql.message.client.PrepareQueryMessage;
2424
import io.asyncer.r2dbc.mysql.message.client.PreparedFetchMessage;
2525
import io.asyncer.r2dbc.mysql.message.client.SslRequest;
@@ -86,22 +86,22 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
8686
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
8787
if (msg instanceof ClientMessage) {
8888
ByteBufAllocator allocator = ctx.alloc();
89-
9089
Flux<ByteBuf> encoded;
91-
int envelopeId;
9290

93-
if (msg instanceof LoginClientMessage) {
94-
LoginClientMessage message = (LoginClientMessage) msg;
91+
if (msg instanceof SubsequenceClientMessage) {
92+
SubsequenceClientMessage message = (SubsequenceClientMessage) msg;
9593

9694
encoded = Flux.from(message.encode(allocator, this.context));
97-
envelopeId = message.getEnvelopeId();
95+
int envelopeId = message.getEnvelopeId();
96+
97+
OperatorUtils.envelope(encoded, allocator, envelopeId, false)
98+
.subscribe(new WriteSubscriber(ctx, promise));
9899
} else {
99100
encoded = Flux.from(((ClientMessage) msg).encode(allocator, this.context));
100-
envelopeId = 0;
101-
}
102101

103-
OperatorUtils.cumulateEnvelope(encoded, allocator, envelopeId)
104-
.subscribe(new WriteSubscriber(ctx, promise));
102+
OperatorUtils.envelope(encoded, allocator, 0, true)
103+
.subscribe(new WriteSubscriber(ctx, promise));
104+
}
105105

106106
if (msg instanceof PrepareQueryMessage) {
107107
setDecodeContext(DecodeContext.prepareQuery());

src/main/java/io/asyncer/r2dbc/mysql/client/ReactorNettyClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,8 @@ public <T> Flux<T> exchange(FluxExchangeable<T> exchangeable) {
184184
.asFlux()
185185
.doOnSubscribe(ignored -> exchangeable.subscribe(
186186
this::emitNextRequest,
187-
e -> requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST))
187+
e ->
188+
requests.emitError(e, Sinks.EmitFailureHandler.FAIL_FAST))
188189
)
189190
.handle(exchangeable)
190191
.doOnTerminate(() -> {

0 commit comments

Comments
 (0)