Skip to content

Commit 19d33f5

Browse files
committed
Add option to configure local infile path
1 parent 5c1883e commit 19d33f5

13 files changed

+200
-35
lines changed

README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ ConnectionFactoryOptions options = ConnectionFactoryOptions.builder()
141141
.option(Option.valueOf("sslContextBuilderCustomizer"), "com.example.demo.MyCustomizer") // optional, default is no-op customizer
142142
.option(Option.valueOf("zeroDate"), "use_null") // optional, default "use_null"
143143
.option(Option.valueOf("useServerPrepareStatement"), true) // optional, default false
144+
.option(Option.valueOf("allowLoadLocalInfileInPath"), "/opt") // optional, default null, null means LOCAL INFILE not be allowed
144145
.option(Option.valueOf("tcpKeepAlive"), true) // optional, default false
145146
.option(Option.valueOf("tcpNoDelay"), true) // optional, default false
146147
.option(Option.valueOf("autodetectExtensions"), false) // optional, default false
@@ -189,6 +190,7 @@ MySqlConnectionConfiguration configuration = MySqlConnectionConfiguration.builde
189190
.sslContextBuilderCustomizer(MyCustomizer.INSTANCE) // optional, default is no-op customizer
190191
.zeroDateOption(ZeroDateOption.USE_NULL) // optional, default ZeroDateOption.USE_NULL
191192
.useServerPrepareStatement() // Use server-preparing statements, default use client-preparing statements
193+
.allowLoadLocalInfileInPath("/opt") // optional, default null, null means LOCAL INFILE not be allowed
192194
.tcpKeepAlive(true) // optional, controls TCP Keep Alive, default is false
193195
.tcpNoDelay(true) // optional, controls TCP No Delay, default is false
194196
.autodetectExtensions(false) // optional, controls extension auto-detect, default is true
@@ -242,6 +244,7 @@ Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
242244
| zeroDateOption | Any value of `ZeroDateOption` | Optional, default `USE_NULL` | The option indicates "zero date" handling, see following notice |
243245
| autodetectExtensions | `true` or `false` | Optional, default is `true` | Controls auto-detect `Extension`s |
244246
| useServerPrepareStatement | `true`, `false` or `Predicate<String>` | Optional, default is `false` | See following notice |
247+
| allowLoadLocalInfileInPath | A path | Optional, default is `null` | The path that allows `LOAD DATA LOCAL INFILE` to load file data |
245248
| passwordPublisher | A `Publisher<String>` | Optional, default is `null` | The password publisher, see following notice |
246249

247250
- `SslMode` Considers security level and verification for SSL, make sure the database server supports SSL before you want change SSL mode to `REQUIRED` or higher. **The Unix Domain Socket only offers "DISABLED" available**
@@ -584,6 +587,7 @@ If you want to raise an issue, please follow the recommendations below:
584587
- The MySQL may be not support well for searching rows by a binary field, like `BIT` and `JSON`
585588
- `BIT`: cannot select 'BIT(64)' with value greater than 'Long.MAX_VALUE' (or equivalent in binary)
586589
- `JSON`: different MySQL may have different serialization formats, e.g. MariaDB and MySQL
590+
- MySQL 8.0+ disables `@@global.local_infile` by default, make sure `@@local_infile` is `ON` before enable `allowLoadLocalInfileInPath` of the driver. e.g. run `SET GLOBAL local_infile=ON`, or set it in `mysql.cnf`.
587591

588592
## License
589593

src/main/java/io/asyncer/r2dbc/mysql/Capability.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,7 @@ void disableCompression() {
361361
this.bitmap &= ~COMPRESS;
362362
}
363363

