diff --git a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java index 13b721230..47576e968 100644 --- a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java +++ b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java @@ -45,6 +45,8 @@ import io.confluent.connect.jdbc.util.TableId; import io.confluent.connect.jdbc.util.Version; +import static io.confluent.connect.jdbc.source.JdbcSourceTaskConfig.TASK_ID_CONFIG; + /** * JdbcConnector is a Kafka Connect Connector implementation that watches a JDBC database and * generates tasks to ingest database contents. @@ -57,6 +59,7 @@ public class JdbcSourceConnector extends SourceConnector { private Map configProperties; private JdbcSourceConnectorConfig config; + private JdbcSourceTaskConfig taskConfig; private CachedConnectionProvider cachedConnectionProvider; private TableMonitorThread tableMonitorThread; private DatabaseDialect dialect; @@ -131,7 +134,9 @@ public void start(Map properties) throws ConnectException { tablePollMs, whitelistSet, blacklistSet, - Time.SYSTEM + Time.SYSTEM, + config.connectorName(), + taskConfig.getTaskID() ); if (query.isEmpty()) { tableMonitorThread.start(); @@ -174,7 +179,7 @@ public List> taskConfigs(int maxTasks) { log.info("No custom query provided, generating task configurations for tables"); List currentTables = tableMonitorThread.tables(); log.trace("Current tables from tableMonitorThread: {}", currentTables); - + if (currentTables == null || currentTables.isEmpty()) { taskConfigs = new ArrayList<>(1); Map taskProps = new HashMap<>(configProperties); @@ -202,6 +207,7 @@ public List> taskConfigs(int maxTasks) { List> tablesGrouped = ConnectorUtils.groupPartitions(currentTables, numGroups); taskConfigs = new ArrayList<>(tablesGrouped.size()); + int count = 0; for (List taskTables : tablesGrouped) { Map taskProps = new HashMap<>(configProperties); ExpressionBuilder builder = dialect.expressionBuilder(); @@ -209,6 +215,7 @@ public List> taskConfigs(int maxTasks) { taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, builder.toString()); taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true"); log.trace("Assigned tables {} to task with tablesFetched=true", taskTables); + taskProps.put(TASK_ID_CONFIG, count++ + ""); taskConfigs.add(taskProps); } log.info("Current Tables size: {}", currentTables.size()); diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index e5b3d32a0..9e89c1ae2 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -38,6 +38,7 @@ import io.confluent.connect.jdbc.util.JdbcCredentialsProviderValidator; import io.confluent.connect.jdbc.util.QuoteMethod; import io.confluent.connect.jdbc.util.TimeZoneValidator; +import io.confluent.connect.jdbc.util.ConfigUtils; import java.util.function.BiFunction; import java.util.function.Function; @@ -824,9 +825,11 @@ public void ensureValid(final String name, final Object value) { } public static final ConfigDef CONFIG_DEF = baseConfigDef(); + public final String connectorName; public JdbcSourceConnectorConfig(Map props) { super(CONFIG_DEF, props); + this.connectorName = ConfigUtils.connectorName(props); } public String topicPrefix() { @@ -1065,6 +1068,11 @@ public static int get(TransactionIsolationMode mode) { protected JdbcSourceConnectorConfig(ConfigDef subclassConfigDef, Map props) { super(subclassConfigDef, props); + connectorName = ConfigUtils.connectorName(props); + } + + public String connectorName() { + return connectorName; } public NumericMapping numericMapping() { diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTaskConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTaskConfig.java index 38d324855..1bdb950fa 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTaskConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTaskConfig.java @@ -31,11 +31,19 @@ public class JdbcSourceTaskConfig extends JdbcSourceConnectorConfig { private static final String TABLES_DOC = "List of tables for this task to watch for changes."; public static final String TABLES_FETCHED = "tables.fetched"; + public static final String TASK_ID_CONFIG = "task.id"; + private static final String TASK_ID_DOC = "Task's id"; + static ConfigDef config = baseConfigDef() .define(TABLES_CONFIG, Type.LIST, Importance.HIGH, TABLES_DOC) - .defineInternal(TABLES_FETCHED, Type.BOOLEAN, false, Importance.HIGH); + .defineInternal(TABLES_FETCHED, Type.BOOLEAN, false, Importance.HIGH) + .define(TASK_ID_CONFIG, Type.STRING, Importance.HIGH, TASK_ID_DOC); public JdbcSourceTaskConfig(Map props) { super(config, props); } + + public String getTaskID() { + return getString(TASK_ID_CONFIG); + } } diff --git a/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java b/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java index cb49e1ee6..c8099c645 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java +++ b/src/main/java/io/confluent/connect/jdbc/source/TableMonitorThread.java @@ -63,7 +63,9 @@ public TableMonitorThread(DatabaseDialect dialect, long pollMs, Set whitelist, Set blacklist, - Time time + Time time, + String connectorName, + String taskId ) { this.dialect = dialect; this.connectionProvider = connectionProvider; @@ -75,6 +77,8 @@ public TableMonitorThread(DatabaseDialect dialect, this.blacklist = blacklist; this.tables = new AtomicReference<>(); this.time = time; + + this.setName(connectorName + "-" + taskId + "-TableMonitorThread"); } @Override diff --git a/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java b/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java index e58603f5e..26814927b 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/TableMonitorThreadTest.java @@ -56,6 +56,9 @@ public class TableMonitorThreadTest { private static final long STARTUP_LIMIT = 50; private static final long POLL_INTERVAL = 100; + private static final String connectorName = "test-connector"; + private static final String connectorTaskId = "test-task-id"; + private final static TableId FOO = new TableId(null, null, "foo"); private final static TableId BAR = new TableId(null, null, "bar"); private final static TableId BAZ = new TableId(null, null, "baz"); @@ -92,7 +95,7 @@ public class TableMonitorThreadTest { public void testSingleLookup() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_FOO, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -107,7 +110,7 @@ public void testSingleLookup() throws Exception { public void testTablesBlockingTimeoutOnUpdateThread() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, 0, null, null, time); + STARTUP_LIMIT, 0, null, null, time, connectorName, connectorTaskId); CountDownLatch connectionRequested = new CountDownLatch(1); CountDownLatch connectionCompleted = new CountDownLatch(1); @@ -158,7 +161,7 @@ public void testTablesBlockingTimeoutOnUpdateThread() throws Exception { public void testTablesBlockingWithDeadlineOnUpdateThread() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, time); + STARTUP_LIMIT, POLL_INTERVAL, null, null, time, connectorName, connectorTaskId); EasyMock.expect(dialect.tableIds(EasyMock.eq(connection))).andReturn(Collections.emptyList()); EasyMock.expect(connectionProvider.getConnection()).andReturn(connection); @@ -185,7 +188,7 @@ public void testWhitelist() throws Exception { Set whitelist = new HashSet<>(Arrays.asList("foo", "bar")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_FOO_BAR, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -201,7 +204,7 @@ public void testBlacklist() throws Exception { Set blacklist = new HashSet<>(Arrays.asList("bar", "baz")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_FOO_BAR_BAZ, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -216,7 +219,7 @@ public void testBlacklist() throws Exception { public void testReconfigOnUpdate() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_FOO); expectTableNames(LIST_FOO, checkTableNames("foo")); context.requestTaskReconfiguration(); @@ -244,7 +247,7 @@ public void testReconfigOnUpdate() throws Exception { @Test public void testInvalidConnection() throws Exception { tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId); EasyMock.expect(connectionProvider.getConnection()).andThrow(new ConnectException("Simulated error with the db.")); CountDownLatch errorLatch = new CountDownLatch(1); @@ -267,7 +270,7 @@ public void testInvalidConnection() throws Exception { public void testDuplicates() throws Exception { EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); context.requestTaskReconfiguration(); EasyMock.expectLastCall(); @@ -285,7 +288,7 @@ public void testDuplicateWithUnqualifiedWhitelist() throws Exception { Set whitelist = new HashSet<>(Arrays.asList("dup")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_ONLY, shutdownThread()); context.requestTaskReconfiguration(); EasyMock.expectLastCall(); @@ -304,7 +307,7 @@ public void testDuplicateWithUnqualifiedBlacklist() throws Exception { Set blacklist = new HashSet<>(Arrays.asList("foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); context.requestTaskReconfiguration(); EasyMock.expectLastCall(); @@ -323,7 +326,7 @@ public void testDuplicateWithQualifiedWhitelist() throws Exception { Set whitelist = new HashSet<>(Arrays.asList("dup1.dup", "foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect); @@ -338,7 +341,7 @@ public void testDuplicateWithQualifiedBlacklist() throws Exception { Set blacklist = new HashSet<>(Arrays.asList("dup1.dup", "foo")); EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes(); tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context, - STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM); + STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorName, connectorTaskId); expectTableNames(LIST_DUP_WITH_ALL, shutdownThread()); EasyMock.replay(connectionProvider, dialect);