diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 413d5d43e338..8a3d52390793 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -350,6 +350,7 @@ jobs: !:trino-bigquery, !:trino-cassandra, !:trino-clickhouse, + !:trino-databend, !:trino-delta-lake, !:trino-docs, !:trino-druid, @@ -480,6 +481,7 @@ jobs: - { modules: plugin/trino-bigquery, profile: cloud-tests-2 } - { modules: plugin/trino-cassandra } - { modules: plugin/trino-clickhouse } + - { modules: plugin/trino-databend } - { modules: plugin/trino-delta-lake } - { modules: plugin/trino-delta-lake, profile: cloud-tests } - { modules: plugin/trino-delta-lake, profile: fte-tests } diff --git a/core/trino-server/src/main/provisio/trino.xml b/core/trino-server/src/main/provisio/trino.xml index 97a366b39290..ce528458bc64 100644 --- a/core/trino-server/src/main/provisio/trino.xml +++ b/core/trino-server/src/main/provisio/trino.xml @@ -40,6 +40,12 @@ + + + + + + diff --git a/docs/src/main/sphinx/connector.md b/docs/src/main/sphinx/connector.md index a954b30cf059..d9d52f682a1f 100644 --- a/docs/src/main/sphinx/connector.md +++ b/docs/src/main/sphinx/connector.md @@ -12,6 +12,7 @@ BigQuery Black Hole Cassandra ClickHouse +Databend Delta Lake Druid DuckDB diff --git a/docs/src/main/sphinx/connector/databend.md b/docs/src/main/sphinx/connector/databend.md new file mode 100644 index 000000000000..33322dde6d88 --- /dev/null +++ b/docs/src/main/sphinx/connector/databend.md @@ -0,0 +1,279 @@ +# Databend connector + +```{raw} html + +``` + +The Databend connector allows querying and creating tables in an external +[Databend](https://databend.rs/) database. This can be used to join data between +different systems like Databend and Hive, or between two different +Databend instances. + +## Requirements + +To connect to Databend, you need: + +- Databend 1.2.0 or higher +- Network access from the Trino coordinator and workers to Databend. Port 8000 is the default port. 
+ +## Configuration + +To configure the Databend connector, create a catalog properties file +in `etc/catalog` named, for example, `example.properties`, to +mount the Databend connector as the `example` catalog. +Create the file with the following contents, replacing the +connection properties as appropriate for your setup: + +```none +connector.name=databend +connection-url=jdbc:databend://host:8000/ +connection-user=root +connection-password= +``` + +The `connection-url` defines the connection information and parameters to pass +to the Databend JDBC driver. The supported parameters for the URL are +available in the [Databend JDBC driver documentation](https://github.com/databendlabs/databend-jdbc). + +The `connection-user` and `connection-password` are typically required and +determine the user credentials for the connection, often a service user. You can +use {doc}`secrets </security/secrets>` to avoid actual values in the catalog +properties files. + +### Multiple Databend databases + +The Databend connector can query multiple databases within a Databend instance. +If you have multiple Databend instances, or want to connect to multiple +catalogs in the same instance, configure another instance of the +Databend connector as a separate catalog. + +### Connection security + +If you have TLS configured with a globally-trusted certificate installed on your +data source, you can enable TLS between your cluster and the data +source by appending the `ssl=true` parameter to the JDBC connection string set in the +`connection-url` catalog configuration property. + +For example, with the Databend connector, enable TLS by appending the `ssl=true` +parameter to the `connection-url` configuration property: + +```properties +connection-url=jdbc:databend://host:8000/?ssl=true +``` + +For more information on TLS configuration options, see the +[Databend JDBC driver documentation](https://github.com/databendlabs/databend-jdbc). 
+ +```{include} jdbc-authentication.fragment +``` + +```{include} jdbc-kerberos.fragment +``` + +(databend-type-mapping)= +## Type mapping + +Because Trino and Databend each support types that the other does not, this +connector {ref}`modifies some types <type-mapping-overview>` when reading or +writing data. Data types may not map the same way in both directions between +Trino and the data source. Refer to the following sections for type mapping in +each direction. + +### Databend type to Trino type mapping + +The connector maps Databend types to the corresponding Trino types following +this table: + +:::{list-table} Databend type to Trino type mapping +:widths: 30, 30, 40 +:header-rows: 1 + +* - Databend type + - Trino type + - Notes +* - `BOOLEAN` + - `BOOLEAN` + - +* - `TINYINT` + - `TINYINT` + - +* - `SMALLINT` + - `SMALLINT` + - +* - `INTEGER` + - `INTEGER` + - +* - `BIGINT` + - `BIGINT` + - +* - `REAL` + - `REAL` + - +* - `DOUBLE` + - `DOUBLE` + - +* - `DECIMAL(p, s)` + - `DECIMAL(p, s)` + - See [](decimal-type-handling) +* - `VARCHAR` + - `VARCHAR` + - +* - `DATE` + - `DATE` + - +* - `TIMESTAMP` + - `TIMESTAMP(0)` + - Databend TIMESTAMP is mapped to TIMESTAMP(0) in Trino +::: + +No other types are supported. + +### Trino type to Databend type mapping + +The connector maps Trino types to the corresponding Databend types following +this table: + +:::{list-table} Trino type to Databend type mapping +:widths: 30, 30, 40 +:header-rows: 1 + +* - Trino type + - Databend type + - Notes +* - `BOOLEAN` + - `BOOLEAN` + - +* - `TINYINT` + - `TINYINT` + - +* - `SMALLINT` + - `SMALLINT` + - +* - `INTEGER` + - `INTEGER` + - +* - `BIGINT` + - `BIGINT` + - +* - `REAL` + - `REAL` + - +* - `DOUBLE` + - `DOUBLE` + - +* - `DECIMAL(p, s)` + - `DECIMAL(p, s)` + - +* - `CHAR(n)` + - `VARCHAR` + - +* - `VARCHAR(n)` + - `VARCHAR` + - +* - `VARBINARY` + - `VARCHAR` + - +* - `DATE` + - `DATE` + - +* - `TIMESTAMP(0)` + - `TIMESTAMP` + - +::: + +No other types are supported. 
+ +```{include} jdbc-type-mapping.fragment +``` + +(databend-sql-support)= +## SQL support + +The connector provides read access and write access to data and metadata in +a Databend database. In addition to the {ref}`globally available +<sql-globally-available>` and {ref}`read operation <sql-read-operations>` +statements, the connector supports the following features: + +- {doc}`/sql/insert` +- {doc}`/sql/truncate` +- {doc}`/sql/create-table` +- {doc}`/sql/create-table-as` +- {doc}`/sql/drop-table` +- {doc}`/sql/alter-table` +- {doc}`/sql/create-schema` +- {doc}`/sql/drop-schema` + +```{include} alter-table-limitation.fragment +``` + +```{include} alter-schema-limitation.fragment +``` + +### Procedures + +```{include} jdbc-procedures-flush.fragment +``` +```{include} procedures-execute.fragment +``` + +### Table functions + +The connector provides specific [table functions](/functions/table) to +access Databend. + +(databend-query-function)= +#### `query(varchar) -> table` + +The `query` function allows you to query the underlying database directly. It +requires syntax native to Databend, because the full query is pushed down and +processed in Databend. This can be useful for accessing native features which +are not available in Trino or for improving query performance in situations +where running a query natively may be faster. + +```{include} query-passthrough-warning.fragment +``` + +As a simple example, query the `example` catalog and select an entire table: + +``` +SELECT + * +FROM + TABLE( + example.system.query( + query => 'SELECT + * + FROM + tpch.nation' + ) + ); +``` + +```{include} query-table-function-ordering.fragment +``` + +## Performance + +The connector includes a number of performance improvements, detailed in the +following sections. 
+ +(databend-pushdown)= +### Pushdown + +The connector supports pushdown for a number of operations: + +- {ref}`limit-pushdown` + +{ref}`Aggregate pushdown <aggregation-pushdown>` for the following functions: + +- {func}`avg` +- {func}`count` +- {func}`max` +- {func}`min` +- {func}`sum` + +```{include} pushdown-correctness-behavior.fragment +``` + +```{include} no-inequality-pushdown-text-type.fragment +``` diff --git a/plugin/trino-databend/pom.xml b/plugin/trino-databend/pom.xml new file mode 100644 index 000000000000..6af5fab9f7d0 --- /dev/null +++ b/plugin/trino-databend/pom.xml @@ -0,0 +1,325 @@ + + + 4.0.0 + + + io.trino + trino-root + 479-SNAPSHOT + ../../pom.xml + + + trino-databend + trino-plugin + Trino - Databend Connector + + + true + + + + + com.databend + databend-jdbc + 0.4.0 + + + com.squareup.okio + okio + + + com.squareup.okio + okio-jvm + + + joda-time + joda-time + + + org.slf4j + slf4j-api + + + + + + com.google.guava + guava + + + + com.google.inject + guice + classes + + + + io.airlift + configuration + + + + io.airlift + units + + + + io.trino + trino-base-jdbc + + + + io.trino + trino-plugin-toolkit + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + io.airlift + slice + provided + + + + io.opentelemetry + opentelemetry-api + provided + + + + io.opentelemetry + opentelemetry-api-incubator + provided + + + + io.opentelemetry + opentelemetry-context + provided + + + + io.trino + trino-spi + provided + + + + org.openjdk.jol + jol-core + provided + + + + io.airlift + log + runtime + + + + io.airlift + log-manager + runtime + + + + + io.airlift + junit-extensions + test + + + + io.airlift + testing + test + + + + io.trino + trino-base-jdbc + test-jar + test + + + + io.trino + trino-main + test + + + + io.trino + trino-main + test-jar + test + + + + io.trino + trino-plugin-toolkit + test-jar + test + + + + io.trino + trino-testing + test + + + + io.trino + trino-testing-containers + test + + + + io.trino + 
trino-testing-services + test + + + + 
io.trino + trino-tpch + test + + + + io.trino.tpch + tpch + test + + + + org.apache.commons + commons-lang3 + test + + + + org.assertj + assertj-core + test + + + + org.jetbrains + annotations + test + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.junit.jupiter + junit-jupiter-engine + test + + + + org.junit.jupiter + junit-jupiter-params + test + + + + org.testcontainers + databend + 1.20.2 + test + + + + org.testcontainers + testcontainers + test + + + + software.amazon.awssdk + apache-client + test + + + commons-logging + commons-logging + + + + + + software.amazon.awssdk + auth + test + + + + software.amazon.awssdk + aws-core + test + + + + software.amazon.awssdk + http-client-spi + test + + + + software.amazon.awssdk + regions + test + + + + software.amazon.awssdk + s3 + test + + + + software.amazon.awssdk + sdk-core + test + + + + + + + org.basepom.maven + duplicate-finder-maven-plugin + + + commonMain/default/.* + kotlin/.* + okio/.* + org/joda/time/tz/data/.* + + + kotlin\..* + okhttp3\..* + okio\..* + + + + com.databend + databend-jdbc + + + + + + + diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClient.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClient.java new file mode 100644 index 000000000000..724b22c8e40d --- /dev/null +++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClient.java @@ -0,0 +1,720 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.databend; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.primitives.Shorts; +import com.google.inject.Inject; +import io.trino.plugin.base.mapping.IdentifierMapping; +import io.trino.plugin.jdbc.BaseJdbcClient; +import io.trino.plugin.jdbc.BaseJdbcConfig; +import io.trino.plugin.jdbc.CaseSensitivity; +import io.trino.plugin.jdbc.ColumnMapping; +import io.trino.plugin.jdbc.ConnectionFactory; +import io.trino.plugin.jdbc.JdbcColumnHandle; +import io.trino.plugin.jdbc.JdbcSortItem; +import io.trino.plugin.jdbc.JdbcTableHandle; +import io.trino.plugin.jdbc.JdbcTypeHandle; +import io.trino.plugin.jdbc.LongReadFunction; +import io.trino.plugin.jdbc.LongWriteFunction; +import io.trino.plugin.jdbc.PreparedQuery; +import io.trino.plugin.jdbc.QueryBuilder; +import io.trino.plugin.jdbc.RemoteTableName; +import io.trino.plugin.jdbc.StandardColumnMappings; +import io.trino.plugin.jdbc.WriteMapping; +import io.trino.plugin.jdbc.expression.ParameterizedExpression; +import io.trino.plugin.jdbc.logging.RemoteQueryModifier; +import io.trino.spi.TrinoException; +import io.trino.spi.connector.ColumnMetadata; +import io.trino.spi.connector.ColumnPosition; +import io.trino.spi.connector.ConnectorSession; +import io.trino.spi.connector.ConnectorTableMetadata; +import io.trino.spi.connector.JoinStatistics; +import io.trino.spi.connector.JoinType; +import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.type.CharType; +import io.trino.spi.type.DecimalType; +import io.trino.spi.type.Decimals; +import io.trino.spi.type.Type; +import io.trino.spi.type.VarbinaryType; +import io.trino.spi.type.VarcharType; +import org.weakref.jmx.$internal.guava.base.Enums; + +import javax.annotation.Nullable; + +import java.sql.Connection; 
+import java.sql.DatabaseMetaData; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.time.LocalDate; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.function.BiFunction; + +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static io.trino.plugin.databend.DatabendTableProperties.ENGINE_PROPERTY; +import static io.trino.plugin.jdbc.CaseSensitivity.CASE_INSENSITIVE; +import static io.trino.plugin.jdbc.CaseSensitivity.CASE_SENSITIVE; +import static io.trino.plugin.jdbc.DecimalConfig.DecimalMapping.ALLOW_OVERFLOW; +import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalDefaultScale; +import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRounding; +import static io.trino.plugin.jdbc.DecimalSessionSessionProperties.getDecimalRoundingMode; +import static io.trino.plugin.jdbc.JdbcErrorCode.JDBC_ERROR; +import static io.trino.plugin.jdbc.JdbcJoinPushdownUtil.implementJoinCostAware; +import static io.trino.plugin.jdbc.PredicatePushdownController.DISABLE_PUSHDOWN; +import static io.trino.plugin.jdbc.StandardColumnMappings.bigintColumnMapping; +import static io.trino.plugin.jdbc.StandardColumnMappings.bigintWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.booleanWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.decimalColumnMapping; +import static io.trino.plugin.jdbc.StandardColumnMappings.doubleColumnMapping; +import static io.trino.plugin.jdbc.StandardColumnMappings.doubleWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.integerColumnMapping; +import static io.trino.plugin.jdbc.StandardColumnMappings.integerWriteFunction; +import static 
io.trino.plugin.jdbc.StandardColumnMappings.longDecimalWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.realWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.shortDecimalWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.smallintColumnMapping; +import static io.trino.plugin.jdbc.StandardColumnMappings.smallintWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintColumnMapping; +import static io.trino.plugin.jdbc.StandardColumnMappings.tinyintWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction; +import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction; +import static io.trino.plugin.jdbc.TypeHandlingJdbcSessionProperties.getUnsupportedTypeHandling; +import static io.trino.plugin.jdbc.UnsupportedTypeHandling.CONVERT_TO_VARCHAR; +import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.StandardErrorCode.SCHEMA_NOT_EMPTY; +import static io.trino.spi.type.BigintType.BIGINT; +import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateType.DATE; +import static io.trino.spi.type.DecimalType.createDecimalType; +import static io.trino.spi.type.DoubleType.DOUBLE; +import static io.trino.spi.type.IntegerType.INTEGER; +import static io.trino.spi.type.RealType.REAL; +import static io.trino.spi.type.SmallintType.SMALLINT; +import static io.trino.spi.type.TinyintType.TINYINT; +import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; +import static io.trino.spi.type.VarcharType.createVarcharType; +import static java.lang.Float.floatToRawIntBits; +import static java.lang.Math.max; +import static java.lang.Math.toIntExact; +import static java.lang.String.format; +import static java.lang.String.join; +import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.joining; 
+import static org.weakref.jmx.$internal.guava.base.Preconditions.checkArgument; +import static org.weakref.jmx.$internal.guava.base.Strings.emptyToNull; + +public final class DatabendClient + extends BaseJdbcClient +{ + @Inject + public DatabendClient( + BaseJdbcConfig config, + DatabendConfig databendConfig, + ConnectionFactory connectionFactory, + QueryBuilder queryBuilder, + IdentifierMapping identifierMapping, + RemoteQueryModifier queryModifier) + { + super("\"", connectionFactory, queryBuilder, config.getJdbcTypesMappedToVarchar(), identifierMapping, queryModifier, false); + // databendConfig is only null-checked here; none of its values are read by this constructor + requireNonNull(databendConfig, "databendConfig is null"); + } + + @Override + public boolean supportsTopN(ConnectorSession session, JdbcTableHandle handle, List<JdbcSortItem> sortOrder) + { + return true; + } + + /** + * Renders a TopN query by appending {@code ORDER BY ... LIMIT n} to the given query, + * spelling out the sort direction and the null ordering for every sort item. + */ + @Override + protected Optional<TopNFunction> topNFunction() + { + return Optional.of((query, sortItems, limit) -> { + String orderBy = sortItems.stream().map(sortItem -> { + String ordering = sortItem.sortOrder().isAscending() ? "ASC" : "DESC"; + String nullsHandling = sortItem.sortOrder().isNullsFirst() ? 
"NULLS FIRST" : "NULLS LAST"; + return format("%s %s %s", quoted(sortItem.column().getColumnName()), ordering, nullsHandling); + }).collect(joining(", ")); + return format("%s ORDER BY %s LIMIT %d", query, orderBy, limit); + }); + } + + @Override + public boolean isTopNGuaranteed(ConnectorSession session) + { + return true; + } + + /** + * Delegates join pushdown to {@code implementJoinCostAware}, wrapping the inherited implementation. + * FULL OUTER joins are never pushed down. + */ + @Override + public Optional<PreparedQuery> implementJoin( + ConnectorSession session, + JoinType joinType, + PreparedQuery leftSource, + Map<JdbcColumnHandle, String> leftProjections, + PreparedQuery rightSource, + Map<JdbcColumnHandle, String> rightProjections, + List<ParameterizedExpression> joinConditions, + JoinStatistics statistics) + { + if (joinType == JoinType.FULL_OUTER) { + return Optional.empty(); + } + return implementJoinCostAware( + session, + joinType, + leftSource, + rightSource, + statistics, + () -> super.implementJoin(session, joinType, leftSource, leftProjections, rightSource, rightProjections, joinConditions, statistics)); + } + + /** + * Looks up tables through JDBC metadata. The Trino schema name is passed as the JDBC catalog argument, + * and an absent schema or table name is passed as {@code null}. + */ + @Override + public ResultSet getTables(Connection connection, Optional<String> schemaName, Optional<String> tableName) + throws SQLException + { + // Databend maps their "database" to SQL catalogs and does not have schemas + DatabaseMetaData metadata = connection.getMetaData(); + return metadata.getTables(schemaName.orElse(null), null, tableName.orElse(null), getTableTypes().map(types -> types.toArray(String[]::new)).orElse(null)); + } + + @Override + protected ResultSet getColumns(RemoteTableName remoteTableName, DatabaseMetaData metadata) + throws SQLException + { + // Databend exposes databases via JDBC catalogs and does not use schemas + Optional<String> database = remoteTableName.getSchemaName(); + if (database.isEmpty()) { + database = remoteTableName.getCatalogName(); + } + return metadata.getColumns( + database.orElse(null), + null, + escapeObjectNameForMetadataQuery(remoteTableName.getTableName(), 
metadata.getSearchStringEscape()), + null); + } + + /** + * Lists the columns of a table and replaces the nullability of each column + * with the value read from {@code system.columns}, when one is available. + */ + @Override + public List<JdbcColumnHandle> getColumns(ConnectorSession session, SchemaTableName schemaTableName, RemoteTableName remoteTableName) + { + Map<String, Boolean> remoteNullability = getRemoteNullability(session, remoteTableName); + return super.getColumns(session, schemaTableName, remoteTableName).stream() + .map(column -> applyRemoteNullability(column, remoteNullability)) + .collect(toImmutableList()); + } + + /** + * Reads {@code is_nullable} for every column of the table from {@code system.columns}. + * + * @return nullability keyed by lower-cased column name; empty when neither a schema nor a catalog name is known + */ + private Map<String, Boolean> getRemoteNullability(ConnectorSession session, RemoteTableName remoteTableName) + { + Optional<String> database = remoteTableName.getSchemaName(); + if (database.isEmpty()) { + database = remoteTableName.getCatalogName(); + } + if (database.isEmpty()) { + return Map.of(); + } + try (Connection connection = connectionFactory.openConnection(session); + PreparedStatement statement = connection.prepareStatement(""" + SELECT name, is_nullable + FROM system.columns + WHERE database = ? + AND table = ? + """)) { + statement.setString(1, database.get()); + statement.setString(2, remoteTableName.getTableName()); + try (ResultSet resultSet = statement.executeQuery()) { + ImmutableMap.Builder<String, Boolean> builder = ImmutableMap.builder(); + while (resultSet.next()) { + builder.put( + resultSet.getString("name").toLowerCase(ENGLISH), + "YES".equalsIgnoreCase(resultSet.getString("is_nullable"))); + } + return builder.buildOrThrow(); + } + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, "Failed to fetch column nullability", e); + } + } + + /** + * Returns the column unchanged when no remote nullability is recorded for it or it already agrees; + * otherwise returns a copy carrying the remote nullability. + */ + private JdbcColumnHandle applyRemoteNullability(JdbcColumnHandle column, Map<String, Boolean> remoteNullability) + { + Boolean remoteNullable = remoteNullability.get(column.getColumnName().toLowerCase(ENGLISH)); + if (remoteNullable == null || remoteNullable == column.isNullable()) { + return column; + } + return JdbcColumnHandle.builderFrom(column) + .setNullable(remoteNullable) + .build(); + } + + @Override + protected String quoted(@Nullable String catalog, @Nullable String schema, String table) + { + 
StringBuilder sb = new StringBuilder(); + // qualify the table with the schema when present, otherwise with the catalog; never with both + if (!isNullOrEmpty(schema)) { + sb.append(quoted(schema)).append("."); + } + else if (!isNullOrEmpty(catalog)) { + sb.append(quoted(catalog)).append("."); + } + sb.append(quoted(table)); + return sb.toString(); + } + + /** + * Creates an empty copy of a table with the given columns using {@code CREATE TABLE ... AS SELECT ... WHERE 0 = 1}. + */ + @Override + protected void copyTableSchema(ConnectorSession session, Connection connection, String catalogName, String schemaName, String tableName, String newTableName, List<String> columnNames) + { + String sql = format( + "CREATE TABLE %s AS SELECT %s FROM %s WHERE 0 = 1", + quoted(catalogName, schemaName, newTableName), + columnNames.stream() + .map(this::quoted) + .collect(joining(", ")), + quoted(catalogName, schemaName, tableName)); + try { + execute(session, connection, sql); + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + + /** + * Lists schemas by reading the JDBC catalogs ({@code TABLE_CAT}) and keeping those accepted by {@code filterSchema}. + */ + @Override + public Collection<String> listSchemas(Connection connection) + { + try (ResultSet resultSet = connection.getMetaData().getCatalogs()) { + ImmutableSet.Builder<String> schemaNames = ImmutableSet.builder(); + while (resultSet.next()) { + String schemaName = resultSet.getString("TABLE_CAT"); + if (filterSchema(schemaName)) { + schemaNames.add(schemaName); + } + } + return schemaNames.build(); + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + + @Override + public Optional<String> getTableComment(ResultSet resultSet) + throws SQLException + { + // Empty remarks means that the table doesn't have a comment in Databend + return Optional.ofNullable(emptyToNull(resultSet.getString("REMARKS"))); + } + + @Override + protected List<String> createTableSqls(RemoteTableName remoteTableName, List<String> columns, ConnectorTableMetadata tableMetadata) + { + ImmutableList.Builder<String> tableOptions = ImmutableList.builder(); + Map<String, Object> tableProperties = tableMetadata.getProperties(); + if (tableMetadata.getComment().isPresent()) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with table comment"); + } + DatabendEngineType engine = 
DatabendTableProperties.getEngine(tableProperties); + tableOptions.add("ENGINE = " + engine.getEngineType()); + + formatProperty(DatabendTableProperties.getOrderBy(tableProperties)).ifPresent(value -> tableOptions.add("ORDER BY " + value)); + + return ImmutableList.of(format("CREATE TABLE %s (%s) %s", quoted(remoteTableName), join(", ", columns), join(" ", tableOptions.build()))); + } + + /** + * Formats a list of column names for use in a Databend create table statement. + * + * @param prop column names to format; may be null or empty + * @return the quoted name for a single column, a parenthesized comma-separated list of quoted names for several columns, or empty when there is nothing to format + */ + private Optional<String> formatProperty(List<String> prop) + { + if (prop == null || prop.isEmpty()) { + return Optional.empty(); + } + if (prop.size() == 1) { + // only one column + return Optional.of(quoted(prop.getFirst())); + } + // include more than one column + return Optional.of(prop.stream().map(this::quoted).collect(joining(",", "(", ")"))); + } + + /** + * Reads the table engine from {@code system.tables} and exposes it as the engine table property. + * The database is taken from the remote schema name, falling back to the remote catalog name, + * which is the same resolution used for column metadata in this class. Engine names without a + * matching {@code DatabendEngineType} constant are ignored. + */ + @Override + public Map<String, Object> getTableProperties(ConnectorSession session, JdbcTableHandle tableHandle) + { + RemoteTableName remoteTableName = tableHandle.asPlainTable().getRemoteTableName(); + Optional<String> database = remoteTableName.getSchemaName(); + if (database.isEmpty()) { + database = remoteTableName.getCatalogName(); + } + if (database.isEmpty()) { + // without a database name the lookup cannot match any row + return ImmutableMap.of(); + } 
+ // the statement is declared in the try-with-resources header so that it is closed even when the query fails + try (Connection connection = connectionFactory.openConnection(session); + PreparedStatement statement = connection.prepareStatement("SELECT engine " + "FROM system.tables " + "WHERE database = ? AND name = ?")) { + statement.setString(1, database.get()); + statement.setString(2, remoteTableName.getTableName()); + + try (ResultSet resultSet = statement.executeQuery()) { + ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder(); + while (resultSet.next()) { + String engine = resultSet.getString("engine"); + if (!isNullOrEmpty(engine)) { + Optional<DatabendEngineType> engineType = Enums.getIfPresent(DatabendEngineType.class, engine.toUpperCase(ENGLISH)).toJavaUtil(); + engineType.ifPresent(type -> properties.put(ENGINE_PROPERTY, type)); + } + } + return properties.buildOrThrow(); + } + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + + @Override + public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery preparedQuery) + { + try { + return super.getTableHandle(session, preparedQuery); + } + catch (TrinoException e) { + if (e.getErrorCode().equals(NOT_SUPPORTED.toErrorCode()) && + e.getMessage().startsWith("Query not supported: ResultSetMetaData not available for query")) { + throw new TrinoException(JDBC_ERROR, "Failed to get table handle for prepared query. 
" + e.getMessage(), e); + } + throw e; + } + } + + @Override + public void setTableProperties(ConnectorSession session, JdbcTableHandle handle, Map<String, Optional<Object>> nullableProperties) + { + checkArgument(nullableProperties.values().stream().noneMatch(Optional::isEmpty), "Setting a property to null is not supported"); + + if (nullableProperties.isEmpty()) { + return; + } + + throw new TrinoException(NOT_SUPPORTED, "Setting table properties is not supported for Databend tables"); + } + + /** + * Builds the SQL definition of a single column (also used by {@code addColumn}). Nullable columns are + * declared as {@code Nullable(type)} and non-nullable columns as {@code type NOT NULL}; column comments are rejected. + */ + @Override + protected String getColumnDefinitionSql(ConnectorSession session, ColumnMetadata column, String columnName) + { + if (column.getComment() != null) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support creating tables with column comment"); + } + String dataType = toWriteMapping(session, column.getType()).getDataType(); + StringBuilder sb = new StringBuilder() + .append(quoted(columnName)) + .append(" "); + if (column.isNullable()) { + // set column nullable property explicitly + sb.append("Nullable(").append(dataType).append(")"); + } + else { + // Databend treats a column without a constraint as nullable, so NOT NULL has to be spelled out + sb.append(dataType).append(" NOT NULL"); + } + return sb.toString(); + } + + @Override + protected void createSchema(ConnectorSession session, Connection connection, String remoteSchemaName) + throws SQLException + { + execute(session, connection, "CREATE DATABASE " + quoted(remoteSchemaName)); + } + + @Override + protected void dropSchema(ConnectorSession session, Connection connection, String remoteSchemaName, boolean cascade) + throws SQLException + { + if (!cascade) { + // without CASCADE, refuse to drop a database that still contains tables + try (ResultSet tables = getTables(connection, Optional.of(remoteSchemaName), Optional.empty())) { + if (tables.next()) { + throw new TrinoException(SCHEMA_NOT_EMPTY, format("Cannot drop non-empty schema '%s'", remoteSchemaName)); + } + } + } + execute(session, connection, "DROP DATABASE " + quoted(remoteSchemaName)); + } + + @Override + protected void renameSchema(ConnectorSession 
session, Connection connection, String remoteSchemaName, String newRemoteSchemaName) + throws SQLException + { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support renaming schemas"); + } + + /** + * Adds a column at the end of the table. Column comments and FIRST/AFTER positions are rejected with NOT_SUPPORTED. + */ + @Override + public void addColumn(ConnectorSession session, JdbcTableHandle handle, ColumnMetadata column, ColumnPosition position) + { + if (column.getComment() != null) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with comments"); + } + if (!(position instanceof ColumnPosition.Last)) { + if (position instanceof ColumnPosition.First) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with FIRST clause"); + } + throw new TrinoException(NOT_SUPPORTED, "This connector does not support adding columns with AFTER clause"); + } + try (Connection connection = connectionFactory.openConnection(session)) { + String remoteColumnName = getIdentifierMapping().toRemoteColumnName(getRemoteIdentifiers(connection), column.getName()); + String sql = format("ALTER TABLE %s ADD COLUMN %s", quoted(handle.asPlainTable().getRemoteTableName()), getColumnDefinitionSql(session, column, remoteColumnName)); + execute(session, connection, sql); + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + + @Override + public void setColumnType(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column, Type type) + { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support setting column types"); + } + + @Override + public void dropNotNullConstraint(ConnectorSession session, JdbcTableHandle handle, JdbcColumnHandle column) + { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support dropping a not null constraint"); + } + + @Override + protected Optional<List<String>> getTableTypes() + { + // empty means getTables passes null as the table-type filter to the JDBC metadata call + return Optional.empty(); + } + + @Override + protected void renameTable(ConnectorSession session, Connection connection, String catalogName, String 
remoteSchemaName, String remoteTableName, String newRemoteSchemaName, String newRemoteTableName) + throws SQLException + { + if (!Objects.equals(remoteSchemaName, newRemoteSchemaName)) { + throw new TrinoException(NOT_SUPPORTED, "This connector does not support renaming tables across schemas"); + } + execute(session, connection, format("RENAME TABLE %s TO %s", quoted(catalogName, remoteSchemaName, remoteTableName), quoted(catalogName, newRemoteSchemaName, newRemoteTableName))); + } + + @Override + protected Optional<BiFunction<String, Long, String>> limitFunction() + { + return Optional.of((sql, limit) -> sql + " LIMIT " + limit); + } + + @Override + public boolean isLimitGuaranteed(ConnectorSession session) + { + return true; + } + + /** + * Maps a remote column type to a Trino column mapping. A forced mapping to varchar wins; after that the + * lower-cased JDBC type name is checked for unsigned integers and {@code string}, then the JDBC type code is used. + * Types matching none of these are converted to unbounded varchar when the session asks for that, and are + * otherwise reported as unsupported by returning empty. + */ + @Override + public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle) + { + String jdbcTypeName = typeHandle.jdbcTypeName().orElseThrow(() -> new TrinoException(JDBC_ERROR, "Type name is missing: " + typeHandle)); + + Optional<ColumnMapping> mapping = getForcedMappingToVarchar(typeHandle); + if (mapping.isPresent()) { + return mapping; + } + + // unsigned integers are read into the next wider Trino type; uint64 is read as decimal(20) + switch (jdbcTypeName.toLowerCase(ENGLISH)) { + case "uint8": + return Optional.of(ColumnMapping.longMapping(SMALLINT, ResultSet::getShort, uInt8WriteFunction())); + case "uint16": + return Optional.of(ColumnMapping.longMapping(INTEGER, ResultSet::getInt, uInt16WriteFunction())); + case "uint32": + return Optional.of(ColumnMapping.longMapping(BIGINT, ResultSet::getLong, uInt32WriteFunction())); + case "uint64": + return Optional.of(decimalColumnMapping(createDecimalType(20))); + case "string": + return Optional.of(varcharColumnMapping(typeHandle.requiredColumnSize(), typeHandle.caseSensitivity())); + default: + } + + switch (typeHandle.jdbcType()) { + case Types.TINYINT: + return Optional.of(tinyintColumnMapping()); + + case Types.BOOLEAN: + case Types.BIT: + return Optional.of(StandardColumnMappings.booleanColumnMapping()); + + case Types.SMALLINT: + return 
Optional.of(smallintColumnMapping()); + + case Types.INTEGER: + return Optional.of(integerColumnMapping()); + + case Types.BIGINT: + return Optional.of(bigintColumnMapping()); + + case Types.FLOAT: + return Optional.of(ColumnMapping.longMapping(REAL, (resultSet, columnIndex) -> floatToRawIntBits(resultSet.getFloat(columnIndex)), realWriteFunction(), DISABLE_PUSHDOWN)); + + case Types.DOUBLE: + return Optional.of(doubleColumnMapping()); + + case Types.VARCHAR: + if (jdbcTypeName.equals("varchar")) { + return Optional.of(varcharColumnMapping(typeHandle.requiredColumnSize(), typeHandle.caseSensitivity())); + } + // Some other Databend types (ARRAY, VARIANT, etc.) are also mapped to Types.VARCHAR, but they're unsupported. + break; + + case Types.DECIMAL: + int decimalDigits = typeHandle.requiredDecimalDigits(); + int precision = typeHandle.requiredColumnSize(); + + ColumnMapping decimalColumnMapping; + if (getDecimalRounding(session) == ALLOW_OVERFLOW && precision > Decimals.MAX_PRECISION) { + int scale = Math.min(decimalDigits, getDecimalDefaultScale(session)); + decimalColumnMapping = decimalColumnMapping(createDecimalType(Decimals.MAX_PRECISION, scale), getDecimalRoundingMode(session)); + } + else { + decimalColumnMapping = decimalColumnMapping(createDecimalType(precision, max(decimalDigits, 0))); + } + return Optional.of(ColumnMapping.mapping(decimalColumnMapping.getType(), decimalColumnMapping.getReadFunction(), decimalColumnMapping.getWriteFunction(), + DISABLE_PUSHDOWN)); // To avoid potential data loss or precision issues during pushdown operations + + case Types.DATE: + return Optional.of(dateColumnMappingUsingLocalDate()); + } + + if (getUnsupportedTypeHandling(session) == CONVERT_TO_VARCHAR) { + return mapToUnboundedVarchar(typeHandle); + } + + return Optional.empty(); + } + + private static ColumnMapping varcharColumnMapping(int varcharLength, Optional<CaseSensitivity> caseSensitivity) + { + VarcharType varcharType = varcharLength <= VarcharType.MAX_LENGTH ? 
createVarcharType(varcharLength) : createUnboundedVarcharType(); + return StandardColumnMappings.varcharColumnMapping(varcharType, caseSensitivity.orElse(CASE_INSENSITIVE) == CASE_SENSITIVE); + } + + @Override + public WriteMapping toWriteMapping(ConnectorSession session, Type type) + { + if (type == BOOLEAN) { + return WriteMapping.booleanMapping("Boolean", booleanWriteFunction()); + } + if (type == TINYINT) { + return WriteMapping.longMapping("Int8", tinyintWriteFunction()); + } + if (type == SMALLINT) { + return WriteMapping.longMapping("Int16", smallintWriteFunction()); + } + if (type == INTEGER) { + return WriteMapping.longMapping("Int32", integerWriteFunction()); + } + if (type == BIGINT) { + return WriteMapping.longMapping("Int64", bigintWriteFunction()); + } + if (type == REAL) { + return WriteMapping.longMapping("Float32", realWriteFunction()); + } + if (type == DOUBLE) { + return WriteMapping.doubleMapping("Float64", doubleWriteFunction()); + } + if (type instanceof DecimalType decimalType) { + String dataType = format("Decimal(%s, %s)", decimalType.getPrecision(), decimalType.getScale()); + if (decimalType.isShort()) { + return WriteMapping.longMapping(dataType, shortDecimalWriteFunction(decimalType)); + } + return WriteMapping.objectMapping(dataType, longDecimalWriteFunction(decimalType)); + } + if (type instanceof CharType || type instanceof VarcharType) { + // The String type replaces the types VARCHAR, BLOB, CLOB, and others from other DBMSs. + return WriteMapping.sliceMapping("String", varcharWriteFunction()); + } + if (type instanceof VarbinaryType) { + // Strings of an arbitrary length. 
The length is not limited + return WriteMapping.sliceMapping("String", varbinaryWriteFunction()); + } + if (type == DATE) { + return WriteMapping.longMapping("Date", dateWriteFunctionUsingLocalDate()); + } + throw new TrinoException(NOT_SUPPORTED, "Unsupported column type: " + type); + } + + private static LongWriteFunction uInt8WriteFunction() + { + return (statement, index, value) -> { + statement.setShort(index, Shorts.checkedCast(value)); + }; + } + + private static LongWriteFunction uInt16WriteFunction() + { + return (statement, index, value) -> { + statement.setInt(index, toIntExact(value)); + }; + } + + private static LongWriteFunction uInt32WriteFunction() + { + return PreparedStatement::setLong; + } + + private static ColumnMapping dateColumnMappingUsingLocalDate() + { + return ColumnMapping.longMapping(DATE, databendDateReadFunction(), dateWriteFunctionUsingLocalDate()); + } + + private static LongReadFunction databendDateReadFunction() + { + return new LongReadFunction() { + @Override + public boolean isNull(ResultSet resultSet, int columnIndex) + throws SQLException + { + resultSet.getObject(columnIndex); + return resultSet.wasNull(); + } + + @Override + public long readLong(ResultSet resultSet, int columnIndex) + throws SQLException + { + Object value = resultSet.getObject(columnIndex); + if (value == null) { + throw new TrinoException(JDBC_ERROR, "Driver returned null LocalDate for a non-null value"); + } + if (value instanceof LocalDate localDate) { + return localDate.toEpochDay(); + } + if (value instanceof Date date) { + return date.toLocalDate().toEpochDay(); + } + return LocalDate.parse(value.toString()).toEpochDay(); + } + }; + } + + private static LongWriteFunction dateWriteFunctionUsingLocalDate() + { + return (statement, index, value) -> { + LocalDate date = LocalDate.ofEpochDay(value); + statement.setObject(index, date); + }; + } +} diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClientModule.java 
b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClientModule.java new file mode 100644 index 000000000000..bde1c06e0d81 --- /dev/null +++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendClientModule.java @@ -0,0 +1,66 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.databend; + +import com.databend.jdbc.DatabendDriver; +import com.google.inject.Binder; +import com.google.inject.Provides; +import com.google.inject.Scopes; +import com.google.inject.Singleton; +import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.opentelemetry.api.OpenTelemetry; +import io.trino.plugin.jdbc.BaseJdbcConfig; +import io.trino.plugin.jdbc.ConnectionFactory; +import io.trino.plugin.jdbc.DecimalConfig; +import io.trino.plugin.jdbc.DriverConnectionFactory; +import io.trino.plugin.jdbc.ForBaseJdbc; +import io.trino.plugin.jdbc.JdbcClient; +import io.trino.plugin.jdbc.JdbcStatisticsConfig; +import io.trino.plugin.jdbc.credential.CredentialProvider; + +import java.util.Properties; + +import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider; +import static io.trino.plugin.jdbc.JdbcModule.bindTablePropertiesProvider; + +public final class DatabendClientModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + 
binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(DatabendClient.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(JdbcStatisticsConfig.class); + configBinder(binder).bindConfig(DatabendConfig.class); + configBinder(binder).bindConfig(DecimalConfig.class); + bindSessionPropertiesProvider(binder, DatabendSessionProperties.class); + bindTablePropertiesProvider(binder, DatabendTableProperties.class); + } + + @Provides + @Singleton + @ForBaseJdbc + public static ConnectionFactory connectionFactory(BaseJdbcConfig config, CredentialProvider credentialProvider, OpenTelemetry openTelemetry) + { + Properties connectionProperties = new Properties(); + return DriverConnectionFactory.builder( + new DatabendDriver(), + config.getConnectionUrl(), + credentialProvider) + .setConnectionProperties(connectionProperties) + .setOpenTelemetry(openTelemetry) + .build(); + } +} diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendConfig.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendConfig.java new file mode 100644 index 000000000000..86f28d514e18 --- /dev/null +++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendConfig.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.databend; + +import io.airlift.configuration.Config; +import io.airlift.configuration.ConfigDescription; +import io.airlift.units.Duration; + +import java.util.concurrent.TimeUnit; + +public class DatabendConfig +{ + private Duration connectionTimeout = new Duration(60, TimeUnit.SECONDS); + + public Duration getConnectionTimeout() + { + return connectionTimeout; + } + + @ConfigDescription("Connection timeout") + @Config("databend.connection-timeout") + public DatabendConfig setConnectionTimeout(Duration connectionTimeout) + { + this.connectionTimeout = connectionTimeout; + return this; + } +} diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendEngineType.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendEngineType.java new file mode 100644 index 000000000000..fcc2a53fd413 --- /dev/null +++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendEngineType.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.databend; + +public enum DatabendEngineType +{ + FUSE("FUSE"); + + private final String engineType; + + DatabendEngineType(String engineType) + { + this.engineType = engineType; + } + + public String getEngineType() + { + return this.engineType; + } +} diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendPlugin.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendPlugin.java new file mode 100644 index 000000000000..bb081159f872 --- /dev/null +++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendPlugin.java @@ -0,0 +1,25 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.databend; + +import io.trino.plugin.jdbc.JdbcPlugin; + +public final class DatabendPlugin + extends JdbcPlugin +{ + public DatabendPlugin() + { + super("databend", DatabendClientModule::new); + } +} diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendSessionProperties.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendSessionProperties.java new file mode 100644 index 000000000000..1c690c7d878a --- /dev/null +++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendSessionProperties.java @@ -0,0 +1,40 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.databend; + +import com.google.inject.Inject; +import io.trino.plugin.base.session.SessionPropertiesProvider; +import io.trino.plugin.jdbc.DecimalConfig; +import io.trino.plugin.jdbc.DecimalSessionSessionProperties; +import io.trino.spi.session.PropertyMetadata; + +import java.util.List; + +public final class DatabendSessionProperties + implements SessionPropertiesProvider +{ + private final List> sessionProperties; + + @Inject + public DatabendSessionProperties(DecimalConfig decimalConfig) + { + sessionProperties = new DecimalSessionSessionProperties(decimalConfig).getSessionProperties(); + } + + @Override + public List> getSessionProperties() + { + return sessionProperties; + } +} diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendTableProperties.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendTableProperties.java new file mode 100644 index 000000000000..5a15c8c605f6 --- /dev/null +++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendTableProperties.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.databend; + +import com.google.common.collect.ImmutableList; +import com.google.inject.Inject; +import io.trino.plugin.jdbc.TablePropertiesProvider; +import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.type.ArrayType; + +import java.util.List; +import java.util.Map; + +import static io.trino.spi.session.PropertyMetadata.enumProperty; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static java.util.Objects.requireNonNull; + +public final class DatabendTableProperties + implements TablePropertiesProvider +{ + public static final String ENGINE_PROPERTY = "engine"; + public static final String ORDER_BY_PROPERTY = "order_by"; //required + + private final List> tableProperties; + + @Inject + public DatabendTableProperties() + { + tableProperties = ImmutableList.>builder() + .add(enumProperty( + ENGINE_PROPERTY, + "Databend Table Engine, defaults to Log", + DatabendEngineType.class, + DatabendEngineType.FUSE, // FUSE is default engine of Databend which optimized for both read and write operations with improved indexing and compression. + false)) + .add(new PropertyMetadata<>( + ORDER_BY_PROPERTY, + "columns to be the sorting key, it's required for table MergeTree engine family", + new ArrayType(VARCHAR), + List.class, + ImmutableList.of(), + false, + value -> (List) value, + value -> value)) + .build(); + } + + public static DatabendEngineType getEngine(Map tableProperties) + { + requireNonNull(tableProperties, "tableProperties is null"); + DatabendEngineType engine = (DatabendEngineType) tableProperties.get(ENGINE_PROPERTY); + return engine != null ? 
engine : DatabendEngineType.FUSE; + } + + public static List getOrderBy(Map tableProperties) + { + requireNonNull(tableProperties, "tableProperties is null"); + @SuppressWarnings("unchecked") + List orderBy = (List) tableProperties.get("order_by"); + if (orderBy == null) { + return ImmutableList.of(); + } + return orderBy; + } + + @Override + public List> getTableProperties() + { + return tableProperties; + } +} diff --git a/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendUtil.java b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendUtil.java new file mode 100644 index 000000000000..2f1b80c58156 --- /dev/null +++ b/plugin/trino-databend/src/main/java/io/trino/plugin/databend/DatabendUtil.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.databend; + +public final class DatabendUtil +{ + private DatabendUtil() {} + + public static String convertToQuotedString(Object value) + { + return value == null ? 
"NULL" : '\'' + escape(value.toString(), '\'') + '\''; + } + + private static String escape(String str, char quote) + { + if (str == null) { + return str; + } + + int len = str.length(); + StringBuilder sb = new StringBuilder(len + 10); + + for (int i = 0; i < len; ++i) { + char ch = str.charAt(i); + if (ch == quote || ch == '\\') { + sb.append('\\'); + } + sb.append(ch); + } + + return sb.toString(); + } +} diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/DatabendQueryRunner.java b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/DatabendQueryRunner.java new file mode 100644 index 000000000000..bb4163097ea1 --- /dev/null +++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/DatabendQueryRunner.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.databend; + +import com.google.common.collect.ImmutableMap; +import io.trino.plugin.tpch.TpchPlugin; +import io.trino.testing.DistributedQueryRunner; +import io.trino.testing.QueryRunner; +import io.trino.tpch.TpchTable; + +import java.time.ZoneId; +import java.util.HashMap; +import java.util.Map; +import java.util.TimeZone; + +import static io.trino.plugin.tpch.TpchMetadata.TINY_SCHEMA_NAME; +import static io.trino.spi.type.TimeZoneKey.getTimeZoneKey; +import static io.trino.testing.QueryAssertions.copyTpchTables; +import static io.trino.testing.TestingSession.testSessionBuilder; + +public final class DatabendQueryRunner +{ + private DatabendQueryRunner() {} + + public static QueryRunner createDatabendQueryRunner( + TestingDatabendServer server, + Map extraProperties, + Map connectorProperties) + throws Exception + { + System.setProperty("user.timezone", "UTC"); + TimeZone.setDefault(TimeZone.getTimeZone(ZoneId.of("UTC"))); + + QueryRunner queryRunner = DistributedQueryRunner.builder( + testSessionBuilder() + .setCatalog("databend") + .setSchema("default") + .setTimeZoneKey(getTimeZoneKey("UTC")) + .build()) + .setExtraProperties(extraProperties) + .build(); + + try { + queryRunner.installPlugin(new TpchPlugin()); + queryRunner.createCatalog("tpch", "tpch"); + + Map properties = new HashMap<>(ImmutableMap.builder() + .put("connection-url", server.getJdbcUrl()) + .put("connection-user", server.getUser()) + .put("connection-password", server.getPassword()) + .buildOrThrow()); + properties.putAll(connectorProperties); + + queryRunner.installPlugin(new DatabendPlugin()); + queryRunner.createCatalog("databend", "databend", properties); + + copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, TpchTable.getTables()); + + return queryRunner; + } + catch (Throwable e) { + queryRunner.close(); + throw e; + } + } +} diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendConnectorTest.java 
b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendConnectorTest.java new file mode 100644 index 000000000000..cf784b1d6951 --- /dev/null +++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendConnectorTest.java @@ -0,0 +1,499 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.databend; + +import io.trino.plugin.jdbc.BaseJdbcConnectorTest; +import io.trino.testing.MaterializedResult; +import io.trino.testing.QueryRunner; +import io.trino.testing.TestingConnectorBehavior; +import io.trino.testing.sql.SqlExecutor; +import io.trino.testing.sql.TestTable; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; + +import java.util.Locale; +import java.util.Optional; + +import static com.google.common.base.Strings.nullToEmpty; +import static io.trino.spi.type.VarcharType.VARCHAR; +import static io.trino.testing.MaterializedResult.resultBuilder; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static java.lang.String.format; +import static java.util.Collections.emptyMap; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assumptions.abort; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; + +@TestInstance(PER_CLASS) +public class TestDatabendConnectorTest + extends BaseJdbcConnectorTest +{ + private 
TestingDatabendServer databendServer; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + databendServer = closeAfterClass(new TestingDatabendServer()); + return DatabendQueryRunner.createDatabendQueryRunner(databendServer, emptyMap(), emptyMap()); + } + + @Override + protected SqlExecutor onRemoteDatabase() + { + return databendServer::execute; + } + + @Override + protected boolean hasBehavior(TestingConnectorBehavior behavior) + { + return switch (behavior) { + case SUPPORTS_ARRAY, + SUPPORTS_MAP_TYPE, + SUPPORTS_ROW_TYPE, + SUPPORTS_DELETE, + SUPPORTS_ROW_LEVEL_DELETE, + SUPPORTS_UPDATE, + SUPPORTS_ROW_LEVEL_UPDATE, + SUPPORTS_RENAME_SCHEMA, + SUPPORTS_ADD_COLUMN_WITH_POSITION, + SUPPORTS_ADD_COLUMN_WITH_COMMENT, + SUPPORTS_NOT_NULL_CONSTRAINT, + SUPPORTS_ADD_COLUMN_NOT_NULL_CONSTRAINT, + SUPPORTS_DEFAULT_COLUMN_VALUE, + SUPPORTS_DROP_NOT_NULL_CONSTRAINT, + SUPPORTS_COMMENT_ON_COLUMN, + SUPPORTS_COMMENT_ON_TABLE, + SUPPORTS_CREATE_TABLE_WITH_COLUMN_COMMENT, + SUPPORTS_CREATE_TABLE_WITH_TABLE_COMMENT, + SUPPORTS_SET_COLUMN_TYPE, + SUPPORTS_RENAME_TABLE_ACROSS_SCHEMAS, + SUPPORTS_NATIVE_QUERY, + SUPPORTS_AGGREGATION_PUSHDOWN, + SUPPORTS_AGGREGATION_PUSHDOWN_STDDEV, + SUPPORTS_AGGREGATION_PUSHDOWN_VARIANCE, + SUPPORTS_AGGREGATION_PUSHDOWN_COVARIANCE, + SUPPORTS_AGGREGATION_PUSHDOWN_CORRELATION, + SUPPORTS_AGGREGATION_PUSHDOWN_REGRESSION, + SUPPORTS_AGGREGATION_PUSHDOWN_COUNT_DISTINCT, + SUPPORTS_CREATE_VIEW, + SUPPORTS_CREATE_MATERIALIZED_VIEW, + SUPPORTS_NEGATIVE_DATE -> false; + default -> super.hasBehavior(behavior); + }; + } + + @Test + @Override + public void testShowColumns() + { + assertThat(computeActual("SHOW COLUMNS FROM orders")) + .isEqualTo(getDescribeOrdersResult()); + } + + @Override + protected MaterializedResult getDescribeOrdersResult() + { + return resultBuilder(getSession(), VARCHAR, VARCHAR, VARCHAR, VARCHAR) + .row("orderkey", "bigint", "", "") + .row("custkey", "bigint", "", "") + .row("orderstatus", "varchar", 
"", "") + .row("totalprice", "double", "", "") + .row("orderdate", "date", "", "") + .row("orderpriority", "varchar", "", "") + .row("clerk", "varchar", "", "") + .row("shippriority", "integer", "", "") + .row("comment", "varchar", "", "") + .build(); + } + + @Override + protected Optional filterColumnNameTestData(String columnName) + { + // Databend converts column names to lowercase + return Optional.of(columnName.toLowerCase(Locale.ROOT)); + } + + @Override + protected Optional filterDataMappingSmokeTestData(DataMappingTestSetup setup) + { + String type = setup.getTrinoTypeName(); + if (type.equals("time") || type.startsWith("time(")) { + return Optional.of(setup.asUnsupported()); + } + return Optional.of(setup); + } + + @Override + protected boolean isColumnNameRejected(Exception exception, String columnName, boolean delimited) + { + String message = nullToEmpty(exception.getMessage()); + return message.contains("unable to recognize the rest tokens"); + } + + @Test + @Override + public void testAddColumn() + { + String tableName; + try (TestTable table = newTrinoTable("test_add_column_", tableDefinitionForAddColumn())) { + tableName = table.getName(); + assertUpdate("INSERT INTO " + table.getName() + " SELECT 'first'", 1); + assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN x bigint", ".* Column 'x' already exists"); + assertQueryFails("ALTER TABLE " + table.getName() + " ADD COLUMN X bigint", ".* Column 'X' already exists"); + + assertUpdate("ALTER TABLE " + table.getName() + " ADD COLUMN a varchar(50)"); + assertQuery( + "SELECT * FROM " + table.getName(), + "VALUES ('first', NULL)"); + assertQuery( + "SELECT * FROM " + table.getName() + " WHERE a IS NULL", + "VALUES ('first', NULL)"); + } + } + + @Test + @Override // Overridden because the default storage type doesn't support adding columns + public void testAddNotNullColumnToEmptyTable() + { + try (TestTable table = newTrinoTable("test_add_notnull_col_to_empty", "(a_varchar varchar NOT 
NULL)")) { + String tableName = table.getName(); + + assertUpdate("ALTER TABLE " + tableName + " ADD COLUMN b_varchar varchar NOT NULL"); + assertUpdate("INSERT INTO " + tableName + " VALUES ('a', 'b')", 1); + assertThat(query("TABLE " + tableName)) + .skippingTypesCheck() + .matches("VALUES ('a', 'b')"); + } + } + + @Test + @Override + public void testDropNotNullConstraint() + { + abort("Databend Not Support"); + } + + @Test + @Override + public void testInsertIntoNotNullColumn() + { + try (TestTable table = newTrinoTable("test_insert_not_null_", "(nullable_col INTEGER, not_null_col INTEGER NOT NULL)")) { + assertUpdate(format("INSERT INTO %s (not_null_col) VALUES (2)", table.getName()), 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES (NULL, 2)"); + } + + try (TestTable table = newTrinoTable("test_commuted_not_null_table", "(nullable_col BIGINT, not_null_col BIGINT NOT NULL)")) { + assertUpdate(format("INSERT INTO %s (not_null_col) VALUES (2)", table.getName()), 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES (NULL, 2)"); + } + } + + @Test + @Override + public void testUpdateNotNullColumn() + { + abort("Databend rejects updates that temporarily assign NULL to NOT NULL columns"); + } + + @Test + @Override + public void testColumnName() + { + abort("Databend rejects some generated column names used in the generic test"); + } + + @Test + @Override + public void testShowCreateTable() + { + String catalog = getSession().getCatalog().orElseThrow(); + String schema = getSession().getSchema().orElseThrow(); + assertThat(computeScalar("SHOW CREATE TABLE orders")) + .isEqualTo(format( + """ + CREATE TABLE %s.%s.orders ( + orderkey bigint, + custkey bigint, + orderstatus varchar, + totalprice double, + orderdate date, + orderpriority varchar, + clerk varchar, + shippriority integer, + comment varchar + ) + WITH ( + engine = 'FUSE' + )\ + """, + catalog, + schema)); + } + + @Test + @Override + public void testDateYearOfEraPredicate() + { + 
assertQuery("SELECT orderdate FROM orders WHERE orderdate = DATE '1997-09-14'", "VALUES DATE '1997-09-14'"); + assertQueryFails("SELECT * FROM orders WHERE orderdate = DATE '-1996-09-14'", invalidDateReadError("-1996-09-14")); + } + + @Override + protected String errorMessageForInsertIntoNotNullColumn(String columnName) + { + return format("(?s).*%s.*", columnName); + } + + @Override + protected String errorMessageForInsertNegativeDate(String date) + { + return invalidDateWriteError(date); + } + + @Override + protected String errorMessageForCreateTableAsSelectNegativeDate(String date) + { + return invalidDateWriteError(date); + } + + @Override + protected TestTable createTableWithDefaultColumns() + { + String schema = getSession().getSchema().orElseThrow(); + return new TestTable( + onRemoteDatabase(), + schema + ".test_default_cols", + "(col_required BIGINT NOT NULL," + + "col_nullable BIGINT," + + "col_default BIGINT DEFAULT 43," + + "col_nonnull_default BIGINT NOT NULL DEFAULT 42," + + "col_required2 BIGINT NOT NULL)"); + } + + @Test + @Override + public void testCharVarcharComparison() + { + assertThatThrownBy(super::testCharVarcharComparison) + .isInstanceOf(AssertionError.class); + abort("Databend trims CHAR values differently than Trino"); + } + + @Test + @Override + public void testVarcharCharComparison() + { + assertThatThrownBy(super::testVarcharCharComparison) + .isInstanceOf(AssertionError.class); + abort("Databend trims CHAR values differently than Trino"); + } + + @Test + @Override + public void testCharTrailingSpace() + { + assertThatThrownBy(super::testCharTrailingSpace) + .isInstanceOf(AssertionError.class); + abort("Databend trims CHAR values differently than Trino"); + } + + @Test + @Override + public void testExecuteProcedure() + { + assertThatThrownBy(super::testExecuteProcedure) + .isInstanceOf(AssertionError.class); + abort("system.execute is not supported by Databend"); + } + + @Test + @Override + public void 
testExecuteProcedureWithInvalidQuery() + { + assertThatThrownBy(super::testExecuteProcedureWithInvalidQuery) + .isInstanceOf(AssertionError.class); + abort("system.execute is not supported by Databend"); + } + + @Test + @Override + public void testExecuteProcedureWithNamedArgument() + { + assertThatThrownBy(super::testExecuteProcedureWithNamedArgument) + .isInstanceOf(AssertionError.class); + abort("system.execute is not supported by Databend"); + } + + @Override + @Test + public void testInsert() + { + String tableName = "test_insert_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id BIGINT, name VARCHAR)"); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 'test')", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (1, 'test')"); + assertUpdate("DROP TABLE " + tableName); + } + + @Override + @Test + public void testCreateTable() + { + String tableName = "test_create_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (id BIGINT, name VARCHAR, price DOUBLE)"); + assertThat(computeActual("SHOW TABLES LIKE '" + tableName + "'").getRowCount()) + .isEqualTo(1); + assertUpdate("DROP TABLE " + tableName); + } + + @Override + @Test + public void testTruncateTable() + { + String tableName = "test_truncate_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25); + assertUpdate("TRUNCATE TABLE " + tableName); + assertQuery("SELECT COUNT(*) FROM " + tableName, "SELECT 0"); + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testBasicDataTypes() + { + String tableName = "test_types_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (" + + "bool_col BOOLEAN, " + + "tinyint_col TINYINT, " + + "smallint_col SMALLINT, " + + "int_col INTEGER, " + + "bigint_col BIGINT, " + + "real_col REAL, " + + "double_col DOUBLE, " + + "varchar_col VARCHAR, " + + "date_col DATE)"); + + assertUpdate("INSERT INTO " + tableName + " VALUES (" + + 
"true, " + + "127, " + + "32767, " + + "2147483647, " + + "9223372036854775807, " + + "REAL '3.14', " + + "2.718, " + + "'test string', " + + "DATE '2024-01-01')", 1); + + assertQuery("SELECT * FROM " + tableName, + "VALUES (true, 127, 32767, 2147483647, 9223372036854775807, CAST(3.14 AS real), 2.718, 'test string', DATE '2024-01-01')"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Test + public void testDecimalType() + { + String tableName = "test_decimal_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " (decimal_col DECIMAL(10, 2))"); + assertUpdate("INSERT INTO " + tableName + " VALUES (123.45)", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (123.45)"); + assertUpdate("DROP TABLE " + tableName); + } + + @Override + @Test + public void testAggregationPushdown() + { + String tableName = "test_agg_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25); + + assertQuery("SELECT COUNT(*) FROM " + tableName, "SELECT 25"); + assertQuery("SELECT COUNT(regionkey) FROM " + tableName, "SELECT 25"); + assertQuery("SELECT MIN(regionkey) FROM " + tableName, "SELECT 0"); + assertQuery("SELECT MAX(regionkey) FROM " + tableName, "SELECT 4"); + assertQuery("SELECT SUM(regionkey) FROM " + tableName, "SELECT 50"); + + assertUpdate("DROP TABLE " + tableName); + } + + @Override + @Test + public void testLimitPushdown() + { + String tableName = "test_limit_" + randomNameSuffix(); + assertUpdate("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.tiny.nation", 25); + + assertThat(computeActual("SELECT * FROM " + tableName + " LIMIT 10").getRowCount()) + .isEqualTo(10); + assertThat(computeActual("SELECT * FROM " + tableName + " LIMIT 5").getRowCount()) + .isEqualTo(5); + + assertUpdate("DROP TABLE " + tableName); + } + + @Override + protected void verifyConcurrentAddColumnFailurePermissible(Exception e) + { + if (e.getMessage() != null && e.getMessage().contains("QueryErrors{code=2009")) { 
+ return; + } + super.verifyConcurrentAddColumnFailurePermissible(e); + } + + @Test + @Override + public void verifySupportsDeleteDeclaration() + { + abort("Databend executes DELETE statements instead of rejecting them for unsupported connectors"); + } + + @Test + @Override + public void verifySupportsRowLevelDeleteDeclaration() + { + abort("Databend executes DELETE statements instead of rejecting them for unsupported connectors"); + } + + @Test + @Override + public void verifySupportsUpdateDeclaration() + { + abort("Databend executes UPDATE statements instead of rejecting them for unsupported connectors"); + } + + @Test + @Override + public void testDataMappingSmokeTest() + { + abort("Databend does not support all TIME/TIMESTAMP combinations exercised by the generic smoke test"); + } + + @Test + @Override + public void testInsertWithoutTemporaryTable() + { + abort("Databend requires elevated privileges to inspect written tables, so the temporary-table bypass test cannot run"); + } + + private static String invalidDateWriteError(String date) + { + return format("(?s).*Invalid value '%s'.*", date); + } + + private static String invalidDateReadError(String date) + { + return format("(?s).*cannot parse to type `DATE`.*%s.*", date); + } +} diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendPlugin.java b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendPlugin.java new file mode 100644 index 000000000000..79adf3663627 --- /dev/null +++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendPlugin.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.databend; + +import com.google.common.collect.ImmutableMap; +import io.trino.spi.Plugin; +import io.trino.spi.connector.Connector; +import io.trino.spi.connector.ConnectorFactory; +import io.trino.testing.TestingConnectorContext; +import org.junit.jupiter.api.Test; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestDatabendPlugin +{ + @Test + public void testCreateConnector() + { + Plugin plugin = new DatabendPlugin(); + ConnectorFactory factory = getOnlyElement(plugin.getConnectorFactories()); + Connector connector = factory.create( + "test", + ImmutableMap.of( + "connection-url", "jdbc:databend://localhost:8000/", + "connection-user", "databend", + "connection-password", "databend"), + new TestingConnectorContext()); + assertThat(connector).isNotNull(); + connector.shutdown(); + } +} diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendTypeMapping.java b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendTypeMapping.java new file mode 100644 index 000000000000..ea0dac534b91 --- /dev/null +++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestDatabendTypeMapping.java @@ -0,0 +1,130 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.databend; + +import io.trino.Session; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.QueryRunner; +import io.trino.testing.sql.SqlExecutor; +import org.junit.jupiter.api.Test; + +import java.util.Locale; + +import static java.lang.String.format; +import static java.util.Collections.emptyMap; + +public class TestDatabendTypeMapping + extends AbstractTestQueryFramework +{ + private TestingDatabendServer databendServer; + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + databendServer = closeAfterClass(new TestingDatabendServer()); + return DatabendQueryRunner.createDatabendQueryRunner(databendServer, emptyMap(), emptyMap()); + } + + protected SqlExecutor onRemoteDatabase() + { + return databendServer::execute; + } + + @Test + public void testBoolean() + { + // Databend supports BOOLEAN type + SqlExecutor jdbcSqlExecutor = onRemoteDatabase(); + Session session = getSession(); + + jdbcSqlExecutor.execute("CREATE TABLE test_boolean_type (boolean_column BOOLEAN)"); + assertTypeMapping(session, jdbcSqlExecutor, "BOOLEAN", "true", "BOOLEAN"); + assertTypeMapping(session, jdbcSqlExecutor, "BOOLEAN", "false", "BOOLEAN"); + jdbcSqlExecutor.execute("DROP TABLE test_boolean_type"); + } + + @Test + public void testTinyint() + { + // Databend supports TINYINT + SqlExecutor jdbcSqlExecutor = onRemoteDatabase(); + Session session = getSession(); + + jdbcSqlExecutor.execute("CREATE TABLE test_tinyint_type (tinyint_column TINYINT)"); + assertTypeMapping(session, 
jdbcSqlExecutor, "TINYINT", "127", "TINYINT"); + assertTypeMapping(session, jdbcSqlExecutor, "TINYINT", "-128", "TINYINT"); + jdbcSqlExecutor.execute("DROP TABLE test_tinyint_type"); + } + + @Test + public void testSmallint() + { + // Databend supports SMALLINT + SqlExecutor jdbcSqlExecutor = onRemoteDatabase(); + Session session = getSession(); + + jdbcSqlExecutor.execute("CREATE TABLE test_smallint_type (smallint_column SMALLINT)"); + assertTypeMapping(session, jdbcSqlExecutor, "SMALLINT", "32767", "SMALLINT"); + assertTypeMapping(session, jdbcSqlExecutor, "SMALLINT", "-32768", "SMALLINT"); + jdbcSqlExecutor.execute("DROP TABLE test_smallint_type"); + } + + @Test + public void testInteger() + { + // Databend supports INTEGER + SqlExecutor jdbcSqlExecutor = onRemoteDatabase(); + Session session = getSession(); + + jdbcSqlExecutor.execute("CREATE TABLE test_integer_type (integer_column INTEGER)"); + assertTypeMapping(session, jdbcSqlExecutor, "INTEGER", "2147483647", "INTEGER"); + assertTypeMapping(session, jdbcSqlExecutor, "INTEGER", "-2147483648", "INTEGER"); + jdbcSqlExecutor.execute("DROP TABLE test_integer_type"); + } + + @Test + public void testBigint() + { + // Databend supports BIGINT + SqlExecutor jdbcSqlExecutor = onRemoteDatabase(); + Session session = getSession(); + + jdbcSqlExecutor.execute("CREATE TABLE test_bigint_type (bigint_column BIGINT)"); + assertTypeMapping(session, jdbcSqlExecutor, "BIGINT", "9223372036854775807", "BIGINT"); + assertTypeMapping(session, jdbcSqlExecutor, "BIGINT", "-9223372036854775808", "BIGINT"); + jdbcSqlExecutor.execute("DROP TABLE test_bigint_type"); + } + + private void assertTypeMapping(Session session, SqlExecutor remoteExecutor, String remoteType, String remoteValue, String expectedTrinoType) + { + String baseName = remoteType.toLowerCase(Locale.ENGLISH); + String tableName = format("test_%s_type", baseName); + String columnName = format("%s_column", baseName); + + remoteExecutor.execute(format("INSERT INTO %s (%s) 
VALUES (%s)", tableName, columnName, remoteValue)); + + assertQuery( + session, + format("SELECT UPPER(typeof(%s)) FROM %s LIMIT 1", columnName, tableName), + format("VALUES CAST('%s' AS varchar)", expectedTrinoType)); + + assertQuery( + session, + format("SELECT %s FROM %s", columnName, tableName), + format("VALUES (%s)", remoteValue)); + + remoteExecutor.execute(format("DELETE FROM %s", tableName)); + } +} diff --git a/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestingDatabendServer.java b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestingDatabendServer.java new file mode 100644 index 000000000000..148a57827e90 --- /dev/null +++ b/plugin/trino-databend/src/test/java/io/trino/plugin/databend/TestingDatabendServer.java @@ -0,0 +1,220 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.databend; + +import org.testcontainers.databend.DatabendContainer; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; + +import static java.util.Objects.requireNonNull; + +public class TestingDatabendServer + implements Closeable +{ + private static final DockerImageName DATABEND_IMAGE = DockerImageName.parse("datafuselabs/databend:nightly"); + private static final String QUERY_CONFIG_PATH = "/databend-config/query.toml"; + + private final DatabendContainer databendContainer; + private final String jdbcUrl; + + public TestingDatabendServer() + { + DatabendContainer started = null; + String readyJdbcUrl = null; + boolean success = false; + try { + Path configFile = createConfigFile(buildDatabendConfig()); + + started = new DatabendContainer(DATABEND_IMAGE) + .withEnv("QUERY_DEFAULT_USER", getUser()) + .withEnv("QUERY_DEFAULT_PASSWORD", getPassword()) + .withEnv("QUERY_CONFIG_FILE", QUERY_CONFIG_PATH) + .withCopyFileToContainer(MountableFile.forHostPath(configFile), QUERY_CONFIG_PATH); + + started.start(); + readyJdbcUrl = ensureReadyAndGetJdbcUrl(started); + success = true; + } + catch (IOException e) { + throw new RuntimeException("Failed to prepare Databend config", e); + } + finally { + if (!success) { + closeQuietly(started); + } + } + + this.databendContainer = started; + this.jdbcUrl = readyJdbcUrl; + } + + public String getJdbcUrl() + { + return jdbcUrl; + } + + public String getUser() + { + return "root"; + } + + public String getPassword() + { + return ""; + } + + public void execute(String sql) + { + try (Connection connection = DriverManager.getConnection(jdbcUrl, getUser(), 
getPassword()); + Statement statement = connection.createStatement()) { + statement.execute(sql); + } + catch (SQLException e) { + throw new RuntimeException("Failed to execute SQL: " + sql, e); + } + } + + @Override + public void close() + { + closeQuietly(databendContainer); + } + + private String ensureReadyAndGetJdbcUrl(DatabendContainer container) + { + String url = withPresignDisabled(container.getJdbcUrl()); + requireNonNull(url, "Databend JDBC URL is null"); + + long deadline = System.nanoTime() + Duration.ofMinutes(2).toNanos(); + + while (true) { + try (Connection connection = DriverManager.getConnection(url, getUser(), getPassword()); + Statement statement = connection.createStatement()) { + statement.execute("SELECT 1"); + statement.execute("CREATE DATABASE IF NOT EXISTS default"); + return url; + } + catch (SQLException ex) { + if (System.nanoTime() > deadline) { + throw new RuntimeException("Databend server did not become ready in time", ex); + } + try { + Thread.sleep(2_000); + } + catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for Databend to start", interruptedException); + } + } + } + } + + private static void closeQuietly(DatabendContainer container) + { + if (container != null) { + try { + container.close(); + } + catch (RuntimeException ignored) { + } + } + } + + private static Path createConfigFile(String contents) + throws IOException + { + Path configFile = Files.createTempFile("databend-config", ".toml"); + Files.writeString(configFile, contents, StandardCharsets.UTF_8); + configFile.toFile().deleteOnExit(); + return configFile; + } + + private static String buildDatabendConfig() + { + return String.join("\n", + "[query]", + "max_active_sessions = 256", + "shutdown_wait_timeout_ms = 5000", + "", + "flight_api_address = \"0.0.0.0:9090\"", + "admin_api_address = \"0.0.0.0:8080\"", + "metric_api_address = \"0.0.0.0:7070\"", + "", + 
"mysql_handler_host = \"0.0.0.0\"", + "mysql_handler_port = 3307", + "", + "clickhouse_http_handler_host = \"0.0.0.0\"", + "clickhouse_http_handler_port = 8124", + "", + "http_handler_host = \"0.0.0.0\"", + "http_handler_port = 8000", + "", + "flight_sql_handler_host = \"0.0.0.0\"", + "flight_sql_handler_port = 8900", + "", + "tenant_id = \"default\"", + "cluster_id = \"default\"", + "", + "[log]", + "", + "[log.stderr]", + "level = \"WARN\"", + "format = \"text\"", + "", + "[log.file]", + "level = \"INFO\"", + "dir = \"/var/log/databend\"", + "", + "[meta]", + "endpoints = [\"0.0.0.0:9191\"]", + "username = \"root\"", + "password = \"root\"", + "client_timeout_in_second = 60", + "", + "[[query.users]]", + "name = \"databend\"", + "auth_type = \"double_sha1_password\"", + "auth_string = \"3081f32caef285c232d066033c89a78d88a6d8a5\"", + "", + "[[query.users]]", + "name = \"root\"", + "auth_type = \"no_password\"", + "", + "[storage]", + "type = \"fs\"", + "", + "[storage.fs]", + "data_path = \"/var/lib/databend/data\"", + "") + "\n"; + } + + private static String withPresignDisabled(String jdbcUrl) + { + if (jdbcUrl.contains("?")) { + return jdbcUrl + "&presigned_url_disabled=true"; + } + return jdbcUrl + "?presigned_url_disabled=true"; + } +} diff --git a/pom.xml b/pom.xml index 25b8e0fe5947..11ad2530ef05 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,7 @@ plugin/trino-blackhole plugin/trino-cassandra plugin/trino-clickhouse + plugin/trino-databend plugin/trino-delta-lake plugin/trino-druid plugin/trino-duckdb diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-all/databend.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-all/databend.properties new file mode 100644 index 000000000000..5dc861fcd574 --- /dev/null +++ 
b/testing/trino-product-tests-launcher/src/main/resources/docker/trino-product-tests/conf/environment/multinode-all/databend.properties @@ -0,0 +1,4 @@ +connector.name=databend +connection-url=jdbc:databend://databend:8000/ +connection-user=root +connection-password= diff --git a/testing/trino-testing-containers/pom.xml b/testing/trino-testing-containers/pom.xml index c22ee0d6db83..aff03c733482 100644 --- a/testing/trino-testing-containers/pom.xml +++ b/testing/trino-testing-containers/pom.xml @@ -117,6 +117,12 @@ true + + org.apache.commons + commons-lang3 + runtime + + io.airlift junit-extensions diff --git a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/junit/ReportLeakedContainers.java b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/junit/ReportLeakedContainers.java index f5a3284e629e..ea3745f2e654 100644 --- a/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/junit/ReportLeakedContainers.java +++ b/testing/trino-testing-containers/src/main/java/io/trino/testing/containers/junit/ReportLeakedContainers.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.ServiceConfigurationError; import java.util.Set; import static com.google.common.base.MoreObjects.toStringHelper; @@ -76,28 +77,38 @@ public void testPlanExecutionFinished(TestPlan testPlan) log.info("Checking for leaked containers"); - @SuppressWarnings("resource") // Throws when close is attempted, as this is a global instance. - DockerClient dockerClient = DockerClientFactory.lazyClient(); + ClassLoader previousContextClassLoader = Thread.currentThread().getContextClassLoader(); + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + try { + @SuppressWarnings("resource") // Throws when close is attempted, as this is a global instance. 
+ DockerClient dockerClient = DockerClientFactory.lazyClient(); - List containers = dockerClient.listContainersCmd() - .withLabelFilter(Map.of(DockerClientFactory.TESTCONTAINERS_SESSION_ID_LABEL, DockerClientFactory.SESSION_ID)) - // ignore status "exited" - for example, failed containers after using `withStartupAttempts()` - .withStatusFilter(List.of("created", "restarting", "running", "paused")) - .exec() - .stream() - // testcontainers/sshd is implicitly started by testcontainers and we trust the library to stop if when no longer needed - .filter(container -> !container.getImage().startsWith("testcontainers/sshd:")) - .filter(container -> !ignoredIds.contains(container.getId())) - .collect(toImmutableList()); + List containers = dockerClient.listContainersCmd() + .withLabelFilter(Map.of(DockerClientFactory.TESTCONTAINERS_SESSION_ID_LABEL, DockerClientFactory.SESSION_ID)) + // ignore status "exited" - for example, failed containers after using `withStartupAttempts()` + .withStatusFilter(List.of("created", "restarting", "running", "paused")) + .exec() + .stream() + // testcontainers/sshd is implicitly started by testcontainers and we trust the library to stop if when no longer needed + .filter(container -> !container.getImage().startsWith("testcontainers/sshd:")) + .filter(container -> !ignoredIds.contains(container.getId())) + .collect(toImmutableList()); - if (!containers.isEmpty()) { - reportListenerFailure(getClass(), "Leaked containers: %s", containers.stream() - .map(container -> toStringHelper("container") - .add("id", container.getId()) - .add("image", container.getImage()) - .add("imageId", container.getImageId()) - .toString()) - .collect(joining(", ", "[", "]"))); + if (!containers.isEmpty()) { + reportListenerFailure(getClass(), "Leaked containers: %s", containers.stream() + .map(container -> toStringHelper("container") + .add("id", container.getId()) + .add("image", container.getImage()) + .add("imageId", container.getImageId()) + .toString()) + 
.collect(joining(", ", "[", "]"))); + } + } + catch (ServiceConfigurationError | NoClassDefFoundError e) { + log.warn(e, "Docker client unavailable, skipping leaked container check"); + } + finally { + Thread.currentThread().setContextClassLoader(previousContextClassLoader); } } }