364-
void disableLoadDataInfile() {
364+
void disableLoadDataLocalInfile() {
365365
this.bitmap &= ~LOCAL_FILES;
366366
}
367367

src/main/java/io/asyncer/r2dbc/mysql/ConnectionContext.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.asyncer.r2dbc.mysql.constant.ZeroDateOption;
2323
import org.jetbrains.annotations.Nullable;
2424

25+
import java.nio.file.Path;
2526
import java.time.ZoneId;
2627

2728
import static io.asyncer.r2dbc.mysql.internal.util.AssertUtils.requireNonNull;
@@ -42,6 +43,11 @@ public final class ConnectionContext implements CodecContext {
4243

4344
private final ZeroDateOption zeroDateOption;
4445

46+
@Nullable
47+
private final Path localInfilePath;
48+
49+
private final int localInfileBufferSize;
50+
4551
@Nullable
4652
private ZoneId serverZoneId;
4753

@@ -53,8 +59,11 @@ public final class ConnectionContext implements CodecContext {
5359

5460
private volatile Capability capability = null;
5561

56-
ConnectionContext(ZeroDateOption zeroDateOption, @Nullable ZoneId serverZoneId) {
62+
ConnectionContext(ZeroDateOption zeroDateOption, @Nullable Path localInfilePath,
63+
int localInfileBufferSize, @Nullable ZoneId serverZoneId) {
5764
this.zeroDateOption = requireNonNull(zeroDateOption, "zeroDateOption must not be null");
65+
this.localInfilePath = localInfilePath;
66+
this.localInfileBufferSize = localInfileBufferSize;
5867
this.serverZoneId = serverZoneId;
5968
}
6069

@@ -119,8 +128,23 @@ public ZeroDateOption getZeroDateOption() {
119128
return zeroDateOption;
120129
}
121130

131+
/**
132+
* Gets the allowed local infile path.
133+
*
134+
* @return the path.
135+
*/
136+
@Nullable
137+
public Path getLocalInfilePath() {
138+
return localInfilePath;
139+
}
140+
141+
/**
142+
* Gets the local infile buffer size.
143+
*
144+
* @return the buffer size.
145+
*/
122146
public int getLocalInfileBufferSize() {
123-
return 64 * 1024;
147+
return localInfileBufferSize;
124148
}
125149

126150
/**

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionConfiguration.java

Lines changed: 72 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525

2626
import javax.net.ssl.HostnameVerifier;
2727
import java.net.Socket;
28+
import java.nio.file.Path;
29+
import java.nio.file.Paths;
2830
import java.time.Duration;
2931
import java.time.ZoneId;
3032
import java.util.ArrayList;
@@ -89,6 +91,11 @@ public final class MySqlConnectionConfiguration {
8991
@Nullable
9092
private final Predicate<String> preferPrepareStatement;
9193

94+
@Nullable
95+
private final Path loadLocalInfilePath;
96+
97+
private final int localInfileBufferSize;
98+
9299
private final int queryCacheSize;
93100

94101
private final int prepareCacheSize;
@@ -104,6 +111,7 @@ private MySqlConnectionConfiguration(
104111
@Nullable Duration socketTimeout, ZeroDateOption zeroDateOption, @Nullable ZoneId serverZoneId,
105112
String user, @Nullable CharSequence password, @Nullable String database,
106113
boolean createDatabaseIfNotExist, @Nullable Predicate<String> preferPrepareStatement,
114+
@Nullable Path loadLocalInfilePath, int localInfileBufferSize,
107115
int queryCacheSize, int prepareCacheSize, Extensions extensions,
108116
@Nullable Publisher<String> passwordPublisher
109117
) {
@@ -122,6 +130,8 @@ private MySqlConnectionConfiguration(
122130
this.database = database == null || database.isEmpty() ? "" : database;
123131
this.createDatabaseIfNotExist = createDatabaseIfNotExist;
124132
this.preferPrepareStatement = preferPrepareStatement;
133+
this.loadLocalInfilePath = loadLocalInfilePath;
134+
this.localInfileBufferSize = localInfileBufferSize;
125135
this.queryCacheSize = queryCacheSize;
126136
this.prepareCacheSize = prepareCacheSize;
127137
this.extensions = extensions;
@@ -207,6 +217,15 @@ Predicate<String> getPreferPrepareStatement() {
207217
return preferPrepareStatement;
208218
}
209219

220+
@Nullable
221+
Path getLoadLocalInfilePath() {
222+
return loadLocalInfilePath;
223+
}
224+
225+
int getLocalInfileBufferSize() {
226+
return localInfileBufferSize;
227+
}
228+
210229
int getQueryCacheSize() {
211230
return queryCacheSize;
212231
}
@@ -248,6 +267,8 @@ public boolean equals(Object o) {
248267
database.equals(that.database) &&
249268
createDatabaseIfNotExist == that.createDatabaseIfNotExist &&
250269
Objects.equals(preferPrepareStatement, that.preferPrepareStatement) &&
270+
Objects.equals(loadLocalInfilePath, that.loadLocalInfilePath) &&
271+
localInfileBufferSize == that.localInfileBufferSize &&
251272
queryCacheSize == that.queryCacheSize &&
252273
prepareCacheSize == that.prepareCacheSize &&
253274
extensions.equals(that.extensions) &&
@@ -258,7 +279,8 @@ public boolean equals(Object o) {
258279
public int hashCode() {
259280
return Objects.hash(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay, connectTimeout,
260281
socketTimeout, serverZoneId, zeroDateOption, user, password, database, createDatabaseIfNotExist,
261-
preferPrepareStatement, queryCacheSize, prepareCacheSize, extensions, passwordPublisher);
282+
preferPrepareStatement, loadLocalInfilePath, localInfileBufferSize, queryCacheSize,
283+
prepareCacheSize, extensions, passwordPublisher);
262284
}
263285

264286
@Override
@@ -269,16 +291,21 @@ public String toString() {
269291
connectTimeout + ", socketTimeout=" + socketTimeout + ", serverZoneId=" + serverZoneId +
270292
", zeroDateOption=" + zeroDateOption + ", user='" + user + "', password=" + password +
271293
", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist +
272-
", preferPrepareStatement=" + preferPrepareStatement + ", queryCacheSize=" + queryCacheSize +
273-
", prepareCacheSize=" + prepareCacheSize + ", extensions=" + extensions +
274-
", passwordPublisher=" + passwordPublisher + '}';
294+
", preferPrepareStatement=" + preferPrepareStatement +
295+
", loadLocalInfilePath=" + loadLocalInfilePath +
296+
", localInfileBufferSize=" + localInfileBufferSize +
297+
", queryCacheSize=" + queryCacheSize + ", prepareCacheSize=" + prepareCacheSize +
298+
", extensions=" + extensions + ", passwordPublisher=" + passwordPublisher + '}';
275299
}
276300

277301
return "MySqlConnectionConfiguration{unixSocket='" + domain + "', connectTimeout=" +
278302
connectTimeout + ", socketTimeout=" + socketTimeout + ", serverZoneId=" + serverZoneId +
279303
", zeroDateOption=" + zeroDateOption + ", user='" + user + "', password=" + password +
280304
", database='" + database + "', createDatabaseIfNotExist=" + createDatabaseIfNotExist +
281-
", preferPrepareStatement=" + preferPrepareStatement + ", queryCacheSize=" + queryCacheSize +
305+
", preferPrepareStatement=" + preferPrepareStatement +
306+
", loadLocalInfilePath=" + loadLocalInfilePath +
307+
", localInfileBufferSize=" + localInfileBufferSize +
308+
", queryCacheSize=" + queryCacheSize +
282309
", prepareCacheSize=" + prepareCacheSize + ", extensions=" + extensions +
283310
", passwordPublisher=" + passwordPublisher + '}';
284311
}
@@ -345,6 +372,11 @@ public static final class Builder {
345372
@Nullable
346373
private Predicate<String> preferPrepareStatement;
347374

375+
@Nullable
376+
private Path loadLocalInfilePath;
377+
378+
private int localInfileBufferSize = 8192;
379+
348380
private int queryCacheSize = 0;
349381

350382
private int prepareCacheSize = 256;
@@ -379,7 +411,8 @@ public MySqlConnectionConfiguration build() {
379411
sslCa, sslKey, sslKeyPassword, sslCert, sslContextBuilderCustomizer);
380412
return new MySqlConnectionConfiguration(isHost, domain, port, ssl, tcpKeepAlive, tcpNoDelay,
381413
connectTimeout, socketTimeout, zeroDateOption, serverZoneId, user, password, database,
382-
createDatabaseIfNotExist, preferPrepareStatement, queryCacheSize, prepareCacheSize,
414+
createDatabaseIfNotExist, preferPrepareStatement, loadLocalInfilePath,
415+
localInfileBufferSize, queryCacheSize, prepareCacheSize,
383416
Extensions.from(extensions, autodetectExtensions), passwordPublisher);
384417
}
385418

@@ -754,6 +787,38 @@ public Builder useServerPrepareStatement(Predicate<String> preferPrepareStatemen
754787
return this;
755788
}
756789

790+
/**
791+
* Configures to allow the {@code LOAD DATA LOCAL INFILE} statement in the given {@code path} or
792+
* disallow the statement. Default to {@code null} which means not allow the statement.
793+
*
794+
* @param path which parent path are allowed to load file data, {@code null} means not be allowed.
795+
* @return {@link Builder this}.
796+
* @throws java.nio.file.InvalidPathException if the string cannot be converted to a {@link Path}.
797+
* @since 1.1.0
798+
*/
799+
public Builder allowLoadLocalInfileInPath(@Nullable String path) {
800+
this.loadLocalInfilePath = path == null ? null : Paths.get(path);
801+
802+
return this;
803+
}
804+
805+
/**
806+
* Configures the buffer size for {@code LOAD DATA LOCAL INFILE} statement. Default to {@code 8192}.
807+
* <p>
808+
* It is used only if {@link #allowLoadLocalInfileInPath(String)} is set.
809+
*
810+
* @param localInfileBufferSize the buffer size.
811+
* @return {@link Builder this}.
812+
* @throws IllegalArgumentException if {@code localInfileBufferSize} is not positive.
813+
* @since 1.1.0
814+
*/
815+
public Builder localInfileBufferSize(int localInfileBufferSize) {
816+
require(localInfileBufferSize > 0, "localInfileBufferSize must be positive");
817+
818+
this.localInfileBufferSize = localInfileBufferSize;
819+
return this;
820+
}
821+
757822
/**
758823
* Configures the maximum size of the {@link Query} parsing cache. Usually it should be power of two.
759824
* Default to {@code 0}. Driver will use unbounded cache if size is less than {@code 0}.
@@ -823,6 +888,7 @@ public Builder extendWith(Extension extension) {
823888

824889
/**
825890
* Registers a password publisher function.
891+
*
826892
* @param passwordPublisher function to retrieve password before making connection.
827893
* @return this {@link Builder}.
828894
*/

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,12 @@ public static MySqlConnectionFactory from(MySqlConnectionConfiguration configura
9090
String user = configuration.getUser();
9191
CharSequence password = configuration.getPassword();
9292
SslMode sslMode = ssl.getSslMode();
93-
ConnectionContext context = new ConnectionContext(configuration.getZeroDateOption(),
94-
configuration.getServerZoneId());
93+
ConnectionContext context = new ConnectionContext(
94+
configuration.getZeroDateOption(),
95+
configuration.getLoadLocalInfilePath(),
96+
configuration.getLocalInfileBufferSize(),
97+
configuration.getServerZoneId()
98+
);
9599
Extensions extensions = configuration.getExtensions();
96100
Predicate<String> prepare = configuration.getPreferPrepareStatement();
97101
int prepareCacheSize = configuration.getPrepareCacheSize();

src/main/java/io/asyncer/r2dbc/mysql/MySqlConnectionFactoryProvider.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,9 @@ public final class MySqlConnectionFactoryProvider implements ConnectionFactoryPr
191191
public static final Option<Object> USE_SERVER_PREPARE_STATEMENT =
192192
Option.valueOf("useServerPrepareStatement");
193193

194+
public static final Option<String> ALLOW_LOAD_LOCAL_INFILE_IN_PATH =
195+
Option.valueOf("allowLoadLocalInfileInPath");
196+
194197
/**
195198
* Option to set the maximum size of the {@link Query} parsing cache. Default to {@code 256}.
196199
*
@@ -266,6 +269,8 @@ static MySqlConnectionConfiguration setup(ConnectionFactoryOptions options) {
266269
.to(builder::zeroDateOption);
267270
mapper.optional(USE_SERVER_PREPARE_STATEMENT).prepare(builder::useClientPrepareStatement,
268271
builder::useServerPrepareStatement, builder::useServerPrepareStatement);
272+
mapper.optional(ALLOW_LOAD_LOCAL_INFILE_IN_PATH).asString()
273+
.to(builder::allowLoadLocalInfileInPath);
269274
mapper.optional(QUERY_CACHE_SIZE).asInt()
270275
.to(builder::queryCacheSize);
271276
mapper.optional(PREPARE_CACHE_SIZE).asInt()

src/main/java/io/asyncer/r2dbc/mysql/QueryFlow.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -973,6 +973,10 @@ private Capability clientCapability(Capability serverCapability) {
973973
builder.disableConnectWithDatabase();
974974
}
975975

976+
if (context.getLocalInfilePath() == null) {
977+
builder.disableLoadDataLocalInfile();
978+
}
979+
976980
if (ATTRIBUTES.isEmpty()) {
977981
builder.disableConnectAttributes();
978982
}

src/main/java/io/asyncer/r2dbc/mysql/QueryLogger.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package io.asyncer.r2dbc.mysql;
1818

19-
2019
import io.asyncer.r2dbc.mysql.internal.util.StringUtils;
2120
import io.netty.util.internal.logging.InternalLogger;
2221
import io.netty.util.internal.logging.InternalLoggerFactory;

src/main/java/io/asyncer/r2dbc/mysql/message/client/LocalInfileResponse.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@
2020
import io.asyncer.r2dbc.mysql.internal.util.NettyBufferUtils;
2121
import io.netty.buffer.ByteBuf;
2222
import io.netty.buffer.ByteBufAllocator;
23+
import io.r2dbc.spi.R2dbcNonTransientResourceException;
24+
import io.r2dbc.spi.R2dbcPermissionDeniedException;
2325
import reactor.core.publisher.Flux;
26+
import reactor.core.publisher.Mono;
2427
import reactor.core.publisher.SynchronousSink;
2528

26-
import java.io.IOException;
29+
import java.nio.file.InvalidPathException;
2730
import java.nio.file.Path;
2831
import java.nio.file.Paths;
2932
import java.util.concurrent.atomic.AtomicReference;
@@ -52,22 +55,37 @@ public LocalInfileResponse(int envelopeId, String path, SynchronousSink<?> error
5255
@Override
5356
public Flux<ByteBuf> encode(ByteBufAllocator allocator, ConnectionContext context) {
5457
return Flux.defer(() -> {
58+
int bufferSize = context.getLocalInfileBufferSize();
5559
AtomicReference<Throwable> error = new AtomicReference<>();
56-
Path path = Paths.get(this.path);
57-
58-
return NettyBufferUtils.readFile(path, allocator, context.getLocalInfileBufferSize())
59-
.onErrorComplete(e -> {
60-
error.set(e);
61-
return true;
62-
})
63-
.concatWith(Flux.just(allocator.buffer(0, 0)))
64-
.doAfterTerminate(() -> {
65-
Throwable e = error.getAndSet(null);
66-
67-
if (e != null) {
68-
errorSink.error(e);
60+
61+
return Mono.<Path>create(sink -> {
62+
try {
63+
Path safePath = context.getLocalInfilePath();
64+
Path file = Paths.get(this.path);
65+
66+
if (safePath == null || file.startsWith(safePath)) {
67+
sink.success(file);
68+
} else {
69+
String message = String.format("The file '%s' is not under the safe path '%s'",
70+
file, safePath);
71+
sink.error(new R2dbcPermissionDeniedException(message));
6972
}
70-
});
73+
} catch (InvalidPathException e) {
74+
sink.error(new R2dbcNonTransientResourceException("Invalid path: " + this.path, e));
75+
} catch (Throwable e) {
76+
sink.error(e);
77+
}
78+
}).flatMapMany(p -> NettyBufferUtils.readFile(p, allocator, bufferSize)).onErrorComplete(e -> {
79+
// Server needs an empty buffer, so emit error to upstream instead of encoding stream.
80+
error.set(e);
81+
return true;
82+
}).concatWith(Flux.just(allocator.buffer(0, 0))).doAfterTerminate(() -> {
83+
Throwable e = error.getAndSet(null);
84+
85+
if (e != null) {
86+
errorSink.error(e);
87+
}
88+
});
7189
});
7290
}
7391

0 commit comments

Comments
 (0)