diff --git a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java index 12f8cfdbbc..0cd94cbb66 100644 --- a/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java +++ b/src/main/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialect.java @@ -44,6 +44,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -51,6 +54,9 @@ import java.util.Set; import java.util.UUID; +import static java.time.format.DateTimeFormatter.ISO_DATE_TIME; +import static java.time.format.DateTimeFormatter.ISO_OFFSET_DATE_TIME; + /** * A {@link DatabaseDialect} for PostgreSQL. */ @@ -78,6 +84,8 @@ public DatabaseDialect create(AbstractConfig config) { static final String JSON_TYPE_NAME = "json"; static final String JSONB_TYPE_NAME = "jsonb"; static final String UUID_TYPE_NAME = "uuid"; + static final String TIMESTAMP_WITH_TIMEZONE = "timestamptz"; + static final String TIMESTAMP_WITHOUT_TIMEZONE = "timestamp"; /** * Define the PG datatypes that require casting upon insert/update statements. @@ -86,7 +94,9 @@ public DatabaseDialect create(AbstractConfig config) { Utils.mkSet( JSON_TYPE_NAME, JSONB_TYPE_NAME, - UUID_TYPE_NAME + UUID_TYPE_NAME, + TIMESTAMP_WITH_TIMEZONE, + TIMESTAMP_WITHOUT_TIMEZONE ) ); @@ -239,6 +249,28 @@ public String addFieldToSchema( return fieldName; } + if (OffsetDateTime.class.getName().equals(columnDefn.classNameForType())) { + builder.field( + fieldName, + columnDefn.isOptional() + ? + Schema.OPTIONAL_STRING_SCHEMA : + Schema.STRING_SCHEMA + ); + return fieldName; + } + + if (LocalDate.class.getName().equals(columnDefn.classNameForType())) { + builder.field( + fieldName, + columnDefn.isOptional() + ? + Schema.OPTIONAL_STRING_SCHEMA : + Schema.STRING_SCHEMA + ); + return fieldName; + } + break; } default: @@ -280,6 +312,13 @@ protected ColumnConverter columnConverterFor( if (UUID.class.getName().equals(columnDefn.classNameForType())) { return rs -> rs.getString(col); } + + if (OffsetDateTime.class.getName().equals(columnDefn.classNameForType())) { + return rs -> rs.getObject(col, OffsetDateTime.class).format(ISO_OFFSET_DATE_TIME); + } + if (LocalDateTime.class.getName().equals(columnDefn.classNameForType())) { + return rs -> rs.getObject(col, LocalDateTime.class).format(ISO_DATE_TIME); + } break; } default: diff --git a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java index 651160e790..06b1eec607 100644 --- a/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java +++ b/src/test/java/io/confluent/connect/jdbc/dialect/PostgreSqlDatabaseDialectTest.java @@ -39,6 +39,8 @@ import java.sql.SQLException; import java.sql.Statement; import java.sql.Types; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -83,6 +85,8 @@ public void testCustomColumnConverters() { assertColumnConverter(Types.OTHER, PostgreSqlDatabaseDialect.JSON_TYPE_NAME, Schema.STRING_SCHEMA, String.class); assertColumnConverter(Types.OTHER, PostgreSqlDatabaseDialect.JSONB_TYPE_NAME, Schema.STRING_SCHEMA, String.class); assertColumnConverter(Types.OTHER, PostgreSqlDatabaseDialect.UUID_TYPE_NAME, Schema.STRING_SCHEMA, UUID.class); + assertColumnConverter(Types.OTHER, PostgreSqlDatabaseDialect.TIMESTAMP_WITHOUT_TIMEZONE, Schema.STRING_SCHEMA, LocalDateTime.class); + assertColumnConverter(Types.OTHER, PostgreSqlDatabaseDialect.TIMESTAMP_WITH_TIMEZONE, Schema.STRING_SCHEMA, OffsetDateTime.class); } @Test @@ -221,16 +225,20 @@ public void shouldBuildInsertStatement() { builder.withColumn("id2").type("int", JDBCType.INTEGER, Integer.class); builder.withColumn("columnA").type("varchar", JDBCType.VARCHAR, Integer.class); builder.withColumn("uuidColumn").type("uuid", JDBCType.OTHER, UUID.class); + builder.withColumn("timestamptzColumn").type("timestamptz", JDBCType.OTHER, OffsetDateTime.class); + builder.withColumn("timestampColumn").type("timestamp", JDBCType.OTHER, LocalDateTime.class); builder.withColumn("dateColumn").type("date", JDBCType.DATE, java.sql.Date.class); tableDefn = builder.build(); List nonPkColumns = new ArrayList<>(); nonPkColumns.add(new ColumnId(tableId, "columnA")); nonPkColumns.add(new ColumnId(tableId, "uuidColumn")); + nonPkColumns.add(new ColumnId(tableId, "timestamptzColumn")); + nonPkColumns.add(new ColumnId(tableId, "timestampColumn")); nonPkColumns.add(new ColumnId(tableId, "dateColumn")); assertEquals( "INSERT INTO myTable (" + - "id1,id2,columnA,uuidColumn,dateColumn" + - ") VALUES (?,?,?,?::uuid,?)", + "id1,id2,columnA,uuidColumn,timestamptzColumn,timestampColumn,dateColumn" + + ") VALUES (?,?,?,?::uuid,?::timestamptz,?::timestamp,?)", dialect.buildInsertStatement(tableId, pkColumns, nonPkColumns, tableDefn) ); } @@ -270,19 +278,25 @@ public void shouldBuildUpsertStatement() { builder.withColumn("id2").type("int", JDBCType.INTEGER, Integer.class); builder.withColumn("columnA").type("varchar", JDBCType.VARCHAR, Integer.class); builder.withColumn("uuidColumn").type("uuid", JDBCType.OTHER, UUID.class); + builder.withColumn("timestamptzColumn").type("timestamptz", JDBCType.OTHER, OffsetDateTime.class); + builder.withColumn("timestampColumn").type("timestamp", JDBCType.OTHER, LocalDateTime.class); builder.withColumn("dateColumn").type("date", JDBCType.DATE, java.sql.Date.class); tableDefn = builder.build(); List nonPkColumns = new ArrayList<>(); nonPkColumns.add(new ColumnId(tableId, "columnA")); nonPkColumns.add(new ColumnId(tableId, "uuidColumn")); + nonPkColumns.add(new ColumnId(tableId, "timestamptzColumn")); + nonPkColumns.add(new ColumnId(tableId, "timestampColumn")); nonPkColumns.add(new ColumnId(tableId, "dateColumn")); assertEquals( "INSERT INTO myTable (" + - "id1,id2,columnA,uuidColumn,dateColumn" + - ") VALUES (?,?,?,?::uuid,?) ON CONFLICT (id1," + + "id1,id2,columnA,uuidColumn,timestamptzColumn,timestampColumn,dateColumn" + + ") VALUES (?,?,?,?::uuid,?::timestamptz,?::timestamp,?) ON CONFLICT (id1," + "id2) DO UPDATE SET " + "columnA=EXCLUDED.columnA," + "uuidColumn=EXCLUDED.uuidColumn," + + "timestamptzColumn=EXCLUDED.timestamptzColumn," + + "timestampColumn=EXCLUDED.timestampColumn," + "dateColumn=EXCLUDED.dateColumn", dialect.buildUpsertQueryStatement(tableId, pkColumns, nonPkColumns, tableDefn) ); @@ -295,14 +309,20 @@ public void shouldComputeValueTypeCast() { builder.withColumn("id2").type("int", JDBCType.INTEGER, Integer.class); builder.withColumn("columnA").type("varchar", JDBCType.VARCHAR, Integer.class); builder.withColumn("uuidColumn").type("uuid", JDBCType.OTHER, UUID.class); + builder.withColumn("timestamptzColumn").type("timestamptz", JDBCType.OTHER, OffsetDateTime.class); + builder.withColumn("timestampColumn").type("timestamp", JDBCType.OTHER, LocalDateTime.class); builder.withColumn("dateColumn").type("date", JDBCType.DATE, java.sql.Date.class); TableDefinition tableDefn = builder.build(); ColumnId uuidColumn = tableDefn.definitionForColumn("uuidColumn").id(); + ColumnId timestamptzColumn = tableDefn.definitionForColumn("timestamptzColumn").id(); + ColumnId timestampColumn = tableDefn.definitionForColumn("timestampColumn").id(); ColumnId dateColumn = tableDefn.definitionForColumn("dateColumn").id(); assertEquals("", dialect.valueTypeCast(tableDefn, columnPK1)); assertEquals("", dialect.valueTypeCast(tableDefn, columnPK2)); assertEquals("", dialect.valueTypeCast(tableDefn, columnA)); assertEquals("::uuid", dialect.valueTypeCast(tableDefn, uuidColumn)); + assertEquals("::timestamptz", dialect.valueTypeCast(tableDefn, timestamptzColumn)); + assertEquals("::timestamp", dialect.valueTypeCast(tableDefn, timestampColumn)); assertEquals("", dialect.valueTypeCast(tableDefn, dateColumn)); }