Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.Version;

import static io.confluent.connect.jdbc.source.JdbcSourceTaskConfig.TASK_ID_CONFIG;

/**
* JdbcConnector is a Kafka Connect Connector implementation that watches a JDBC database and
* generates tasks to ingest database contents.
Expand All @@ -57,6 +59,7 @@ public class JdbcSourceConnector extends SourceConnector {

private Map<String, String> configProperties;
private JdbcSourceConnectorConfig config;
private JdbcSourceTaskConfig taskConfig;
private CachedConnectionProvider cachedConnectionProvider;
private TableMonitorThread tableMonitorThread;
private DatabaseDialect dialect;
Expand Down Expand Up @@ -131,7 +134,9 @@ public void start(Map<String, String> properties) throws ConnectException {
tablePollMs,
whitelistSet,
blacklistSet,
Time.SYSTEM
Time.SYSTEM,
config.connectorName(),
taskConfig.getTaskID()
);
if (query.isEmpty()) {
tableMonitorThread.start();
Expand Down Expand Up @@ -174,7 +179,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
log.info("No custom query provided, generating task configurations for tables");
List<TableId> currentTables = tableMonitorThread.tables();
log.trace("Current tables from tableMonitorThread: {}", currentTables);

if (currentTables == null || currentTables.isEmpty()) {
taskConfigs = new ArrayList<>(1);
Map<String, String> taskProps = new HashMap<>(configProperties);
Expand Down Expand Up @@ -202,13 +207,15 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
List<List<TableId>> tablesGrouped =
ConnectorUtils.groupPartitions(currentTables, numGroups);
taskConfigs = new ArrayList<>(tablesGrouped.size());
int count = 0;
for (List<TableId> taskTables : tablesGrouped) {
Map<String, String> taskProps = new HashMap<>(configProperties);
ExpressionBuilder builder = dialect.expressionBuilder();
builder.appendList().delimitedBy(",").of(taskTables);
taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, builder.toString());
taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");
log.trace("Assigned tables {} to task with tablesFetched=true", taskTables);
taskProps.put(TASK_ID_CONFIG, count++ + "");
taskConfigs.add(taskProps);
}
log.info("Current Tables size: {}", currentTables.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.confluent.connect.jdbc.util.JdbcCredentialsProviderValidator;
import io.confluent.connect.jdbc.util.QuoteMethod;
import io.confluent.connect.jdbc.util.TimeZoneValidator;
import io.confluent.connect.jdbc.util.ConfigUtils;

import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -824,9 +825,11 @@ public void ensureValid(final String name, final Object value) {
}

public static final ConfigDef CONFIG_DEF = baseConfigDef();
public final String connectorName;

public JdbcSourceConnectorConfig(Map<String, ?> props) {
super(CONFIG_DEF, props);
this.connectorName = ConfigUtils.connectorName(props);
}

public String topicPrefix() {
Expand Down Expand Up @@ -1065,6 +1068,11 @@ public static int get(TransactionIsolationMode mode) {

protected JdbcSourceConnectorConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
super(subclassConfigDef, props);
connectorName = ConfigUtils.connectorName(props);
}

public String connectorName() {
return connectorName;
}

public NumericMapping numericMapping() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,19 @@ public class JdbcSourceTaskConfig extends JdbcSourceConnectorConfig {
private static final String TABLES_DOC = "List of tables for this task to watch for changes.";
public static final String TABLES_FETCHED = "tables.fetched";

public static final String TASK_ID_CONFIG = "task.id";
private static final String TASK_ID_DOC = "Task's id";

static ConfigDef config = baseConfigDef()
.define(TABLES_CONFIG, Type.LIST, Importance.HIGH, TABLES_DOC)
.defineInternal(TABLES_FETCHED, Type.BOOLEAN, false, Importance.HIGH);
.defineInternal(TABLES_FETCHED, Type.BOOLEAN, false, Importance.HIGH)
.define(TASK_ID_CONFIG, Type.STRING, Importance.HIGH, TASK_ID_DOC);

public JdbcSourceTaskConfig(Map<String, String> props) {
super(config, props);
}

public String getTaskID() {
return getString(TASK_ID_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public TableMonitorThread(DatabaseDialect dialect,
long pollMs,
Set<String> whitelist,
Set<String> blacklist,
Time time
Time time,
String connectorName,
String taskId
) {
this.dialect = dialect;
this.connectionProvider = connectionProvider;
Expand All @@ -75,6 +77,8 @@ public TableMonitorThread(DatabaseDialect dialect,
this.blacklist = blacklist;
this.tables = new AtomicReference<>();
this.time = time;

this.setName(connectorName + "-" + taskId + "-TableMonitorThread");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public class TableMonitorThreadTest {
private static final long STARTUP_LIMIT = 50;
private static final long POLL_INTERVAL = 100;

private static final String connectorName = "test-connector";
private static final String connectorTaskId = "test-task-id";

private final static TableId FOO = new TableId(null, null, "foo");
private final static TableId BAR = new TableId(null, null, "bar");
private final static TableId BAZ = new TableId(null, null, "baz");
Expand Down Expand Up @@ -92,7 +95,7 @@ public class TableMonitorThreadTest {
public void testSingleLookup() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId);
expectTableNames(LIST_FOO, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand All @@ -107,7 +110,7 @@ public void testSingleLookup() throws Exception {
public void testTablesBlockingTimeoutOnUpdateThread() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, 0, null, null, time);
STARTUP_LIMIT, 0, null, null, time, connectorName, connectorTaskId);

CountDownLatch connectionRequested = new CountDownLatch(1);
CountDownLatch connectionCompleted = new CountDownLatch(1);
Expand Down Expand Up @@ -158,7 +161,7 @@ public void testTablesBlockingTimeoutOnUpdateThread() throws Exception {
public void testTablesBlockingWithDeadlineOnUpdateThread() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create());
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, time);
STARTUP_LIMIT, POLL_INTERVAL, null, null, time, connectorName, connectorTaskId);

EasyMock.expect(dialect.tableIds(EasyMock.eq(connection))).andReturn(Collections.emptyList());
EasyMock.expect(connectionProvider.getConnection()).andReturn(connection);
Expand All @@ -185,7 +188,7 @@ public void testWhitelist() throws Exception {
Set<String> whitelist = new HashSet<>(Arrays.asList("foo", "bar"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorName, connectorTaskId);
expectTableNames(LIST_FOO_BAR, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand All @@ -201,7 +204,7 @@ public void testBlacklist() throws Exception {
Set<String> blacklist = new HashSet<>(Arrays.asList("bar", "baz"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorName, connectorTaskId);
expectTableNames(LIST_FOO_BAR_BAZ, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand All @@ -216,7 +219,7 @@ public void testBlacklist() throws Exception {
public void testReconfigOnUpdate() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId);
expectTableNames(LIST_FOO);
expectTableNames(LIST_FOO, checkTableNames("foo"));
context.requestTaskReconfiguration();
Expand Down Expand Up @@ -244,7 +247,7 @@ public void testReconfigOnUpdate() throws Exception {
@Test
public void testInvalidConnection() throws Exception {
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId);
EasyMock.expect(connectionProvider.getConnection()).andThrow(new ConnectException("Simulated error with the db."));

CountDownLatch errorLatch = new CountDownLatch(1);
Expand All @@ -267,7 +270,7 @@ public void testInvalidConnection() throws Exception {
public void testDuplicates() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorName, connectorTaskId);
expectTableNames(LIST_DUP_WITH_ALL, shutdownThread());
context.requestTaskReconfiguration();
EasyMock.expectLastCall();
Expand All @@ -285,7 +288,7 @@ public void testDuplicateWithUnqualifiedWhitelist() throws Exception {
Set<String> whitelist = new HashSet<>(Arrays.asList("dup"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorName, connectorTaskId);
expectTableNames(LIST_DUP_ONLY, shutdownThread());
context.requestTaskReconfiguration();
EasyMock.expectLastCall();
Expand All @@ -304,7 +307,7 @@ public void testDuplicateWithUnqualifiedBlacklist() throws Exception {
Set<String> blacklist = new HashSet<>(Arrays.asList("foo"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorName, connectorTaskId);
expectTableNames(LIST_DUP_WITH_ALL, shutdownThread());
context.requestTaskReconfiguration();
EasyMock.expectLastCall();
Expand All @@ -323,7 +326,7 @@ public void testDuplicateWithQualifiedWhitelist() throws Exception {
Set<String> whitelist = new HashSet<>(Arrays.asList("dup1.dup", "foo"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorName, connectorTaskId);
expectTableNames(LIST_DUP_WITH_ALL, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand All @@ -338,7 +341,7 @@ public void testDuplicateWithQualifiedBlacklist() throws Exception {
Set<String> blacklist = new HashSet<>(Arrays.asList("dup1.dup", "foo"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorName, connectorTaskId);
expectTableNames(LIST_DUP_WITH_ALL, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand Down