diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 413d5d43e338..8a3d52390793 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -350,6 +350,7 @@ jobs:
!:trino-bigquery,
!:trino-cassandra,
!:trino-clickhouse,
+ !:trino-databend,
!:trino-delta-lake,
!:trino-docs,
!:trino-druid,
@@ -480,6 +481,7 @@ jobs:
- { modules: plugin/trino-bigquery, profile: cloud-tests-2 }
- { modules: plugin/trino-cassandra }
- { modules: plugin/trino-clickhouse }
+ - { modules: plugin/trino-databend }
- { modules: plugin/trino-delta-lake }
- { modules: plugin/trino-delta-lake, profile: cloud-tests }
- { modules: plugin/trino-delta-lake, profile: fte-tests }
diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml
index 97a366b39290..ce528458bc64 100644
--- a/core/trino-server/src/main/provisio/trino.xml
+++ b/core/trino-server/src/main/provisio/trino.xml
@@ -40,6 +40,12 @@
+
+
+
+
+
+
diff --git a/docs/src/main/sphinx/connector.md b/docs/src/main/sphinx/connector.md
index a954b30cf059..d9d52f682a1f 100644
--- a/docs/src/main/sphinx/connector.md
+++ b/docs/src/main/sphinx/connector.md
@@ -12,6 +12,7 @@ BigQuery
Black Hole
Cassandra
ClickHouse
+Databend
Delta Lake
Druid
DuckDB
diff --git a/docs/src/main/sphinx/connector/databend.md b/docs/src/main/sphinx/connector/databend.md
new file mode 100644
index 000000000000..33322dde6d88
--- /dev/null
+++ b/docs/src/main/sphinx/connector/databend.md
@@ -0,0 +1,279 @@
+# Databend connector
+
+```{raw} html
+
+```
+
+The Databend connector allows querying and creating tables in an external
+[Databend](https://databend.rs/) database. This can be used to join data between
+different systems like Databend and Hive, or between two different
+Databend instances.
+
+## Requirements
+
+To connect to Databend, you need:
+
+- Databend 1.2.0 or higher
+- Network access from the Trino coordinator and workers to Databend. Port 8000 is the default port.
+
+## Configuration
+
+To configure the Databend connector, create a catalog properties file
+in `etc/catalog` named, for example, `example.properties`, to
+mount the Databend connector as the `databend` catalog.
+Create the file with the following contents, replacing the
+connection properties as appropriate for your setup:
+
+```none
+connector.name=databend
+connection-url=jdbc:databend://host:8000/
+connection-user=root
+connection-password=
+```
+
+The `connection-url` defines the connection information and parameters to pass
+to the Databend JDBC driver. The supported parameters for the URL are
+available in the [Databend JDBC driver documentation](https://github.com/databendlabs/databend-jdbc).
+
+The `connection-user` and `connection-password` are typically required and
+determine the user credentials for the connection, often a service user. You can
+use {doc}`secrets ` to avoid actual values in the catalog
+properties files.
+
+### Multiple Databend databases
+
+The Databend connector can query multiple databases within a Databend instance.
+If you have multiple Databend instances, or want to connect to multiple
+catalogs in the same instance, configure another instance of the
+Databend connector as a separate catalog.
+
+### Connection security
+
+If you have TLS configured with a globally-trusted certificate installed on your
+data source, you can enable TLS between your cluster and the data
+source by appending the `ssl=true` parameter to the JDBC connection string set in the
+`connection-url` catalog configuration property.
+
+For example, with the Databend connector, enable TLS by appending the `ssl=true`
+parameter to the `connection-url` configuration property:
+
+```properties
+connection-url=jdbc:databend://host:8000/?ssl=true
+```
+
+For more information on TLS configuration options, see the
+[Databend JDBC driver documentation](https://github.com/databendlabs/databend-jdbc).
+
+```{include} jdbc-authentication.fragment
+```
+
+```{include} jdbc-kerberos.fragment
+```
+
+(databend-type-mapping)=
+## Type mapping
+
+Because Trino and Databend each support types that the other does not, this
+connector {ref}`modifies some types ` when reading or
+writing data. Data types may not map the same way in both directions between
+Trino and the data source. Refer to the following sections for type mapping in
+each direction.
+
+### Databend type to Trino type mapping
+
+The connector maps Databend types to the corresponding Trino types following
+this table:
+
+:::{list-table} Databend type to Trino type mapping
+:widths: 30, 30, 40
+:header-rows: 1
+
+* - Databend type
+ - Trino type
+ - Notes
+* - `BOOLEAN`
+ - `BOOLEAN`
+ -
+* - `TINYINT`
+ - `TINYINT`
+ -
+* - `SMALLINT`
+ - `SMALLINT`
+ -
+* - `INTEGER`
+ - `INTEGER`
+ -
+* - `BIGINT`
+ - `BIGINT`
+ -
+* - `REAL`
+ - `REAL`
+ -
+* - `DOUBLE`
+ - `DOUBLE`
+ -
+* - `DECIMAL(p, s)`
+ - `DECIMAL(p, s)`
+ - See [](decimal-type-handling)
+* - `VARCHAR`
+ - `VARCHAR`
+ -
+* - `DATE`
+ - `DATE`
+ -
+* - `TIMESTAMP`
+ - `TIMESTAMP(0)`
+ - Databend TIMESTAMP is mapped to TIMESTAMP(0) in Trino
+:::
+
+No other types are supported.
+
+### Trino type to Databend type mapping
+
+The connector maps Trino types to the corresponding Databend types following
+this table:
+
+:::{list-table} Trino type to Databend type mapping
+:widths: 30, 30, 40
+:header-rows: 1
+
+* - Trino type
+ - Databend type
+ - Notes
+* - `BOOLEAN`
+ - `BOOLEAN`
+ -
+* - `TINYINT`
+ - `TINYINT`
+ -
+* - `SMALLINT`
+ - `SMALLINT`
+ -
+* - `INTEGER`
+ - `INTEGER`
+ -
+* - `BIGINT`
+ - `BIGINT`
+ -
+* - `REAL`
+ - `REAL`
+ -
+* - `DOUBLE`
+ - `DOUBLE`
+ -
+* - `DECIMAL(p, s)`
+ - `DECIMAL(p, s)`
+ -
+* - `CHAR(n)`
+ - `VARCHAR`
+ -
+* - `VARCHAR(n)`
+ - `VARCHAR`
+ -
+* - `VARBINARY`
+ - `VARCHAR`
+ -
+* - `DATE`
+ - `DATE`
+ -
+* - `TIMESTAMP(0)`
+ - `TIMESTAMP`
+ -
+:::
+
+No other types are supported.
+
+```{include} jdbc-type-mapping.fragment
+```
+
+(databend-sql-support)=
+## SQL support
+
+The connector provides read access and write access to data and metadata in
+a Databend database. In addition to the {ref}`globally available
+` and {ref}`read operation `
+statements, the connector supports the following features:
+
+- {doc}`/sql/insert`
+- {doc}`/sql/truncate`
+- {doc}`/sql/create-table`
+- {doc}`/sql/create-table-as`
+- {doc}`/sql/drop-table`
+- {doc}`/sql/alter-table`
+- {doc}`/sql/create-schema`
+- {doc}`/sql/drop-schema`
+
+```{include} alter-table-limitation.fragment
+```
+
+```{include} alter-schema-limitation.fragment
+```
+
+### Procedures
+
+```{include} jdbc-procedures-flush.fragment
+```
+```{include} procedures-execute.fragment
+```
+
+### Table functions
+
+The connector provides specific [table functions](/functions/table) to
+access Databend.
+
+(databend-query-function)=
+#### `query(varchar) -> table`
+
+The `query` function allows you to query the underlying database directly. It
+requires syntax native to Databend, because the full query is pushed down and
+processed in Databend. This can be useful for accessing native features which
+are not available in Trino or for improving query performance in situations
+where running a query natively may be faster.
+
+```{include} query-passthrough-warning.fragment
+```
+
+As a simple example, query the `example` catalog and select an entire table:
+
+```
+SELECT
+ *
+FROM
+ TABLE(
+ example.system.query(
+ query => 'SELECT
+ *
+ FROM
+ tpch.nation'
+ )
+ );
+```
+
+```{include} query-table-function-ordering.fragment
+```
+
+## Performance
+
+The connector includes a number of performance improvements, detailed in the
+following sections.
+
+(databend-pushdown)=
+### Pushdown
+
+The connector supports pushdown for a number of operations:
+
+- {ref}`limit-pushdown`
+
+{ref}`Aggregate pushdown ` for the following functions:
+
+- {func}`avg`
+- {func}`count`
+- {func}`max`
+- {func}`min`
+- {func}`sum`
+
+```{include} pushdown-correctness-behavior.fragment
+```
+
+```{include} no-inequality-pushdown-text-type.fragment
+```
diff --git a/plugin/trino-databend/pom.xml b/plugin/trino-databend/pom.xml
new file mode 100644
index 000000000000..6af5fab9f7d0
--- /dev/null
+++ b/plugin/trino-databend/pom.xml
@@ -0,0 +1,325 @@
+
+
+ 4.0.0
+
+
+ io.trino
+ trino-root
+ 479-SNAPSHOT
+ ../../pom.xml
+
+
+ trino-databend
+ trino-plugin
+ Trino - Databend Connector
+
+
+ true
+
+
+
+
+ com.databend
+ databend-jdbc
+ 0.4.0
+
+
+ com.squareup.okio
+ okio
+
+
+ com.squareup.okio
+ okio-jvm
+
+
+ joda-time
+ joda-time
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+
+
+ com.google.guava
+ guava
+
+
+
+ com.google.inject
+ guice
+ classes
+
+
+
+ io.airlift
+ configuration
+
+
+
+ io.airlift
+ units
+
+
+
+ io.trino
+ trino-base-jdbc
+
+
+
+ io.trino
+ trino-plugin-toolkit
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-api
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-api-incubator
+ provided
+
+
+
+ io.opentelemetry
+ opentelemetry-context
+ provided
+
+
+
+ io.trino
+ trino-spi
+ provided
+
+
+
+ org.openjdk.jol
+ jol-core
+ provided
+
+
+
+ io.airlift
+ log
+ runtime
+
+
+
+ io.airlift
+ log-manager
+ runtime
+
+
+
+
+ io.airlift
+ junit-extensions
+ test
+
+
+
+ io.airlift
+ testing
+ test
+
+
+
+ io.trino
+ trino-base-jdbc
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-main
+ test
+
+
+
+ io.trino
+ trino-main
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-plugin-toolkit
+ test-jar
+ test
+
+
+
+ io.trino
+ trino-testing
+ test
+
+
+
+ io.trino
+ trino-testing-containers
+ test
+
+
+
+ io.trino
+ trino-testing-services
+ test
+
+
+
+ io.trino
+ trino-tpch
+ test
+
+
+
+ io.trino.tpch
+ tpch
+ test
+
+
+
+ org.apache.commons
+ commons-lang3
+ test
+
+
+
+ org.assertj
+ assertj-core
+ test
+
+
+
+ org.jetbrains
+ annotations
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+
+ org.testcontainers
+ databend
+ 1.20.2
+ test
+
+
+
+ org.testcontainers
+ testcontainers
+ test
+
+
+
+ software.amazon.awssdk
+ apache-client
+ test
+
+
+ commons-logging
+ commons-logging
+
+
+
+
+
+ software.amazon.awssdk
+ auth
+ test
+
+
+
+ software.amazon.awssdk
+ aws-core
+ test
+
+
+
+ software.amazon.awssdk
+ http-client-spi
+ test
+
+
+
+ software.amazon.awssdk
+ regions
+ test
+
+
+
+ software.amazon.awssdk
+ s3
+ test
+
+
+
+ software.amazon.awssdk
+ sdk-core
+ test
+
+
+
+
+
+
+ org.basepom.maven
+ duplicate-finder-maven-plugin
+
+
+ commonMain/default/.*
+ kotlin/.*
+ okio/.*
+ org/joda/time/tz/data/.*
+
+
+ kotlin\..*
+ okhttp3\..*
+ okio\..*
+
+
+
+ com.databend
+ databend-jdbc
+
+
+
+
+
+
+
diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClient.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClient.java
new file mode 100644
index 000000000000..724b22c8e40d
--- /dev/null
+++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClient.java
@@ -0,0 +1,720 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.primitives.Shorts;
+import com.google.inject.Inject;
+import io.trino.plugin.base.mapping.IdentifierMapping;
+import io.trino.plugin.jdbc.BaseJdbcClient;
+import io.trino.plugin.jdbc.BaseJdbcConfig;
+import io.trino.plugin.jdbc.CaseSensitivity;
+import io.trino.plugin.jdbc.ColumnMapping;
+import io.trino.plugin.jdbc.ConnectionFactory;
+import io.trino.plugin.jdbc.JdbcColumnHandle;
+import io.trino.plugin.jdbc.JdbcSortItem;
+import io.trino.plugin.jdbc.JdbcTableHandle;
+import io.trino.plugin.jdbc.JdbcTypeHandle;
+import io.trino.plugin.jdbc.LongReadFunction;
+import io.trino.plugin.jdbc.LongWriteFunction;
+import io.trino.plugin.jdbc.PreparedQuery;
+import io.trino.plugin.jdbc.QueryBuilder;
+import io.trino.plugin.jdbc.RemoteTableName;
+import io.trino.plugin.jdbc.StandardColumnMappings;
+import io.trino.plugin.jdbc.WriteMapping;
+import io.trino.plugin.jdbc.expression.ParameterizedExpression;
+import io.trino.plugin.jdbc.logging.RemoteQueryModifier;
+import io.trino.spi.TrinoException;
+import io.trino.spi.connector.ColumnMetadata;
+import io.trino.spi.connector.ColumnPosition;
+import io.trino.spi.connector.ConnectorSession;
+import io.trino.spi.connector.ConnectorTableMetadata;
+import io.trino.spi.connector.JoinStatistics;
+import io.trino.spi.connector.JoinType;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.type.CharType;
+import io.trino.spi.type.DecimalType;
+import io.trino.spi.type.Decimals;
+import io.trino.spi.type.Type;
+import io.trino.spi.type.VarbinaryType;
+import io.trino.spi.type.VarcharType;
+import org.weakref.jmx.$internal.guava.base.Enums;
+
+import javax.annotation.Nullable;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.Date;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.time.LocalDate;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.trino.plugin.databend.DatabendTableProperties.ENGINE_PROPERTY;
+import static io.trino.plugin.jdbc.CaseSensitivity.CASE_INSENSITIVE;
+import static io.trino.plugin.jdbc.CaseSensitivity.CASE_SENSITIVE;
+import static io.trino.plugin.jdbc.DecimalConfig.DecimalMapping.ALLOW_OVERFLOW;
+import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalDefaultScale;
+import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRounding;
+import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRoundingMode;
+import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR;
+import static io.trino.plugin.jdbc.JdbcJoinPushdownUtil.implementJoinCostAware;
+import static io.trino.plugin.jdbc.PredicatePushdownController.DISABLE_PUSHDOWN;
+import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping;
+import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.booleanWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.decimalColumnMapping;
+import static io.trino.plugin.jdbc.StandardColumnMappings.doubleColumnMapping;
+import static io.trino.plugin.jdbc.StandardColumnMappings.doubleWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.integerColumnMapping;
+import static io.trino.plugin.jdbc.StandardColumnMappings.integerWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.realWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping;
+import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintColumnMapping;
+import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction;
+import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
+import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling;
+import static io.trino.plugin.jdbc.UnsupportedTypeHandling.CONVERT_TO_VARCHAR;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
+import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY;
+import static io.trino.spi.type.BigintType.BIGINT;
+import static io.trino.spi.type.BooleanType.BOOLEAN;
+import static io.trino.spi.type.DateType.DATE;
+import static io.trino.spi.type.DecimalType.createDecimalType;
+import static io.trino.spi.type.DoubleType.DOUBLE;
+import static io.trino.spi.type.IntegerType.INTEGER;
+import static io.trino.spi.type.RealType.REAL;
+import static io.trino.spi.type.SmallintType.SMALLINT;
+import static io.trino.spi.type.TinyintType.TINYINT;
+import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
+import static io.trino.spi.type.VarcharType.createVarcharType;
+import static java.lang.Float.floatToRawIntBits;
+import static java.lang.Math.max;
+import static java.lang.Math.toIntExact;
+import static java.lang.String.format;
+import static java.lang.String.join;
+import static java.util.Locale.ENGLISH;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.joining;
+import static org.weakref.jmx.$internal.guava.base.Preconditions.checkArgument;
+import static org.weakref.jmx.$internal.guava.base.Strings.emptyToNull;
+
+public final class DatabendClient
+ extends BaseJdbcClient
+{
+ @Inject
+ public DatabendClient(
+ BaseJdbcConfig config,
+ DatabendConfig databendConfig,
+ ConnectionFactory connectionFactory,
+ QueryBuilder queryBuilder,
+ IdentifierMapping identifierMapping,
+ RemoteQueryModifier queryModifier)
+ {
+ super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false);
+ requireNonNull(databendConfig, "databendConfig is null");
+ }
+
+ @Override
+ public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List sortOrder)
+ {
+ return true;
+ }
+
+ @Override
+ protected Optional topNFunction()
+ {
+ return Optional.of((query, sortItems, limit) -> {
+ String orderBy = sortItems.stream().map(sortItem -> {
+ String ordering = sortItem.sortOrder().isAscending() ? "ASC" : "DESC";
+ String nullsHandling = sortItem.sortOrder().isNullsFirst() ? "NULLS FIRST" : "NULLS LAST";
+ return format("%s %s %s", quoted(sortItem.column().getColumnName()), ordering, nullsHandling);
+ }).collect(joining(", "));
+ return format("%s ORDER BY %s LIMIT %d", query, orderBy, limit);
+ });
+ }
+
+ @Override
+ public boolean isTopNGuaranteed(ConnectorSession session)
+ {
+ return true;
+ }
+
+ @Override
+ public Optional implementJoin(
+ ConnectorSession session,
+ JoinType joinType,
+ PreparedQuery leftSource,
+ Map leftProjections,
+ PreparedQuery rightSource,
+ Map rightProjections,
+ List joinConditions,
+ JoinStatistics statistics)
+ {
+ if (joinType == JoinType.FULL_OUTER) {
+ return Optional.empty();
+ }
+ return implementJoinCostAware(
+ session,
+ joinType,
+ leftSource,
+ rightSource,
+ statistics,
+ () -> super.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics));
+ }
+
+ @Override
+ public ResultSet getTables(Connection connection, Optional schemaName, Optional tableName)
+ throws SQLException
+ {
+ DatabaseMetaData metadata = connection.getMetaData();
+ if (tableName.isPresent()) {
+ // Databend maps their "database" to SQL catalogs and does not have schemas
+ return metadata.getTables(schemaName.orElse(null), null, tableName.get(), getTableTypes().map(types -> types.toArray(String[]::new)).orElse(null));
+ }
+ return metadata.getTables(schemaName.orElse(null), null, tableName.orElse(null), getTableTypes().map(types -> types.toArray(String[]::new)).orElse(null));
+ }
+
+ @Override
+ protected ResultSet getColumns(RemoteTableName remoteTableName, DatabaseMetaData metadata)
+ throws SQLException
+ {
+ // Databend exposes databases via JDBC catalogs and does not use schemas
+ Optional database = remoteTableName.getSchemaName();
+ if (database.isEmpty()) {
+ database = remoteTableName.getCatalogName();
+ }
+ return metadata.getColumns(
+ database.orElse(null),
+ null,
+ escapeObjectNameForMetadataQuery(remoteTableName.getTableName(), metadata.getSearchStringEscape()),
+ null);
+ }
+
+ @Override
+ public List getColumns(ConnectorSession session, SchemaTableName schemaTableName, RemoteTableName remoteTableName)
+ {
+ Map remoteNullability = getRemoteNullability(session, remoteTableName);
+ return super.getColumns(session, schemaTableName, remoteTableName).stream()
+ .map(column -> applyRemoteNullability(column, remoteNullability))
+ .collect(toImmutableList());
+ }
+
+ private Map getRemoteNullability(ConnectorSession session, RemoteTableName remoteTableName)
+ {
+ Optional database = remoteTableName.getSchemaName();
+ if (database.isEmpty()) {
+ database = remoteTableName.getCatalogName();
+ }
+ if (database.isEmpty()) {
+ return Map.of();
+ }
+ try (Connection connection = connectionFactory.openConnection(session);
+ PreparedStatement statement = connection.prepareStatement("""
+ SELECT name, is_nullable
+ FROM system.columns
+ WHERE database = ?
+ AND table = ?
+ """)) {
+ statement.setString(1, database.get());
+ statement.setString(2, remoteTableName.getTableName());
+ try (ResultSet resultSet = statement.executeQuery()) {
+ ImmutableMap.Builder builder = ImmutableMap.builder();
+ while (resultSet.next()) {
+ builder.put(
+ resultSet.getString("name").toLowerCase(ENGLISH),
+ "YES".equalsIgnoreCase(resultSet.getString("is_nullable")));
+ }
+ return builder.buildOrThrow();
+ }
+ }
+ catch (SQLException e) {
+ throw new TrinoException(JDBC_ERROR, "Failed to fetch column nullability", e);
+ }
+ }
+
+ private JdbcColumnHandle applyRemoteNullability(JdbcColumnHandle column, Map remoteNullability)
+ {
+ Boolean remoteNullable = remoteNullability.get(column.getColumnName().toLowerCase(ENGLISH));
+ if (remoteNullable == null || remoteNullable == column.isNullable()) {
+ return column;
+ }
+ return JdbcColumnHandle.builderFrom(column)
+ .setNullable(remoteNullable)
+ .build();
+ }
+
+ @Override
+ protected String quoted(@Nullable String catalog, @Nullable String schema, String table)
+ {
+ StringBuilder sb = new StringBuilder();
+ if (!isNullOrEmpty(schema)) {
+ sb.append(quoted(schema)).append(".");
+ }
+ else if (!isNullOrEmpty(catalog)) {
+ sb.append(quoted(catalog)).append(".");
+ }
+ sb.append(quoted(table));
+ return sb.toString();
+ }
+
+ @Override
+ protected void copyTableSchema(ConnectorSession session, Connection connection, String catalogName, String schemaName, String tableName, String newTableName, List columnNames)
+ {
+ String sql = format(
+ "CREATE TABLE %s AS SELECT %s FROM %s WHERE 0 = 1",
+ quoted(catalogName, schemaName, newTableName),
+ columnNames.stream()
+ .map(this::quoted)
+ .collect(joining(", ")),
+ quoted(catalogName, schemaName, tableName));
+ try {
+ execute(session, connection, sql);
+ }
+ catch (SQLException e) {
+ throw new TrinoException(JDBC_ERROR, e);
+ }
+ }
+
+ @Override
+ public Collection listSchemas(Connection connection)
+ {
+ try (ResultSet resultSet = connection.getMetaData().getCatalogs()) {
+ ImmutableSet.Builder schemaNames = ImmutableSet.builder();
+ while (resultSet.next()) {
+ String schemaName = resultSet.getString("TABLE_CAT");
+ if (filterSchema(schemaName)) {
+ schemaNames.add(schemaName);
+ }
+ }
+ return schemaNames.build();
+ }
+ catch (SQLException e) {
+ throw new TrinoException(JDBC_ERROR, e);
+ }
+ }
+
+ @Override
+ public Optional getTableComment(ResultSet resultSet)
+ throws SQLException
+ {
+ // Empty remarks means that the table doesn't have a comment in Databend
+ return Optional.ofNullable(emptyToNull(resultSet.getString("REMARKS")));
+ }
+
+ @Override
+ protected List createTableSqls(RemoteTableName remoteTableName, List columns, ConnectorTableMetadata tableMetadata)
+ {
+ ImmutableList.Builder tableOptions = ImmutableList.builder();
+ Map tableProperties = tableMetadata.getProperties();
+ if (tableMetadata.getComment().isPresent()) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment");
+ }
+ DatabendEngineType engine = DatabendTableProperties.getEngine(tableProperties);
+ tableOptions.add("ENGINE = " + engine.getEngineType());
+
+ formatProperty(DatabendTableProperties.getOrderBy(tableProperties)).ifPresent(value -> tableOptions.add("ORDER BY " + value));
+
+ return ImmutableList.of(format("CREATE TABLE %s (%s) %s", quoted(remoteTableName), join(", ", columns), join(" ", tableOptions.build())));
+ }
+
+ /**
+ * format property to match Databend create table statement
+ *
+ * @param prop property will be formatted
+ * @return formatted property
+ */
+ private Optional formatProperty(List prop)
+ {
+ if (prop == null || prop.isEmpty()) {
+ return Optional.empty();
+ }
+ if (prop.size() == 1) {
+ // only one column
+ return Optional.of(quoted(prop.getFirst()));
+ }
+ // include more than one column
+ return Optional.of(prop.stream().map(this::quoted).collect(joining(",", "(", ")")));
+ }
+
+ @Override
+ public Map getTableProperties(ConnectorSession session, JdbcTableHandle tableHandle)
+ {
+ try (Connection connection = connectionFactory.openConnection(session)) {
+ PreparedStatement statement = connection.prepareStatement("SELECT engine " + "FROM system.tables " + "WHERE database = ? AND name = ?");
+ statement.setString(1, tableHandle.asPlainTable().getRemoteTableName().getCatalogName().orElse(null));
+ statement.setString(2, tableHandle.asPlainTable().getRemoteTableName().getTableName());
+
+ try (ResultSet resultSet = statement.executeQuery()) {
+ ImmutableMap.Builder properties = ImmutableMap.builder();
+ while (resultSet.next()) {
+ String engine = resultSet.getString("engine");
+ if (!isNullOrEmpty(engine)) {
+ Optional engineType = Enums.getIfPresent(DatabendEngineType.class, engine.toUpperCase(ENGLISH)).toJavaUtil();
+ engineType.ifPresent(type -> properties.put(ENGINE_PROPERTY, type));
+ }
+ }
+ return properties.buildOrThrow();
+ }
+ }
+ catch (SQLException e) {
+ throw new TrinoException(JDBC_ERROR, e);
+ }
+ }
+
+ @Override
+ public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery)
+ {
+ try {
+ return super.getTableHandle(session, preparedQuery);
+ }
+ catch (TrinoException e) {
+ if (e.getErrorCode().equals(NOT_SUPPORTED.toErrorCode()) &&
+ e.getMessage().startsWith("Query not supported: ResultSetMetaData not available for query")) {
+ throw new TrinoException(JDBC_ERROR, "Failed to get table handle for prepared query. " + e.getMessage(), e);
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void setTableProperties(ConnectorSession session, JdbcTableHandle handle, Map> nullableProperties)
+ {
+ checkArgument(nullableProperties.values().stream().noneMatch(Optional::isEmpty), "Setting a property to null is not supported");
+
+ if (nullableProperties.isEmpty()) {
+ return;
+ }
+
+ throw new TrinoException(NOT_SUPPORTED, "Setting table properties is not supported for Databend tables");
+ }
+
+ @Override
+ protected String getColumnDefinitionSql(ConnectorSession session, ColumnMetadata column, String columnName)
+ {
+ if (column.getComment() != null) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with column comment");
+ }
+ StringBuilder sb = new StringBuilder()
+ .append(quoted(columnName))
+ .append(" ");
+ if (column.isNullable()) {
+ // set column nullable property explicitly
+ sb.append("Nullable(").append(toWriteMapping(session, column.getType()).getDataType()).append(")");
+ }
+ else {
+ // By default, the clickhouse column is not allowed to be null
+ sb.append(toWriteMapping(session, column.getType()).getDataType());
+ }
+ return sb.toString();
+ }
+
+ @Override
+ protected void createSchema(ConnectorSession session, Connection connection, String remoteSchemaName)
+ throws SQLException
+ {
+ execute(session, connection, "CREATE DATABASE " + quoted(remoteSchemaName));
+ }
+
+ @Override
+ protected void dropSchema(ConnectorSession session, Connection connection, String remoteSchemaName, boolean cascade)
+ throws SQLException
+ {
+ if (!cascade) {
+ try (ResultSet tables = getTables(connection, Optional.of(remoteSchemaName), Optional.empty())) {
+ if (tables.next()) {
+ throw new TrinoException(SCHEMA_NOT_EMPTY, String.format("Cannot drop non-empty schema '%s'", remoteSchemaName));
+ }
+ }
+ }
+ execute(session, connection, "DROP DATABASE " + quoted(remoteSchemaName));
+ }
+
+ @Override
+ protected void renameSchema(ConnectorSession session, Connection connection, String remoteSchemaName, String newRemoteSchemaName)
+ throws SQLException
+ {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support renaming schemas");
+ }
+
+ @Override
+ public void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column, ColumnPosition position)
+ {
+ if (column.getComment() != null) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with comments");
+ }
+ if (!(position instanceof ColumnPosition.Last)) {
+ if (position instanceof ColumnPosition.First) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with FIRST clause");
+ }
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with AFTER clause");
+ }
+ try (Connection connection = connectionFactory.openConnection(session)) {
+ String remoteColumnName = getIdentifierMapping().toRemoteColumnName(getRemoteIdentifiers(connection), column.getName());
+ String sql = format("ALTER TABLE %s ADD COLUMN %s", quoted(handle.asPlainTable().getRemoteTableName()), getColumnDefinitionSql(session, column, remoteColumnName));
+ execute(session, connection, sql);
+ }
+ catch (SQLException e) {
+ throw new TrinoException(JDBC_ERROR, e);
+ }
+ }
+
+ @Override
+ public void setColumnType(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Type type)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support setting column types");
+ }
+
+ @Override
+ public void dropNotNullConstraint(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column)
+ {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support dropping a not null constraint");
+ }
+
+ @Override
+ protected Optional> getTableTypes()
+ {
+ return Optional.empty();
+ }
+
+ @Override
+ protected void renameTable(ConnectorSession session, Connection connection, String catalogName, String remoteSchemaName, String remoteTableName, String newRemoteSchemaName, String newRemoteTableName)
+ throws SQLException
+ {
+ if (!Objects.equals(remoteSchemaName, newRemoteSchemaName)) {
+ throw new TrinoException(NOT_SUPPORTED, "This connector does not support renaming tables across schemas");
+ }
+ execute(session, connection, format("RENAME TABLE %s TO %s", quoted(catalogName, remoteSchemaName, remoteTableName), quoted(catalogName, newRemoteSchemaName, newRemoteTableName)));
+ }
+
+ @Override
+ protected Optional> limitFunction()
+ {
+ return Optional.of((sql, limit) -> sql + " LIMIT " + limit);
+ }
+
+ @Override
+ public boolean isLimitGuaranteed(ConnectorSession session)
+ {
+ return true;
+ }
+
+ @Override
+ public Optional toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle)
+ {
+ String jdbcTypeName = typeHandle.jdbcTypeName().orElseThrow(() -> new TrinoException(JDBC_ERROR, "Type name is missing: " + typeHandle));
+
+ Optional mapping = getForcedMappingToVarchar(typeHandle);
+ if (mapping.isPresent()) {
+ return mapping;
+ }
+
+ switch (jdbcTypeName.toLowerCase(ENGLISH)) {
+ case "uint8":
+ return Optional.of(ColumnMapping.longMapping(SMALLINT, ResultSet::getShort, uInt8WriteFunction()));
+ case "uint16":
+ return Optional.of(ColumnMapping.longMapping(INTEGER, ResultSet::getInt, uInt16WriteFunction()));
+ case "uint32":
+ return Optional.of(ColumnMapping.longMapping(BIGINT, ResultSet::getLong, uInt32WriteFunction()));
+ case "uint64":
+ return Optional.of(decimalColumnMapping(createDecimalType(20)));
+ case "string":
+ return Optional.of(varcharColumnMapping(typeHandle.requiredColumnSize(), typeHandle.caseSensitivity()));
+ default:
+ }
+
+ switch (typeHandle.jdbcType()) {
+ case Types.TINYINT:
+ return Optional.of(tinyintColumnMapping());
+
+ case Types.BOOLEAN:
+ case Types.BIT:
+ return Optional.of(StandardColumnMappings.booleanColumnMapping());
+
+ case Types.SMALLINT:
+ return Optional.of(smallintColumnMapping());
+
+ case Types.INTEGER:
+ return Optional.of(integerColumnMapping());
+
+ case Types.BIGINT:
+ return Optional.of(bigintColumnMapping());
+
+ case Types.FLOAT:
+ return Optional.of(ColumnMapping.longMapping(REAL, (resultSet, columnIndex) -> floatToRawIntBits(resultSet.getFloat(columnIndex)), realWriteFunction(), DISABLE_PUSHDOWN));
+
+ case Types.DOUBLE:
+ return Optional.of(doubleColumnMapping());
+
+ case Types.VARCHAR:
+ if (jdbcTypeName.equals("varchar")) {
+ return Optional.of(varcharColumnMapping(typeHandle.requiredColumnSize(), typeHandle.caseSensitivity()));
+ }
+ // Some other Databend types (ARRAY, VARIANT, etc.) are also mapped to Types.VARCHAR, but they're unsupported.
+ break;
+
+ case Types.DECIMAL:
+ int decimalDigits = typeHandle.requiredDecimalDigits();
+ int precision = typeHandle.requiredColumnSize();
+
+ ColumnMapping decimalColumnMapping;
+ if (getDecimalRounding(session) == ALLOW_OVERFLOW && precision > Decimals.MAX_PRECISION) {
+ int scale = Math.min(decimalDigits, getDecimalDefaultScale(session));
+ decimalColumnMapping = decimalColumnMapping(createDecimalType(Decimals.MAX_PRECISION, scale), getDecimalRoundingMode(session));
+ }
+ else {
+ decimalColumnMapping = decimalColumnMapping(createDecimalType(precision, max(decimalDigits, 0)));
+ }
+ return Optional.of(ColumnMapping.mapping(decimalColumnMapping.getType(), decimalColumnMapping.getReadFunction(), decimalColumnMapping.getWriteFunction(),
+ DISABLE_PUSHDOWN)); // To avoid potential data loss or precision issues during pushdown operations
+
+ case Types.DATE:
+ return Optional.of(dateColumnMappingUsingLocalDate());
+ }
+
+ if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) {
+ return mapToUnboundedVarchar(typeHandle);
+ }
+
+ return Optional.empty();
+ }
+
+ private static ColumnMapping varcharColumnMapping(int varcharLength, Optional caseSensitivity)
+ {
+ VarcharType varcharType = varcharLength <= VarcharType.MAX_LENGTH ? createVarcharType(varcharLength) : createUnboundedVarcharType();
+ return StandardColumnMappings.varcharColumnMapping(varcharType, caseSensitivity.orElse(CASE_INSENSITIVE) == CASE_SENSITIVE);
+ }
+
+ @Override
+ public WriteMapping toWriteMapping(ConnectorSession session, Type type)
+ {
+ if (type == BOOLEAN) {
+ return WriteMapping.booleanMapping("Boolean", booleanWriteFunction());
+ }
+ if (type == TINYINT) {
+ return WriteMapping.longMapping("Int8", tinyintWriteFunction());
+ }
+ if (type == SMALLINT) {
+ return WriteMapping.longMapping("Int16", smallintWriteFunction());
+ }
+ if (type == INTEGER) {
+ return WriteMapping.longMapping("Int32", integerWriteFunction());
+ }
+ if (type == BIGINT) {
+ return WriteMapping.longMapping("Int64", bigintWriteFunction());
+ }
+ if (type == REAL) {
+ return WriteMapping.longMapping("Float32", realWriteFunction());
+ }
+ if (type == DOUBLE) {
+ return WriteMapping.doubleMapping("Float64", doubleWriteFunction());
+ }
+ if (type instanceof DecimalType decimalType) {
+ String dataType = format("Decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale());
+ if (decimalType.isShort()) {
+ return WriteMapping.longMapping(dataType, shortDecimalWriteFunction(decimalType));
+ }
+ return WriteMapping.objectMapping(dataType, longDecimalWriteFunction(decimalType));
+ }
+ if (type instanceof CharType || type instanceof VarcharType) {
+ // The String type replaces the types VARCHAR, BLOB, CLOB, and others from other DBMSs.
+ return WriteMapping.sliceMapping("String", varcharWriteFunction());
+ }
+ if (type instanceof VarbinaryType) {
+ // Strings of an arbitrary length. The length is not limited
+ return WriteMapping.sliceMapping("String", varbinaryWriteFunction());
+ }
+ if (type == DATE) {
+ return WriteMapping.longMapping("Date", dateWriteFunctionUsingLocalDate());
+ }
+ throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type);
+ }
+
+ private static LongWriteFunction uInt8WriteFunction()
+ {
+ return (statement, index, value) -> {
+ statement.setShort(index, Shorts.checkedCast(value));
+ };
+ }
+
+ private static LongWriteFunction uInt16WriteFunction()
+ {
+ return (statement, index, value) -> {
+ statement.setInt(index, toIntExact(value));
+ };
+ }
+
+ private static LongWriteFunction uInt32WriteFunction()
+ {
+ return PreparedStatement::setLong;
+ }
+
+ private static ColumnMapping dateColumnMappingUsingLocalDate()
+ {
+ return ColumnMapping.longMapping(DATE, databendDateReadFunction(), dateWriteFunctionUsingLocalDate());
+ }
+
+ private static LongReadFunction databendDateReadFunction()
+ {
+ return new LongReadFunction() {
+ @Override
+ public boolean isNull(ResultSet resultSet, int columnIndex)
+ throws SQLException
+ {
+ resultSet.getObject(columnIndex);
+ return resultSet.wasNull();
+ }
+
+ @Override
+ public long readLong(ResultSet resultSet, int columnIndex)
+ throws SQLException
+ {
+ Object value = resultSet.getObject(columnIndex);
+ if (value == null) {
+ throw new TrinoException(JDBC_ERROR, "Driver returned null LocalDate for a non-null value");
+ }
+ if (value instanceof LocalDate localDate) {
+ return localDate.toEpochDay();
+ }
+ if (value instanceof Date date) {
+ return date.toLocalDate().toEpochDay();
+ }
+ return LocalDate.parse(value.toString()).toEpochDay();
+ }
+ };
+ }
+
+ private static LongWriteFunction dateWriteFunctionUsingLocalDate()
+ {
+ return (statement, index, value) -> {
+ LocalDate date = LocalDate.ofEpochDay(value);
+ statement.setObject(index, date);
+ };
+ }
+}
diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClientModule.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClientModule.java
new file mode 100644
index 000000000000..bde1c06e0d81
--- /dev/null
+++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClientModule.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import com.databend.jdbc.DatabendDriver;
+import com.google.inject.Binder;
+import com.google.inject.Provides;
+import com.google.inject.Scopes;
+import com.google.inject.Singleton;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.opentelemetry.api.OpenTelemetry;
+import io.trino.plugin.jdbc.BaseJdbcConfig;
+import io.trino.plugin.jdbc.ConnectionFactory;
+import io.trino.plugin.jdbc.DecimalConfig;
+import io.trino.plugin.jdbc.DriverConnectionFactory;
+import io.trino.plugin.jdbc.ForBaseJdbc;
+import io.trino.plugin.jdbc.JdbcClient;
+import io.trino.plugin.jdbc.JdbcStatisticsConfig;
+import io.trino.plugin.jdbc.credential.CredentialProvider;
+
+import java.util.Properties;
+
+import static io.airlift.configuration.ConfigBinder.configBinder;
+import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider;
+import static io.trino.plugin.jdbc.JdbcModule.bindTablePropertiesProvider;
+
+public final class DatabendClientModule
+ extends AbstractConfigurationAwareModule
+{
+ @Override
+ protected void setup(Binder binder)
+ {
+ binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(DatabendClient.class).in(Scopes.SINGLETON);
+ configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
+ configBinder(binder).bindConfig(DatabendConfig.class);
+ configBinder(binder).bindConfig(DecimalConfig.class);
+ bindSessionPropertiesProvider(binder, DatabendSessionProperties.class);
+ bindTablePropertiesProvider(binder, DatabendTableProperties.class);
+ }
+
+ @Provides
+ @Singleton
+ @ForBaseJdbc
+ public static ConnectionFactory connectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider, OpenTelemetry openTelemetry)
+ {
+ Properties connectionProperties = new Properties();
+ return DriverConnectionFactory.builder(
+ new DatabendDriver(),
+ config.getConnectionUrl(),
+ credentialProvider)
+ .setConnectionProperties(connectionProperties)
+ .setOpenTelemetry(openTelemetry)
+ .build();
+ }
+}
diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendConfig.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendConfig.java
new file mode 100644
index 000000000000..86f28d514e18
--- /dev/null
+++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendConfig.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.units.Duration;
+
+import java.util.concurrent.TimeUnit;
+
+public class DatabendConfig
+{
+ private Duration connectionTimeout = new Duration(60, TimeUnit.SECONDS);
+
+ public Duration getConnectionTimeout()
+ {
+ return connectionTimeout;
+ }
+
+ @ConfigDescription("Connection timeout")
+ @Config("databend.connection-timeout")
+ public DatabendConfig setConnectionTimeout(Duration connectionTimeout)
+ {
+ this.connectionTimeout = connectionTimeout;
+ return this;
+ }
+}
diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendEngineType.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendEngineType.java
new file mode 100644
index 000000000000..fcc2a53fd413
--- /dev/null
+++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendEngineType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+public enum DatabendEngineType
+{
+ FUSE("FUSE");
+
+ private final String engineType;
+
+ DatabendEngineType(String engineType)
+ {
+ this.engineType = engineType;
+ }
+
+ public String getEngineType()
+ {
+ return this.engineType;
+ }
+}
diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendPlugin.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendPlugin.java
new file mode 100644
index 000000000000..bb081159f872
--- /dev/null
+++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendPlugin.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import io.trino.plugin.jdbc.JdbcPlugin;
+
+public final class DatabendPlugin
+ extends JdbcPlugin
+{
+ public DatabendPlugin()
+ {
+ super("databend", DatabendClientModule::new);
+ }
+}
diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendSessionProperties.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendSessionProperties.java
new file mode 100644
index 000000000000..1c690c7d878a
--- /dev/null
+++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendSessionProperties.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import com.google.inject.Inject;
+import io.trino.plugin.base.session.SessionPropertiesProvider;
+import io.trino.plugin.jdbc.DecimalConfig;
+import io.trino.plugin.jdbc.DecimalSessionSessionProperties;
+import io.trino.spi.session.PropertyMetadata;
+
+import java.util.List;
+
+public final class DatabendSessionProperties
+ implements SessionPropertiesProvider
+{
+ private final List> sessionProperties;
+
+ @Inject
+ public DatabendSessionProperties(DecimalConfig decimalConfig)
+ {
+ sessionProperties = new DecimalSessionSessionProperties(decimalConfig).getSessionProperties();
+ }
+
+ @Override
+ public List> getSessionProperties()
+ {
+ return sessionProperties;
+ }
+}
diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendTableProperties.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendTableProperties.java
new file mode 100644
index 000000000000..5a15c8c605f6
--- /dev/null
+++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendTableProperties.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Inject;
+import io.trino.plugin.jdbc.TablePropertiesProvider;
+import io.trino.spi.session.PropertyMetadata;
+import io.trino.spi.type.ArrayType;
+
+import java.util.List;
+import java.util.Map;
+
+import static io.trino.spi.session.PropertyMetadata.enumProperty;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static java.util.Objects.requireNonNull;
+
+public final class DatabendTableProperties
+ implements TablePropertiesProvider
+{
+ public static final String ENGINE_PROPERTY = "engine";
+ public static final String ORDER_BY_PROPERTY = "order_by"; //required
+
+ private final List> tableProperties;
+
+ @Inject
+ public DatabendTableProperties()
+ {
+ tableProperties = ImmutableList.>builder()
+ .add(enumProperty(
+ ENGINE_PROPERTY,
+ "Databend Table Engine, defaults to Log",
+ DatabendEngineType.class,
+ DatabendEngineType.FUSE, // FUSE is default engine of Databend which optimized for both read and write operations with improved indexing and compression.
+ false))
+ .add(new PropertyMetadata<>(
+ ORDER_BY_PROPERTY,
+ "columns to be the sorting key, it's required for table MergeTree engine family",
+ new ArrayType(VARCHAR),
+ List.class,
+ ImmutableList.of(),
+ false,
+ value -> (List>) value,
+ value -> value))
+ .build();
+ }
+
+ public static DatabendEngineType getEngine(Map tableProperties)
+ {
+ requireNonNull(tableProperties, "tableProperties is null");
+ DatabendEngineType engine = (DatabendEngineType) tableProperties.get(ENGINE_PROPERTY);
+ return engine != null ? engine : DatabendEngineType.FUSE;
+ }
+
+ public static List getOrderBy(Map tableProperties)
+ {
+ requireNonNull(tableProperties, "tableProperties is null");
+ @SuppressWarnings("unchecked")
+ List orderBy = (List) tableProperties.get("order_by");
+ if (orderBy == null) {
+ return ImmutableList.of();
+ }
+ return orderBy;
+ }
+
+ @Override
+ public List> getTableProperties()
+ {
+ return tableProperties;
+ }
+}
diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendUtil.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendUtil.java
new file mode 100644
index 000000000000..2f1b80c58156
--- /dev/null
+++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+public final class DatabendUtil
+{
+ private DatabendUtil() {}
+
+ public static String convertToQuotedString(Object value)
+ {
+ return value == null ? "NULL" : '\'' + escape(value.toString(), '\'') + '\'';
+ }
+
+ private static String escape(String str, char quote)
+ {
+ if (str == null) {
+ return str;
+ }
+
+ int len = str.length();
+ StringBuilder sb = new StringBuilder(len + 10);
+
+ for (int i = 0; i < len; ++i) {
+ char ch = str.charAt(i);
+ if (ch == quote || ch == '\\') {
+ sb.append('\\');
+ }
+ sb.append(ch);
+ }
+
+ return sb.toString();
+ }
+}
diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/DatabendQueryRunner.java b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/DatabendQueryRunner.java
new file mode 100644
index 000000000000..bb4163097ea1
--- /dev/null
+++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/DatabendQueryRunner.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.plugin.tpch.TpchPlugin;
+import io.trino.testing.DistributedQueryRunner;
+import io.trino.testing.QueryRunner;
+import io.trino.tpch.TpchTable;
+
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+
+import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME;
+import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey;
+import static io.trino.testing.QueryAssertions.copyTpchTables;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+
+public final class DatabendQueryRunner
+{
+ private DatabendQueryRunner() {}
+
+ public static QueryRunner createDatabendQueryRunner(
+ TestingDatabendServer server,
+ Map extraProperties,
+ Map connectorProperties)
+ throws Exception
+ {
+ System.setProperty("user.timezone", "UTC");
+ TimeZone.setDefault(TimeZone.getTimeZone(ZoneId.of("UTC")));
+
+ QueryRunner queryRunner = DistributedQueryRunner.builder(
+ testSessionBuilder()
+ .setCatalog("databend")
+ .setSchema("default")
+ .setTimeZoneKey(getTimeZoneKey("UTC"))
+ .build())
+ .setExtraProperties(extraProperties)
+ .build();
+
+ try {
+ queryRunner.installPlugin(new TpchPlugin());
+ queryRunner.createCatalog("tpch", "tpch");
+
+ Map properties = new HashMap<>(ImmutableMap.builder()
+ .put("connection-url", server.getJdbcUrl())
+ .put("connection-user", server.getUser())
+ .put("connection-password", server.getPassword())
+ .buildOrThrow());
+ properties.putAll(connectorProperties);
+
+ queryRunner.installPlugin(new DatabendPlugin());
+ queryRunner.createCatalog("databend", "databend", properties);
+
+ copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, TpchTable.getTables());
+
+ return queryRunner;
+ }
+ catch (Throwable e) {
+ queryRunner.close();
+ throw e;
+ }
+ }
+}
diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendConnectorTest.java b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendConnectorTest.java
new file mode 100644
index 000000000000..cf784b1d6951
--- /dev/null
+++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendConnectorTest.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import io.trino.plugin.jdbc.BaseJdbcConnectorTest;
+import io.trino.testing.MaterializedResult;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.TestingConnectorBehavior;
+import io.trino.testing.sql.SqlExecutor;
+import io.trino.testing.sql.TestTable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+import java.util.Locale;
+import java.util.Optional;
+
+import static com.google.common.base.Strings.nullToEmpty;
+import static io.trino.spi.type.VarcharType.VARCHAR;
+import static io.trino.testing.MaterializedResult.resultBuilder;
+import static io.trino.testing.TestingNames.randomNameSuffix;
+import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.junit.jupiter.api.Assumptions.abort;
+import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
+
+@TestInstance(PER_CLASS)
+public class TestDatabendConnectorTest
+ extends BaseJdbcConnectorTest
+{
+ private TestingDatabendServer databendServer;
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ databendServer = closeAfterClass(new TestingDatabendServer());
+ return DatabendQueryRunner.createDatabendQueryRunner(databendServer, emptyMap(), emptyMap());
+ }
+
+ @Override
+ protected SqlExecutor onRemoteDatabase()
+ {
+ return databendServer::execute;
+ }
+
+ @Override
+ protected boolean hasBehavior(TestingConnectorBehavior behavior)
+ {
+ return switch (behavior) {
+ case SUPPORTS_ARRAY,
+ SUPPORTS_MAP_TYPE,
+ SUPPORTS_ROW_TYPE,
+ SUPPORTS_DELETE,
+ SUPPORTS_ROW_LEVEL_DELETE,
+ SUPPORTS_UPDATE,
+ SUPPORTS_ROW_LEVEL_UPDATE,
+ SUPPORTS_RENAME_SCHEMA,
+ SUPPORTS_ADD_COLUMN_WITH_POSITION,
+ SUPPORTS_ADD_COLUMN_WITH_COMMENT,
+ SUPPORTS_NOT_NULL_CONSTRAINT,
+ SUPPORTS_ADD_COLUMN_NOT_NULL_CONSTRAINT,
+ SUPPORTS_DEFAULT_COLUMN_VALUE,
+ SUPPORTS_DROP_NOT_NULL_CONSTRAINT,
+ SUPPORTS_COMMENT_ON_COLUMN,
+ SUPPORTS_COMMENT_ON_TABLE,
+ SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT,
+ SUPPORTS_CREATE_TABLE_WITH_TABLE_COMMENT,
+ SUPPORTS_SET_COLUMN_TYPE,
+ SUPPORTS_RENAME_TABLE_ACROSS_SCHEMAS,
+ SUPPORTS_NATIVE_QUERY,
+ SUPPORTS_AGGREGATION_PUSHDOWN,
+ SUPPORTS_AGGREGATION_PUSHDOWN_STDDEV,
+ SUPPORTS_AGGREGATION_PUSHDOWN_VARIANCE,
+ SUPPORTS_AGGREGATION_PUSHDOWN_COVARIANCE,
+ SUPPORTS_AGGREGATION_PUSHDOWN_CORRELATION,
+ SUPPORTS_AGGREGATION_PUSHDOWN_REGRESSION,
+ SUPPORTS_AGGREGATION_PUSHDOWN_COUNT_DISTINCT,
+ SUPPORTS_CREATE_VIEW,
+ SUPPORTS_CREATE_MATERIALIZED_VIEW,
+ SUPPORTS_NEGATIVE_DATE -> false;
+ default -> super.hasBehavior(behavior);
+ };
+ }
+
+ @Test
+ @Override
+ public void testShowColumns()
+ {
+ assertThat(computeActual("SHOW COLUMNS FROM orders"))
+ .isEqualTo(getDescribeOrdersResult());
+ }
+
+ @Override
+ protected MaterializedResult getDescribeOrdersResult()
+ {
+ return resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR)
+ .row("orderkey", "bigint", "", "")
+ .row("custkey", "bigint", "", "")
+ .row("orderstatus", "varchar", "", "")
+ .row("totalprice", "double", "", "")
+ .row("orderdate", "date", "", "")
+ .row("orderpriority", "varchar", "", "")
+ .row("clerk", "varchar", "", "")
+ .row("shippriority", "integer", "", "")
+ .row("comment", "varchar", "", "")
+ .build();
+ }
+
+ @Override
+ protected Optional filterColumnNameTestData(String columnName)
+ {
+ // Databend converts column names to lowercase
+ return Optional.of(columnName.toLowerCase(Locale.ROOT));
+ }
+
+ @Override
+ protected Optional filterDataMappingSmokeTestData(DataMappingTestSetup setup)
+ {
+ String type = setup.getTrinoTypeName();
+ if (type.equals("time") || type.startsWith("time(")) {
+ return Optional.of(setup.asUnsupported());
+ }
+ return Optional.of(setup);
+ }
+
+ @Override
+ protected boolean isColumnNameRejected(Exception exception, String columnName, boolean delimited)
+ {
+ String message = nullToEmpty(exception.getMessage());
+ return message.contains("unable to recognize the rest tokens");
+ }
+
+ @Test
+ @Override
+ public void testAddColumn()
+ {
+ String tableName;
+ try (TestTable table = newTrinoTable("test_add_column_", tableDefinitionForAddColumn())) {
+ tableName = table.getName();
+ assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1);
+ assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN x bigint", ".* Column 'x' already exists");
+ assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN X bigint", ".* Column 'X' already exists");
+
+ assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)");
+ assertQuery(
+ "SELECT * FROM " + table.getName(),
+ "VALUES ('first', NULL)");
+ assertQuery(
+ "SELECT * FROM " + table.getName() + " WHERE a IS NULL",
+ "VALUES ('first', NULL)");
+ }
+ }
+
+ @Test
+ @Override // Overridden because the default storage type doesn't support adding columns
+ public void testAddNotNullColumnToEmptyTable()
+ {
+ try (TestTable table = newTrinoTable("test_add_notnull_col_to_empty", "(a_varchar varchar NOT NULL)")) {
+ String tableName = table.getName();
+
+ assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN b_varchar varchar NOT NULL");
+ assertUpdate("INSERT INTO " + tableName + " VALUES ('a', 'b')", 1);
+ assertThat(query("TABLE " + tableName))
+ .skippingTypesCheck()
+ .matches("VALUES ('a', 'b')");
+ }
+ }
+
+ @Test
+ @Override
+ public void testDropNotNullConstraint()
+ {
+ abort("Databend Not Support");
+ }
+
+ @Test
+ @Override
+ public void testInsertIntoNotNullColumn()
+ {
+ try (TestTable table = newTrinoTable("test_insert_not_null_", "(nullable_col INTEGER, not_null_col INTEGER NOT NULL)")) {
+ assertUpdate(format("INSERT INTO %s (not_null_col) VALUES (2)", table.getName()), 1);
+ assertQuery("SELECT * FROM " + table.getName(), "VALUES (NULL, 2)");
+ }
+
+ try (TestTable table = newTrinoTable("test_commuted_not_null_table", "(nullable_col BIGINT, not_null_col BIGINT NOT NULL)")) {
+ assertUpdate(format("INSERT INTO %s (not_null_col) VALUES (2)", table.getName()), 1);
+ assertQuery("SELECT * FROM " + table.getName(), "VALUES (NULL, 2)");
+ }
+ }
+
+ @Test
+ @Override
+ public void testUpdateNotNullColumn()
+ {
+ abort("Databend rejects updates that temporarily assign NULL to NOT NULL columns");
+ }
+
+ @Test
+ @Override
+ public void testColumnName()
+ {
+ abort("Databend rejects some generated column names used in the generic test");
+ }
+
+ @Test
+ @Override
+ public void testShowCreateTable()
+ {
+ String catalog = getSession().getCatalog().orElseThrow();
+ String schema = getSession().getSchema().orElseThrow();
+ assertThat(computeScalar("SHOW CREATE TABLE orders"))
+ .isEqualTo(format(
+ """
+ CREATE TABLE %s.%s.orders (
+ orderkey bigint,
+ custkey bigint,
+ orderstatus varchar,
+ totalprice double,
+ orderdate date,
+ orderpriority varchar,
+ clerk varchar,
+ shippriority integer,
+ comment varchar
+ )
+ WITH (
+ engine = 'FUSE'
+ )\
+ """,
+ catalog,
+ schema));
+ }
+
+ @Test
+ @Override
+ public void testDateYearOfEraPredicate()
+ {
+ assertQuery("SELECT orderdate FROM orders WHERE orderdate = DATE '1997-09-14'", "VALUES DATE '1997-09-14'");
+ assertQueryFails("SELECT * FROM orders WHERE orderdate = DATE '-1996-09-14'", invalidDateReadError("-1996-09-14"));
+ }
+
+ @Override
+ protected String errorMessageForInsertIntoNotNullColumn(String columnName)
+ {
+ return format("(?s).*%s.*", columnName);
+ }
+
+ @Override
+ protected String errorMessageForInsertNegativeDate(String date)
+ {
+ return invalidDateWriteError(date);
+ }
+
+ @Override
+ protected String errorMessageForCreateTableAsSelectNegativeDate(String date)
+ {
+ return invalidDateWriteError(date);
+ }
+
+ @Override
+ protected TestTable createTableWithDefaultColumns()
+ {
+ String schema = getSession().getSchema().orElseThrow();
+ return new TestTable(
+ onRemoteDatabase(),
+ schema + ".test_default_cols",
+ "(col_required BIGINT NOT NULL," +
+ "col_nullable BIGINT," +
+ "col_default BIGINT DEFAULT 43," +
+ "col_nonnull_default BIGINT NOT NULL DEFAULT 42," +
+ "col_required2 BIGINT NOT NULL)");
+ }
+
+ @Test
+ @Override
+ public void testCharVarcharComparison()
+ {
+ assertThatThrownBy(super::testCharVarcharComparison)
+ .isInstanceOf(AssertionError.class);
+ abort("Databend trims CHAR values differently than Trino");
+ }
+
+ @Test
+ @Override
+ public void testVarcharCharComparison()
+ {
+ assertThatThrownBy(super::testVarcharCharComparison)
+ .isInstanceOf(AssertionError.class);
+ abort("Databend trims CHAR values differently than Trino");
+ }
+
+ @Test
+ @Override
+ public void testCharTrailingSpace()
+ {
+ assertThatThrownBy(super::testCharTrailingSpace)
+ .isInstanceOf(AssertionError.class);
+ abort("Databend trims CHAR values differently than Trino");
+ }
+
+ @Test
+ @Override
+ public void testExecuteProcedure()
+ {
+ assertThatThrownBy(super::testExecuteProcedure)
+ .isInstanceOf(AssertionError.class);
+ abort("system.execute is not supported by Databend");
+ }
+
+ @Test
+ @Override
+ public void testExecuteProcedureWithInvalidQuery()
+ {
+ assertThatThrownBy(super::testExecuteProcedureWithInvalidQuery)
+ .isInstanceOf(AssertionError.class);
+ abort("system.execute is not supported by Databend");
+ }
+
+ @Test
+ @Override
+ public void testExecuteProcedureWithNamedArgument()
+ {
+ assertThatThrownBy(super::testExecuteProcedureWithNamedArgument)
+ .isInstanceOf(AssertionError.class);
+ abort("system.execute is not supported by Databend");
+ }
+
+ @Override
+ @Test
+ public void testInsert()
+ {
+ String tableName = "test_insert_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " (id BIGINT, name VARCHAR)");
+ assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'test')", 1);
+ assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'test')");
+ assertUpdate("DROP TABLE " + tableName);
+ }
+
+ @Override
+ @Test
+ public void testCreateTable()
+ {
+ String tableName = "test_create_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " (id BIGINT, name VARCHAR, price DOUBLE)");
+ assertThat(computeActual("SHOW TABLES LIKE '" + tableName + "'").getRowCount())
+ .isEqualTo(1);
+ assertUpdate("DROP TABLE " + tableName);
+ }
+
+ @Override
+ @Test
+ public void testTruncateTable()
+ {
+ String tableName = "test_truncate_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
+ assertUpdate("TRUNCATE TABLE " + tableName);
+ assertQuery("SELECT COUNT(*) FROM " + tableName, "SELECT 0");
+ assertUpdate("DROP TABLE " + tableName);
+ }
+
+ @Test
+ public void testBasicDataTypes()
+ {
+ String tableName = "test_types_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " (" +
+ "bool_col BOOLEAN, " +
+ "tinyint_col TINYINT, " +
+ "smallint_col SMALLINT, " +
+ "int_col INTEGER, " +
+ "bigint_col BIGINT, " +
+ "real_col REAL, " +
+ "double_col DOUBLE, " +
+ "varchar_col VARCHAR, " +
+ "date_col DATE)");
+
+ assertUpdate("INSERT INTO " + tableName + " VALUES (" +
+ "true, " +
+ "127, " +
+ "32767, " +
+ "2147483647, " +
+ "9223372036854775807, " +
+ "REAL '3.14', " +
+ "2.718, " +
+ "'test string', " +
+ "DATE '2024-01-01')", 1);
+
+ assertQuery("SELECT * FROM " + tableName,
+ "VALUES (true, 127, 32767, 2147483647, 9223372036854775807, CAST(3.14 AS real), 2.718, 'test string', DATE '2024-01-01')");
+
+ assertUpdate("DROP TABLE " + tableName);
+ }
+
+ @Test
+ public void testDecimalType()
+ {
+ String tableName = "test_decimal_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " (decimal_col DECIMAL(10, 2))");
+ assertUpdate("INSERT INTO " + tableName + " VALUES (123.45)", 1);
+ assertQuery("SELECT * FROM " + tableName, "VALUES (123.45)");
+ assertUpdate("DROP TABLE " + tableName);
+ }
+
+ @Override
+ @Test
+ public void testAggregationPushdown()
+ {
+ String tableName = "test_agg_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
+
+ assertQuery("SELECT COUNT(*) FROM " + tableName, "SELECT 25");
+ assertQuery("SELECT COUNT(regionkey) FROM " + tableName, "SELECT 25");
+ assertQuery("SELECT MIN(regionkey) FROM " + tableName, "SELECT 0");
+ assertQuery("SELECT MAX(regionkey) FROM " + tableName, "SELECT 4");
+ assertQuery("SELECT SUM(regionkey) FROM " + tableName, "SELECT 50");
+
+ assertUpdate("DROP TABLE " + tableName);
+ }
+
+ @Override
+ @Test
+ public void testLimitPushdown()
+ {
+ String tableName = "test_limit_" + randomNameSuffix();
+ assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25);
+
+ assertThat(computeActual("SELECT * FROM " + tableName + " LIMIT 10").getRowCount())
+ .isEqualTo(10);
+ assertThat(computeActual("SELECT * FROM " + tableName + " LIMIT 5").getRowCount())
+ .isEqualTo(5);
+
+ assertUpdate("DROP TABLE " + tableName);
+ }
+
+ @Override
+ protected void verifyConcurrentAddColumnFailurePermissible(Exception e)
+ {
+ if (e.getMessage() != null && e.getMessage().contains("QueryErrors{code=2009")) {
+ return;
+ }
+ super.verifyConcurrentAddColumnFailurePermissible(e);
+ }
+
+ @Test
+ @Override
+ public void verifySupportsDeleteDeclaration()
+ {
+ abort("Databend executes DELETE statements instead of rejecting them for unsupported connectors");
+ }
+
+ @Test
+ @Override
+ public void verifySupportsRowLevelDeleteDeclaration()
+ {
+ abort("Databend executes DELETE statements instead of rejecting them for unsupported connectors");
+ }
+
+ @Test
+ @Override
+ public void verifySupportsUpdateDeclaration()
+ {
+ abort("Databend executes UPDATE statements instead of rejecting them for unsupported connectors");
+ }
+
+ @Test
+ @Override
+ public void testDataMappingSmokeTest()
+ {
+ abort("Databend does not support all TIME/TIMESTAMP combinations exercised by the generic smoke test");
+ }
+
+ @Test
+ @Override
+ public void testInsertWithoutTemporaryTable()
+ {
+ abort("Databend requires elevated privileges to inspect written tables, so the temporary-table bypass test cannot run");
+ }
+
+ private static String invalidDateWriteError(String date)
+ {
+ return format("(?s).*Invalid value '%s'.*", date);
+ }
+
+ private static String invalidDateReadError(String date)
+ {
+ return format("(?s).*cannot parse to type `DATE`.*%s.*", date);
+ }
+}
diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendPlugin.java b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendPlugin.java
new file mode 100644
index 000000000000..79adf3663627
--- /dev/null
+++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendPlugin.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import com.google.common.collect.ImmutableMap;
+import io.trino.spi.Plugin;
+import io.trino.spi.connector.Connector;
+import io.trino.spi.connector.ConnectorFactory;
+import io.trino.testing.TestingConnectorContext;
+import org.junit.jupiter.api.Test;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestDatabendPlugin
+{
+ @Test
+ public void testCreateConnector()
+ {
+ Plugin plugin = new DatabendPlugin();
+ ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories());
+ Connector connector = factory.create(
+ "test",
+ ImmutableMap.of(
+ "connection-url", "jdbc:databend://localhost:8000/",
+ "connection-user", "databend",
+ "connection-password", "databend"),
+ new TestingConnectorContext());
+ assertThat(connector).isNotNull();
+ connector.shutdown();
+ }
+}
diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendTypeMapping.java b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendTypeMapping.java
new file mode 100644
index 000000000000..ea0dac534b91
--- /dev/null
+++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendTypeMapping.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import io.trino.Session;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.QueryRunner;
+import io.trino.testing.sql.SqlExecutor;
+import org.junit.jupiter.api.Test;
+
+import java.util.Locale;
+
+import static java.lang.String.format;
+import static java.util.Collections.emptyMap;
+
+public class TestDatabendTypeMapping
+ extends AbstractTestQueryFramework
+{
+ private TestingDatabendServer databendServer;
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ databendServer = closeAfterClass(new TestingDatabendServer());
+ return DatabendQueryRunner.createDatabendQueryRunner(databendServer, emptyMap(), emptyMap());
+ }
+
+ protected SqlExecutor onRemoteDatabase()
+ {
+ return databendServer::execute;
+ }
+
+ @Test
+ public void testBoolean()
+ {
+ // Databend supports BOOLEAN type
+ SqlExecutor jdbcSqlExecutor = onRemoteDatabase();
+ Session session = getSession();
+
+ jdbcSqlExecutor.execute("CREATE TABLE test_boolean_type (boolean_column BOOLEAN)");
+ assertTypeMapping(session, jdbcSqlExecutor, "BOOLEAN", "true", "BOOLEAN");
+ assertTypeMapping(session, jdbcSqlExecutor, "BOOLEAN", "false", "BOOLEAN");
+ jdbcSqlExecutor.execute("DROP TABLE test_boolean_type");
+ }
+
+ @Test
+ public void testTinyint()
+ {
+ // Databend supports TINYINT
+ SqlExecutor jdbcSqlExecutor = onRemoteDatabase();
+ Session session = getSession();
+
+ jdbcSqlExecutor.execute("CREATE TABLE test_tinyint_type (tinyint_column TINYINT)");
+ assertTypeMapping(session, jdbcSqlExecutor, "TINYINT", "127", "TINYINT");
+ assertTypeMapping(session, jdbcSqlExecutor, "TINYINT", "-128", "TINYINT");
+ jdbcSqlExecutor.execute("DROP TABLE test_tinyint_type");
+ }
+
+ @Test
+ public void testSmallint()
+ {
+ // Databend supports SMALLINT
+ SqlExecutor jdbcSqlExecutor = onRemoteDatabase();
+ Session session = getSession();
+
+ jdbcSqlExecutor.execute("CREATE TABLE test_smallint_type (smallint_column SMALLINT)");
+ assertTypeMapping(session, jdbcSqlExecutor, "SMALLINT", "32767", "SMALLINT");
+ assertTypeMapping(session, jdbcSqlExecutor, "SMALLINT", "-32768", "SMALLINT");
+ jdbcSqlExecutor.execute("DROP TABLE test_smallint_type");
+ }
+
+ @Test
+ public void testInteger()
+ {
+ // Databend supports INTEGER
+ SqlExecutor jdbcSqlExecutor = onRemoteDatabase();
+ Session session = getSession();
+
+ jdbcSqlExecutor.execute("CREATE TABLE test_integer_type (integer_column INTEGER)");
+ assertTypeMapping(session, jdbcSqlExecutor, "INTEGER", "2147483647", "INTEGER");
+ assertTypeMapping(session, jdbcSqlExecutor, "INTEGER", "-2147483648", "INTEGER");
+ jdbcSqlExecutor.execute("DROP TABLE test_integer_type");
+ }
+
+ @Test
+ public void testBigint()
+ {
+ // Databend supports BIGINT
+ SqlExecutor jdbcSqlExecutor = onRemoteDatabase();
+ Session session = getSession();
+
+ jdbcSqlExecutor.execute("CREATE TABLE test_bigint_type (bigint_column BIGINT)");
+ assertTypeMapping(session, jdbcSqlExecutor, "BIGINT", "9223372036854775807", "BIGINT");
+ assertTypeMapping(session, jdbcSqlExecutor, "BIGINT", "-9223372036854775808", "BIGINT");
+ jdbcSqlExecutor.execute("DROP TABLE test_bigint_type");
+ }
+
+ private void assertTypeMapping(Session session, SqlExecutor remoteExecutor, String remoteType, String remoteValue, String expectedTrinoType)
+ {
+ String baseName = remoteType.toLowerCase(Locale.ENGLISH);
+ String tableName = format("test_%s_type", baseName);
+ String columnName = format("%s_column", baseName);
+
+ remoteExecutor.execute(format("INSERT INTO %s (%s) VALUES (%s)", tableName, columnName, remoteValue));
+
+ assertQuery(
+ session,
+ format("SELECT UPPER(typeof(%s)) FROM %s LIMIT 1", columnName, tableName),
+ format("VALUES CAST('%s' AS varchar)", expectedTrinoType));
+
+ assertQuery(
+ session,
+ format("SELECT %s FROM %s", columnName, tableName),
+ format("VALUES (%s)", remoteValue));
+
+ remoteExecutor.execute(format("DELETE FROM %s", tableName));
+ }
+}
diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestingDatabendServer.java b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestingDatabendServer.java
new file mode 100644
index 000000000000..148a57827e90
--- /dev/null
+++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestingDatabendServer.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.databend;
+
+import org.testcontainers.databend.DatabendContainer;
+import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+
+import static java.util.Objects.requireNonNull;
+
+public class TestingDatabendServer
+ implements Closeable
+{
+ private static final DockerImageName DATABEND_IMAGE = DockerImageName.parse("datafuselabs/databend:nightly");
+ private static final String QUERY_CONFIG_PATH = "/databend-config/query.toml";
+
+ private final DatabendContainer databendContainer;
+ private final String jdbcUrl;
+
+ public TestingDatabendServer()
+ {
+ DatabendContainer started = null;
+ String readyJdbcUrl = null;
+ boolean success = false;
+ try {
+ Path configFile = createConfigFile(buildDatabendConfig());
+
+ started = new DatabendContainer(DATABEND_IMAGE)
+ .withEnv("QUERY_DEFAULT_USER", getUser())
+ .withEnv("QUERY_DEFAULT_PASSWORD", getPassword())
+ .withEnv("QUERY_CONFIG_FILE", QUERY_CONFIG_PATH)
+ .withCopyFileToContainer(MountableFile.forHostPath(configFile), QUERY_CONFIG_PATH);
+
+ started.start();
+ readyJdbcUrl = ensureReadyAndGetJdbcUrl(started);
+ success = true;
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Failed to prepare Databend config", e);
+ }
+ finally {
+ if (!success) {
+ closeQuietly(started);
+ }
+ }
+
+ this.databendContainer = started;
+ this.jdbcUrl = readyJdbcUrl;
+ }
+
+ public String getJdbcUrl()
+ {
+ return jdbcUrl;
+ }
+
+ public String getUser()
+ {
+ return "root";
+ }
+
+ public String getPassword()
+ {
+ return "";
+ }
+
+ public void execute(String sql)
+ {
+ try (Connection connection = DriverManager.getConnection(jdbcUrl, getUser(), getPassword());
+ Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ }
+ catch (SQLException e) {
+ throw new RuntimeException("Failed to execute SQL: " + sql, e);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ closeQuietly(databendContainer);
+ }
+
+ private String ensureReadyAndGetJdbcUrl(DatabendContainer container)
+ {
+ String url = withPresignDisabled(container.getJdbcUrl());
+ requireNonNull(url, "Databend JDBC URL is null");
+
+ long deadline = System.nanoTime() + Duration.ofMinutes(2).toNanos();
+
+ while (true) {
+ try (Connection connection = DriverManager.getConnection(url, getUser(), getPassword());
+ Statement statement = connection.createStatement()) {
+ statement.execute("SELECT 1");
+ statement.execute("CREATE DATABASE IF NOT EXISTS default");
+ return url;
+ }
+ catch (SQLException ex) {
+ if (System.nanoTime() > deadline) {
+ throw new RuntimeException("Databend server did not become ready in time", ex);
+ }
+ try {
+ Thread.sleep(2_000);
+ }
+ catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while waiting for Databend to start", interruptedException);
+ }
+ }
+ }
+ }
+
+ private static void closeQuietly(DatabendContainer container)
+ {
+ if (container != null) {
+ try {
+ container.close();
+ }
+ catch (RuntimeException ignored) {
+ }
+ }
+ }
+
+ private static Path createConfigFile(String contents)
+ throws IOException
+ {
+ Path configFile = Files.createTempFile("databend-config", ".toml");
+ Files.writeString(configFile, contents, StandardCharsets.UTF_8);
+ configFile.toFile().deleteOnExit();
+ return configFile;
+ }
+
+ private static String buildDatabendConfig()
+ {
+ return String.join("\n",
+ "[query]",
+ "max_active_sessions = 256",
+ "shutdown_wait_timeout_ms = 5000",
+ "",
+ "flight_api_address = \"0.0.0.0:9090\"",
+ "admin_api_address = \"0.0.0.0:8080\"",
+ "metric_api_address = \"0.0.0.0:7070\"",
+ "",
+ "mysql_handler_host = \"0.0.0.0\"",
+ "mysql_handler_port = 3307",
+ "",
+ "clickhouse_http_handler_host = \"0.0.0.0\"",
+ "clickhouse_http_handler_port = 8124",
+ "",
+ "http_handler_host = \"0.0.0.0\"",
+ "http_handler_port = 8000",
+ "",
+ "flight_sql_handler_host = \"0.0.0.0\"",
+ "flight_sql_handler_port = 8900",
+ "",
+ "tenant_id = \"default\"",
+ "cluster_id = \"default\"",
+ "",
+ "[log]",
+ "",
+ "[log.stderr]",
+ "level = \"WARN\"",
+ "format = \"text\"",
+ "",
+ "[log.file]",
+ "level = \"INFO\"",
+ "dir = \"/var/log/databend\"",
+ "",
+ "[meta]",
+ "endpoints = [\"0.0.0.0:9191\"]",
+ "username = \"root\"",
+ "password = \"root\"",
+ "client_timeout_in_second = 60",
+ "",
+ "[[query.users]]",
+ "name = \"databend\"",
+ "auth_type = \"double_sha1_password\"",
+ "auth_string = \"3081f32caef285c232d066033c89a78d88a6d8a5\"",
+ "",
+ "[[query.users]]",
+ "name = \"root\"",
+ "auth_type = \"no_password\"",
+ "",
+ "[storage]",
+ "type = \"fs\"",
+ "",
+ "[storage.fs]",
+ "data_path = \"/var/lib/databend/data\"",
+ "") + "\n";
+ }
+
+ private static String withPresignDisabled(String jdbcUrl)
+ {
+ if (jdbcUrl.contains("?")) {
+ return jdbcUrl + "&presigned_url_disabled=true";
+ }
+ return jdbcUrl + "?presigned_url_disabled=true";
+ }
+}
diff --git a/pom.xml b/pom.xml
index 25b8e0fe5947..11ad2530ef05 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,7 @@
plugin/trino-blackhole
plugin/trino-cassandra
plugin/trino-clickhouse
+ plugin/trino-databend
plugin/trino-delta-lake
plugin/trino-druid
plugin/trino-duckdb
diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-all/databend.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-all/databend.properties
new file mode 100644
index 000000000000..5dc861fcd574
--- /dev/null
+++ b/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-all/databend.properties
@@ -0,0 +1,4 @@
+connector.name=databend
+connection-url=jdbc:databend://databend:8000/
+connection-user=root
+connection-password=
diff --git a/testing/trino-testing-containers/pom.xml b/testing/trino-testing-containers/pom.xml
index c22ee0d6db83..aff03c733482 100644
--- a/testing/trino-testing-containers/pom.xml
+++ b/testing/trino-testing-containers/pom.xml
@@ -117,6 +117,12 @@
true
+
+ org.apache.commons
+ commons-lang3
+ runtime
+
+
io.airlift
junit-extensions
diff --git a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/junit/ReportLeakedContainers.java b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/junit/ReportLeakedContainers.java
index f5a3284e629e..ea3745f2e654 100644
--- a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/junit/ReportLeakedContainers.java
+++ b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/junit/ReportLeakedContainers.java
@@ -25,6 +25,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.ServiceConfigurationError;
import java.util.Set;
import static com.google.common.base.MoreObjects.toStringHelper;
@@ -76,28 +77,38 @@ public void testPlanExecutionFinished(TestPlan testPlan)
log.info("Checking for leaked containers");
- @SuppressWarnings("resource") // Throws when close is attempted, as this is a global instance.
- DockerClient dockerClient = DockerClientFactory.lazyClient();
+ ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ try {
+ @SuppressWarnings("resource") // Throws when close is attempted, as this is a global instance.
+ DockerClient dockerClient = DockerClientFactory.lazyClient();
- List containers = dockerClient.listContainersCmd()
- .withLabelFilter(Map.of(DockerClientFactory.TESTCONTAINERS_SESSION_ID_LABEL, DockerClientFactory.SESSION_ID))
- // ignore status "exited" - for example, failed containers after using `withStartupAttempts()`
- .withStatusFilter(List.of("created", "restarting", "running", "paused"))
- .exec()
- .stream()
- // testcontainers/sshd is implicitly started by testcontainers and we trust the library to stop if when no longer needed
- .filter(container -> !container.getImage().startsWith("testcontainers/sshd:"))
- .filter(container -> !ignoredIds.contains(container.getId()))
- .collect(toImmutableList());
+ List containers = dockerClient.listContainersCmd()
+ .withLabelFilter(Map.of(DockerClientFactory.TESTCONTAINERS_SESSION_ID_LABEL, DockerClientFactory.SESSION_ID))
+ // ignore status "exited" - for example, failed containers after using `withStartupAttempts()`
+ .withStatusFilter(List.of("created", "restarting", "running", "paused"))
+ .exec()
+ .stream()
+ // testcontainers/sshd is implicitly started by testcontainers and we trust the library to stop if when no longer needed
+ .filter(container -> !container.getImage().startsWith("testcontainers/sshd:"))
+ .filter(container -> !ignoredIds.contains(container.getId()))
+ .collect(toImmutableList());
- if (!containers.isEmpty()) {
- reportListenerFailure(getClass(), "Leaked containers: %s", containers.stream()
- .map(container -> toStringHelper("container")
- .add("id", container.getId())
- .add("image", container.getImage())
- .add("imageId", container.getImageId())
- .toString())
- .collect(joining(", ", "[", "]")));
+ if (!containers.isEmpty()) {
+ reportListenerFailure(getClass(), "Leaked containers: %s", containers.stream()
+ .map(container -> toStringHelper("container")
+ .add("id", container.getId())
+ .add("image", container.getImage())
+ .add("imageId", container.getImageId())
+ .toString())
+ .collect(joining(", ", "[", "]")));
+ }
+ }
+ catch (ServiceConfigurationError | NoClassDefFoundError e) {
+ log.warn(e, "Docker client unavailable, skipping leaked container check");
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(previousContextClassLoader);
}
}
}