From 5198b37626a46574ee4994659e9d19300e1477a2 Mon Sep 17 00:00:00 2001 From: KGaneshDatta Date: Wed, 12 Nov 2025 16:35:54 +0530 Subject: [PATCH 1/7] Extend query config with query.masked config with Type.PASSWORD for secure query masking --- .../connect/jdbc/JdbcSourceConnector.java | 4 +- .../source/JdbcSourceConnectorConfig.java | 40 +++ .../connect/jdbc/source/JdbcSourceTask.java | 2 +- .../jdbc/source/TableQuerierProcessor.java | 2 +- .../JdbcSourceConnectorValidation.java | 32 ++- .../source/JdbcSourceConnectorConfigTest.java | 266 ++++++++++++++++++ .../JdbcSourceConnectorValidationTest.java | 91 ++++++ 7 files changed, 432 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java index 2d85039c2..a460ad08f 100644 --- a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java +++ b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java @@ -117,7 +117,7 @@ public void start(Map properties) throws ConnectException { List excludeList = config.tableExcludeListRegexes(); Set excludeListSet = excludeList.isEmpty() ? null : new HashSet<>(excludeList); - String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); + String query = config.getQuery(); if (!query.isEmpty()) { if (whitelistSet != null || blacklistSet != null || includeListSet != null || excludeListSet != null) { @@ -170,7 +170,7 @@ public Config validate(Map connectorConfigs) { @Override public List> taskConfigs(int maxTasks) { log.info("Starting with the task Configuration method."); - String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); + String query = config.getQuery(); List> taskConfigs; if (!query.isEmpty()) { log.info("Custom query provided, generating task configuration for the query"); diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 2fa613205..4a5e105da 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -54,6 +54,7 @@ import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.errors.ConnectException; @@ -336,6 +337,19 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { public static final String QUERY_DEFAULT = ""; private static final String QUERY_DISPLAY = "Query"; + public static final String QUERY_MASKED_CONFIG = "query.masked"; + private static final String QUERY_MASKED_DOC = + "If specified, the query to perform to select new or updated rows. This is the same as " + + "'query' config but with Type.PASSWORD to mask the query value from being visible to users. " + + "Use this setting when your query contains sensitive information. If used, this connector " + + "will only copy data using this query -- whole-table copying will be disabled. Different " + + "query modes may still be used for incremental updates, but in order to properly construct " + + "the incremental query, it must be possible to append a WHERE clause to this query (i.e. no " + + "WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries " + + "itself. Note: Only one of 'query' or 'query.masked' should be set, not both."; + private static final String QUERY_MASKED_DISPLAY = "Query (Masked)"; + + public static final String TOPIC_PREFIX_CONFIG = "topic.prefix"; private static final String TOPIC_PREFIX_DOC = "Prefix to prepend to table names to generate the name of the Kafka topic to publish data " @@ -927,6 +941,16 @@ private static final void addModeOptions(ConfigDef config) { ++orderInGroup, Width.SHORT, QUERY_DISPLAY + ).define( + QUERY_MASKED_CONFIG, + Type.PASSWORD, + QUERY_DEFAULT, + Importance.MEDIUM, + QUERY_MASKED_DOC, + MODE_GROUP, + ++orderInGroup, + Width.SHORT, + QUERY_MASKED_DISPLAY ).define( QUOTE_SQL_IDENTIFIERS_CONFIG, Type.STRING, @@ -1432,6 +1456,22 @@ public List incrementingColMappingRegexes() { .collect(java.util.stream.Collectors.toList()); } + /** + * Get query string from either query.masked (Type.PASSWORD) or query (Type.STRING) config. + * Prioritizes query.masked if set, otherwise falls back to query config for backward compatibility. + * + * @return The query string from whichever config is set, or empty string if neither is set. + */ + public String getQuery() { + // First check if query.masked is set + org.apache.kafka.common.config.types.Password maskedQuery = getPassword(QUERY_MASKED_CONFIG); + if (maskedQuery != null && maskedQuery.value() != null && !maskedQuery.value().isEmpty()) { + return maskedQuery.value(); + } + + return getString(QUERY_CONFIG); + } + public boolean modeUsesTimestampColumn() { String mode = getString(MODE_CONFIG); return Arrays.asList(MODE_TIMESTAMP, MODE_TIMESTAMP_INCREMENTING).contains(mode); diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java index 7508e05ce..402289233 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java @@ -95,7 +95,7 @@ public void start(Map properties) { List tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG); Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED); - String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG); + String query = config.getQuery(); List tableType = config.getList(JdbcSourceConnectorConfig.TABLE_TYPE_CONFIG); if ((tables.isEmpty() && query.isEmpty())) { diff --git a/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java b/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java index f25b5c482..7eb5924f1 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java +++ b/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java @@ -95,7 +95,7 @@ private boolean isReadyToProcess() { // If the call to get tables has not completed we will not do anything. // This is only valid in table mode. Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED); - String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG); + String query = config.getQuery(); return !query.isEmpty() || tablesFetched; } diff --git a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java index b5445287e..6f794097f 100644 --- a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java +++ b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java @@ -80,7 +80,8 @@ public Config validate() { } boolean validationResult = validateMultiConfigs() - && validateLegacyNewConfigCompatibility(); + && validateLegacyNewConfigCompatibility() + && validateQueryConfigs(); if (validationResult && isUsingNewConfigs()) { validationResult = validateTableInclusionConfigs() @@ -305,6 +306,35 @@ private boolean validateTableInclusionConfigs() { return true; } + /** + * Validate that only one of query or query.masked configs is set at a time. + * Both configs should not be set simultaneously to avoid ambiguity. + */ + private boolean validateQueryConfigs() { + String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); + org.apache.kafka.common.config.types.Password queryMasked = + config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); + + boolean hasQuery = query != null && !query.isEmpty(); + boolean hasQueryMasked = queryMasked != null + && queryMasked.value() != null + && !queryMasked.value().isEmpty(); + + if (hasQuery && hasQueryMasked) { + String msg = "Both 'query' and 'query.masked' configs cannot be set at the same time. " + + "Please use only one of them. Use 'query.masked' (Type.PASSWORD) if you want to hide " + + "the query value from being visible, or use 'query' (Type.STRING) for regular usage."; + + addConfigError(JdbcSourceConnectorConfig.QUERY_CONFIG, msg); + addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg); + + log.error("Validation failed: Both query and query.masked configs are set"); + return false; + } + + return true; + } + /** * Validate plugin-specific needs. This method can be overridden by specific * connector implementations to add their own validation logic. diff --git a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java index eea717b34..9e8d8c074 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java @@ -424,4 +424,270 @@ public void testWhitespaceInRegexPatternsIsHandledCorrectly() { assertTrue(includeList.contains("schema1\\.users")); assertTrue(includeList.contains("schema2\\.orders")); } + + @Test + public void testQueryMaskedValueIsMaskedInConfigValue() { + // Setup config with query.masked + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + String sensitiveQuery = "SELECT * FROM sensitive_table WHERE secret_column = 'confidential'"; + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, sensitiveQuery); + + // Validate and get config values + Map validatedConfig = JdbcSourceConnectorConfig.CONFIG_DEF.validateAll(props); + ConfigValue queryMaskedValue = validatedConfig.get(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); + + assertNotNull(queryMaskedValue); + + // The value should be masked (shown as [hidden] or Password object) + Object maskedValue = queryMaskedValue.value(); + assertNotNull(maskedValue); + + // When converted to string, Password objects show as "[hidden]" + String maskedString = maskedValue.toString(); + assertEquals("[hidden]", maskedString); + + // The actual query should NOT be visible in the string representation + assertFalse(maskedString.contains(sensitiveQuery)); + } + + @Test + public void testQueryMaskedValueCanBeRetrievedViaConfig() { + // Setup config with query.masked + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + String expectedQuery = "SELECT * FROM users WHERE status = 'active'"; + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, expectedQuery); + + // Create config + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // The actual value can be retrieved when needed + org.apache.kafka.common.config.types.Password password = + config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); + assertNotNull(password); + + String actualQuery = password.value(); + assertEquals(expectedQuery, actualQuery); + } + + @Test + public void testGetQueryStringReturnsQueryMaskedWhenSet() { + // Setup config with query.masked (but not query) + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + String expectedQuery = "SELECT id, name, email FROM users"; + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, expectedQuery); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // getQueryString() should return the masked query value + String actualQuery = config.getQuery(); + assertEquals(expectedQuery, actualQuery); + } + + @Test + public void testGetQueryStringReturnsQueryWhenOnlyQuerySet() { + // Setup config with only query (not query.masked) + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + String expectedQuery = "SELECT * FROM public_table"; + props.put(JdbcSourceConnectorConfig.QUERY_CONFIG, expectedQuery); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // getQueryString() should return the regular query value + String actualQuery = config.getQuery(); + assertEquals(expectedQuery, actualQuery); + } + + @Test + public void testGetQueryStringPrioritizesQueryMaskedOverQuery() { + // Setup config with both query and query.masked + // (This violates validation but tests the priority logic) + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + String regularQuery = "SELECT * FROM table1"; + String maskedQuery = "SELECT * FROM sensitive_table"; + props.put(JdbcSourceConnectorConfig.QUERY_CONFIG, regularQuery); + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, maskedQuery); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // getQueryString() should prioritize query.masked + String actualQuery = config.getQuery(); + assertEquals(maskedQuery, actualQuery); + } + + @Test + public void testGetQueryStringReturnsEmptyWhenNeitherSet() { + // Setup config without query or query.masked + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // getQueryString() should return empty string + String actualQuery = config.getQuery(); + assertEquals("", actualQuery); + } + + @Test + public void testQueryMaskedSupportsComplexQueryWithMultipleJoins() { + // Test that complex queries work fine with query.masked + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + + String complexQuery = "SELECT u.id, u.username, u.email, " + + "p.profile_pic, a.street, a.city, a.country, " + + "o.order_id, o.order_date, o.total_amount " + + "FROM users u " + + "INNER JOIN profiles p ON u.id = p.user_id " + + "LEFT JOIN addresses a ON u.id = a.user_id " + + "LEFT JOIN orders o ON u.id = o.user_id " + + "WHERE u.status = 'active' " + + "AND u.created_at >= '2024-01-01' " + + "AND o.total_amount > 100.00 " + + "ORDER BY o.order_date DESC " + + "LIMIT 1000"; + + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, complexQuery); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // Verify the full complex query can be retrieved + String retrievedQuery = config.getQuery(); + assertEquals(complexQuery, retrievedQuery); + } + + @Test + public void testQueryMaskedSupportsVeryLongQueries() { + // Test that very long queries (thousands of characters) work fine + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + + // Build a very long query (over 2000 characters) + StringBuilder longQuery = new StringBuilder("SELECT "); + for (int i = 0; i < 100; i++) { + longQuery.append("column_").append(i).append(", "); + } + longQuery.append("id FROM very_wide_table WHERE "); + for (int i = 0; i < 50; i++) { + longQuery.append("column_").append(i).append(" IS NOT NULL"); + if (i < 49) { + longQuery.append(" AND "); + } + } + + String expectedQuery = longQuery.toString(); + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, expectedQuery); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // Verify the full long query can be retrieved without truncation + String retrievedQuery = config.getQuery(); + assertEquals(expectedQuery, retrievedQuery); + assertTrue(retrievedQuery.length() > 2000); + } + + @Test + public void testQueryMaskedSupportsSpecialCharacters() { + // Test that queries with special characters work fine + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + + String queryWithSpecialChars = "SELECT * FROM users " + + "WHERE name LIKE '%O''Brien%' " + + "AND email = 'test@example.com' " + + "AND description LIKE '%Line1\nLine2%' " + + "AND data LIKE '%Tab\tSeparated%'"; + + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, queryWithSpecialChars); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // Verify special characters are preserved + String retrievedQuery = config.getQuery(); + assertEquals(queryWithSpecialChars, retrievedQuery); + } + + @Test + public void testPasswordObjectToStringShowsHidden() { + // Test that Password type properly masks the value in toString() + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + String secretQuery = "SELECT * FROM secret_table WHERE api_key = 'secret123'"; + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, secretQuery); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + org.apache.kafka.common.config.types.Password password = + config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); + + // toString() should hide the value + String passwordString = password.toString(); + assertEquals("[hidden]", passwordString); + + // But value() should return the actual query + String actualValue = password.value(); + assertEquals(secretQuery, actualValue); + } + + @Test + public void testConnectionPasswordIsSimilarlyMasked() { + // Compare with existing connection.password config to ensure consistent behavior + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, "mySecretPassword123"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + + Map validatedConfig = JdbcSourceConnectorConfig.CONFIG_DEF.validateAll(props); + ConfigValue passwordValue = validatedConfig.get(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); + + // Password should be masked in ConfigValue + String maskedString = passwordValue.value().toString(); + assertEquals("[hidden]", maskedString); + + // But can be retrieved via config + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + org.apache.kafka.common.config.types.Password password = + config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); + assertEquals("mySecretPassword123", password.value()); + } + + @Test + public void testQueryMaskedWithEmptyStringBehavior() { + // Test behavior with empty string + Map props = new HashMap<>(); + props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); + props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); + props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); + props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, ""); + + JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); + + // getQueryString() should return empty string + String actualQuery = config.getQuery(); + assertEquals("", actualQuery); + } } diff --git a/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java b/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java index d1ab169a4..341d0506b 100644 --- a/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java +++ b/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java @@ -777,5 +777,96 @@ public void validate_withModeBulkWithNewIncrementingMapping_setsError() { assertErrors(MODE_CONFIG, 1); assertErrorMatches(MODE_CONFIG, ".*Incrementing column configurations should not be provided.*"); } + + // ========== Query and Query.Masked Config Tests ========== + + @Test + public void validate_withBothQueryAndQueryMasked_setsError() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); + props.put(QUERY_CONFIG, "SELECT * FROM users"); + props.put(QUERY_MASKED_CONFIG, "SELECT * FROM sensitive_data"); + + validate(); + + assertErrors(2); + assertErrors(QUERY_CONFIG, 1); + assertErrors(QUERY_MASKED_CONFIG, 1); + assertErrorMatches(QUERY_CONFIG, ".*Both 'query' and 'query.masked' configs cannot be set.*"); + assertErrorMatches(QUERY_MASKED_CONFIG, ".*Both 'query' and 'query.masked' configs cannot be set.*"); + } + + @Test + public void validate_withOnlyQuery_noErrors() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); + props.put(QUERY_CONFIG, "SELECT * FROM users WHERE active = true"); + + validate(); + + assertNoErrors(); + } + + @Test + public void validate_withOnlyQueryMasked_noErrors() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); + props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users WHERE active = true"); + + validate(); + + assertNoErrors(); + } + + @Test + public void validate_withQueryMaskedContainingComplexQuery_noErrors() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); + // Test with a complex query containing multiple joins + String complexQuery = "SELECT a.id, a.name, b.email, c.address, d.phone " + + "FROM users a " + + "INNER JOIN emails b ON a.id = b.user_id " + + "LEFT JOIN addresses c ON a.id = c.user_id " + + "LEFT JOIN phones d ON a.id = d.user_id " + + "WHERE a.created_at > '2024-01-01' AND a.status = 'active'"; + props.put(QUERY_MASKED_CONFIG, complexQuery); + + validate(); + + assertNoErrors(); + } + + @Test + public void validate_withQueryMaskedContainingVeryLongQuery_noErrors() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); + // Test with a very long query to ensure no length restrictions + StringBuilder longQuery = new StringBuilder("SELECT "); + for (int i = 0; i < 100; i++) { + longQuery.append("column").append(i); + if (i < 99) { + longQuery.append(", "); + } + } + longQuery.append(" FROM large_table WHERE condition = 'test'"); + props.put(QUERY_MASKED_CONFIG, longQuery.toString()); + + validate(); + + assertNoErrors(); + } + + @Test + public void validate_withBothQueryAndQueryMaskedEmpty_noErrors() { + props.put(MODE_CONFIG, MODE_BULK); + props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); + // Both empty should be fine as it's equivalent to neither being set + props.put(QUERY_CONFIG, ""); + props.put(QUERY_MASKED_CONFIG, ""); + + validate(); + + assertNoErrors(); + } } From bb933668ce87032d379fff55d7ac822cb0b88564 Mon Sep 17 00:00:00 2001 From: KGaneshDatta Date: Thu, 13 Nov 2025 13:35:19 +0530 Subject: [PATCH 2/7] Fix checkstyle errors and added tests --- .../source/JdbcSourceConnectorConfig.java | 53 +++++-- .../JdbcSourceConnectorValidation.java | 2 +- .../source/JdbcSourceConnectorConfigTest.java | 140 ------------------ .../JdbcSourceConnectorValidationTest.java | 20 --- 4 files changed, 39 insertions(+), 176 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 4a5e105da..64d44659c 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -335,21 +335,22 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { + "to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must " + "handle incremental queries itself."; public static final String QUERY_DEFAULT = ""; - private static final String QUERY_DISPLAY = "Query"; + private static final String QUERY_DISPLAY = "Query (Deprecated)"; public static final String QUERY_MASKED_CONFIG = "query.masked"; private static final String QUERY_MASKED_DOC = "If specified, the query to perform to select new or updated rows. This is the same as " - + "'query' config but with Type.PASSWORD to mask the query value from being visible to users. " - + "Use this setting when your query contains sensitive information. If used, this connector " - + "will only copy data using this query -- whole-table copying will be disabled. Different " - + "query modes may still be used for incremental updates, but in order to properly construct " - + "the incremental query, it must be possible to append a WHERE clause to this query (i.e. no " - + "WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries " - + "itself. Note: Only one of 'query' or 'query.masked' should be set, not both."; + + "'query' config but with Type.PASSWORD to mask the query value from being visible " + + "to users. Use this setting when your query contains sensitive information. " + + "If used, this connector will only copy data using this query -- whole-table " + + "copying will be disabled. Different query modes may still be used for " + + "incremental updates, but in order to properly construct the incremental query," + + " it must be possible to append a WHERE clause to this query (i.e. no " + + "WHERE clauses may be used). If you use a WHERE clause, it must handle " + + "incremental queries itself. Note: Only one of 'query' or 'query.masked' " + + "should be set, not both."; private static final String QUERY_MASKED_DISPLAY = "Query (Masked)"; - public static final String TOPIC_PREFIX_CONFIG = "topic.prefix"; private static final String TOPIC_PREFIX_DOC = "Prefix to prepend to table names to generate the name of the Kafka topic to publish data " @@ -839,6 +840,13 @@ private static void addSchemaAndDialectOptions(ConfigDef config, int orderInGrou private static final void addModeOptions(ConfigDef config) { int orderInGroup = 0; + orderInGroup = defineModeConfig(config, orderInGroup); + orderInGroup = defineIncrementTimestampConfigs(config, orderInGroup); + orderInGroup = defineQueryAndQuoteConfigs(config, orderInGroup); + defineTransactionAndRetryConfigs(config, orderInGroup); + } + + private static int defineModeConfig(ConfigDef config, int orderInGroup) { config.define( MODE_CONFIG, Type.STRING, @@ -863,7 +871,12 @@ private static final void addModeOptions(ConfigDef config) { TIMESTAMP_COLUMN_MAPPING_CONFIG, VALIDATE_NON_NULL_CONFIG ) - ).define( + ); + return orderInGroup; + } + + private static int defineIncrementTimestampConfigs(ConfigDef config, int orderInGroup) { + config.define( INCREMENTING_COLUMN_NAME_CONFIG, Type.STRING, INCREMENTING_COLUMN_NAME_DEFAULT, @@ -931,7 +944,12 @@ private static final void addModeOptions(ConfigDef config) { Width.SHORT, VALIDATE_NON_NULL_DISPLAY, MODE_DEPENDENTS_RECOMMENDER - ).define( + ); + return orderInGroup; + } + + private static int defineQueryAndQuoteConfigs(ConfigDef config, int orderInGroup) { + config.define( QUERY_CONFIG, Type.STRING, QUERY_DEFAULT, @@ -972,7 +990,12 @@ private static final void addModeOptions(ConfigDef config) { ++orderInGroup, Width.MEDIUM, QUERY_SUFFIX_DISPLAY - ).define( + ); + return orderInGroup; + } + + private static void defineTransactionAndRetryConfigs(ConfigDef config, int orderInGroup) { + config.define( TRANSACTION_ISOLATION_MODE_CONFIG, Type.STRING, TRANSACTION_ISOLATION_MODE_DEFAULT, @@ -1458,13 +1481,13 @@ public List incrementingColMappingRegexes() { /** * Get query string from either query.masked (Type.PASSWORD) or query (Type.STRING) config. - * Prioritizes query.masked if set, otherwise falls back to query config for backward compatibility. + * Prioritizes query.masked if set, otherwise falls back to query config for backward + * compatibility. * * @return The query string from whichever config is set, or empty string if neither is set. */ public String getQuery() { - // First check if query.masked is set - org.apache.kafka.common.config.types.Password maskedQuery = getPassword(QUERY_MASKED_CONFIG); + Password maskedQuery = getPassword(QUERY_MASKED_CONFIG); if (maskedQuery != null && maskedQuery.value() != null && !maskedQuery.value().isEmpty()) { return maskedQuery.value(); } diff --git a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java index 6f794097f..305cf068c 100644 --- a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java +++ b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java @@ -306,7 +306,7 @@ private boolean validateTableInclusionConfigs() { return true; } - /** + /** * Validate that only one of query or query.masked configs is set at a time. * Both configs should not be set simultaneously to avoid ambiguity. */ diff --git a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java index 9e8d8c074..c2ea1f4b0 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java @@ -453,45 +453,6 @@ public void testQueryMaskedValueIsMaskedInConfigValue() { assertFalse(maskedString.contains(sensitiveQuery)); } - @Test - public void testQueryMaskedValueCanBeRetrievedViaConfig() { - // Setup config with query.masked - Map props = new HashMap<>(); - props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); - props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); - props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); - String expectedQuery = "SELECT * FROM users WHERE status = 'active'"; - props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, expectedQuery); - - // Create config - JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - - // The actual value can be retrieved when needed - org.apache.kafka.common.config.types.Password password = - config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); - assertNotNull(password); - - String actualQuery = password.value(); - assertEquals(expectedQuery, actualQuery); - } - - @Test - public void testGetQueryStringReturnsQueryMaskedWhenSet() { - // Setup config with query.masked (but not query) - Map props = new HashMap<>(); - props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); - props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); - props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); - String expectedQuery = "SELECT id, name, email FROM users"; - props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, expectedQuery); - - JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - - // getQueryString() should return the masked query value - String actualQuery = config.getQuery(); - assertEquals(expectedQuery, actualQuery); - } - @Test public void testGetQueryStringReturnsQueryWhenOnlyQuerySet() { // Setup config with only query (not query.masked) @@ -504,31 +465,10 @@ public void testGetQueryStringReturnsQueryWhenOnlyQuerySet() { JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - // getQueryString() should return the regular query value String actualQuery = config.getQuery(); assertEquals(expectedQuery, actualQuery); } - @Test - public void testGetQueryStringPrioritizesQueryMaskedOverQuery() { - // Setup config with both query and query.masked - // (This violates validation but tests the priority logic) - Map props = new HashMap<>(); - props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); - props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); - props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); - String regularQuery = "SELECT * FROM table1"; - String maskedQuery = "SELECT * FROM sensitive_table"; - props.put(JdbcSourceConnectorConfig.QUERY_CONFIG, regularQuery); - props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, maskedQuery); - - JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - - // getQueryString() should prioritize query.masked - String actualQuery = config.getQuery(); - assertEquals(maskedQuery, actualQuery); - } - @Test public void testGetQueryStringReturnsEmptyWhenNeitherSet() { // Setup config without query or query.masked @@ -569,43 +509,10 @@ public void testQueryMaskedSupportsComplexQueryWithMultipleJoins() { JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - // Verify the full complex query can be retrieved String retrievedQuery = config.getQuery(); assertEquals(complexQuery, retrievedQuery); } - @Test - public void testQueryMaskedSupportsVeryLongQueries() { - // Test that very long queries (thousands of characters) work fine - Map props = new HashMap<>(); - props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); - props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); - props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); - - // Build a very long query (over 2000 characters) - StringBuilder longQuery = new StringBuilder("SELECT "); - for (int i = 0; i < 100; i++) { - longQuery.append("column_").append(i).append(", "); - } - longQuery.append("id FROM very_wide_table WHERE "); - for (int i = 0; i < 50; i++) { - longQuery.append("column_").append(i).append(" IS NOT NULL"); - if (i < 49) { - longQuery.append(" AND "); - } - } - - String expectedQuery = longQuery.toString(); - props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, expectedQuery); - - JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - - // Verify the full long query can be retrieved without truncation - String retrievedQuery = config.getQuery(); - assertEquals(expectedQuery, retrievedQuery); - assertTrue(retrievedQuery.length() > 2000); - } - @Test public void testQueryMaskedSupportsSpecialCharacters() { // Test that queries with special characters work fine @@ -629,52 +536,6 @@ public void testQueryMaskedSupportsSpecialCharacters() { assertEquals(queryWithSpecialChars, retrievedQuery); } - @Test - public void testPasswordObjectToStringShowsHidden() { - // Test that Password type properly masks the value in toString() - Map props = new HashMap<>(); - props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); - props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); - props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); - String secretQuery = "SELECT * FROM secret_table WHERE api_key = 'secret123'"; - props.put(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, secretQuery); - - JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - org.apache.kafka.common.config.types.Password password = - config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); - - // toString() should hide the value - String passwordString = password.toString(); - assertEquals("[hidden]", passwordString); - - // But value() should return the actual query - String actualValue = password.value(); - assertEquals(secretQuery, actualValue); - } - - @Test - public void testConnectionPasswordIsSimilarlyMasked() { - // Compare with existing connection.password config to ensure consistent behavior - Map props = new HashMap<>(); - props.put(JdbcSourceConnectorConfig.CONNECTION_URL_CONFIG, "jdbc:postgresql://localhost:5432/testdb"); - props.put(JdbcSourceConnectorConfig.CONNECTION_USER_CONFIG, "testUser"); - props.put(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG, "mySecretPassword123"); - props.put(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, "table1,table2"); - - Map validatedConfig = JdbcSourceConnectorConfig.CONFIG_DEF.validateAll(props); - ConfigValue passwordValue = validatedConfig.get(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); - - // Password should be masked in ConfigValue - String maskedString = passwordValue.value().toString(); - assertEquals("[hidden]", maskedString); - - // But can be retrieved via config - JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - org.apache.kafka.common.config.types.Password password = - config.getPassword(JdbcSourceConnectorConfig.CONNECTION_PASSWORD_CONFIG); - assertEquals("mySecretPassword123", password.value()); - } - @Test public void testQueryMaskedWithEmptyStringBehavior() { // Test behavior with empty string @@ -686,7 +547,6 @@ public void testQueryMaskedWithEmptyStringBehavior() { JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - // getQueryString() should return empty string String actualQuery = config.getQuery(); assertEquals("", actualQuery); } diff --git a/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java b/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java index 341d0506b..443e74723 100644 --- a/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java +++ b/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java @@ -836,26 +836,6 @@ public void validate_withQueryMaskedContainingComplexQuery_noErrors() { assertNoErrors(); } - @Test - public void validate_withQueryMaskedContainingVeryLongQuery_noErrors() { - props.put(MODE_CONFIG, MODE_BULK); - props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); - // Test with a very long query to ensure no length restrictions - StringBuilder longQuery = new StringBuilder("SELECT "); - for (int i = 0; i < 100; i++) { - longQuery.append("column").append(i); - if (i < 99) { - longQuery.append(", "); - } - } - longQuery.append(" FROM large_table WHERE condition = 'test'"); - props.put(QUERY_MASKED_CONFIG, longQuery.toString()); - - validate(); - - assertNoErrors(); - } - @Test public void validate_withBothQueryAndQueryMaskedEmpty_noErrors() { props.put(MODE_CONFIG, MODE_BULK); From 330e9fec7182dbb4a46c7035023db18060b531d5 Mon Sep 17 00:00:00 2001 From: KGaneshDatta Date: Fri, 14 Nov 2025 09:06:31 +0530 Subject: [PATCH 3/7] Making query masked config internal and fixed validation --- .../source/JdbcSourceConnectorConfig.java | 19 +++++++++- .../JdbcSourceConnectorValidation.java | 38 +++++++++++++++++-- 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 64d44659c..2d4954f01 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -435,6 +435,20 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { private static final EnumRecommender QUOTE_METHOD_RECOMMENDER = EnumRecommender.in(QuoteMethod.values()); + private static final ConfigDef.Recommender HIDDEN_RECOMMENDER = + new ConfigDef.Recommender() { + @Override + public java.util.List validValues( + String name, java.util.Map parsedConfig) { + return java.util.Collections.emptyList(); + } + + @Override + public boolean visible(String name, java.util.Map parsedConfig) { + return false; + } + }; + public static final String DATABASE_GROUP = "Database"; public static final String MODE_GROUP = "Mode"; public static final String CONNECTOR_GROUP = "Connector"; @@ -964,11 +978,12 @@ private static int defineQueryAndQuoteConfigs(ConfigDef config, int orderInGroup Type.PASSWORD, QUERY_DEFAULT, Importance.MEDIUM, - QUERY_MASKED_DOC, + QUERY_DOC, MODE_GROUP, ++orderInGroup, Width.SHORT, - QUERY_MASKED_DISPLAY + QUERY_DISPLAY, + HIDDEN_RECOMMENDER ).define( QUOTE_SQL_IDENTIFIERS_CONFIG, Type.STRING, diff --git a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java index 305cf068c..264cf1f6d 100644 --- a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java +++ b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java @@ -21,6 +21,7 @@ import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.common.config.types.Password; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,6 +152,16 @@ private boolean validateMultiConfigs() { return true; } + private boolean hasAnyQueryConfig() { + String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); + Password queryMasked = + config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); + boolean hasQuery = query != null && !query.trim().isEmpty(); + boolean hasQueryMasked = + queryMasked != null && queryMasked.value() != null && !queryMasked.value().trim().isEmpty(); + return hasQuery || hasQueryMasked; + } + /** * Validate legacy/new config compatibility and requirements. * Implements the pattern: legacyKeys vs newKeys with early returns. @@ -159,6 +170,11 @@ private boolean validateLegacyNewConfigCompatibility() { // Define legacy and new config keys boolean usingLegacyConfigs = isUsingLegacyConfigs(); boolean usingNewConfigs = isUsingNewConfigs(); + boolean hasQuery = hasAnyQueryConfig(); + + if (hasQuery) { + return true; + } if (usingLegacyConfigs && usingNewConfigs) { return addConfigErrorsForLegacyAndNewConfigConflict(); @@ -274,6 +290,9 @@ private boolean addConfigErrorsForLegacyAndNewConfigConflict() { * Validate that at least one configuration is provided. */ private boolean addConfigErrorsForNoConfigProvided() { + if (hasAnyQueryConfig()) { + return true; + } String msg = "At least one table filtering configuration is required. " + "Provide one of: " + JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG + ", " + JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG + ", " @@ -312,7 +331,7 @@ private boolean validateTableInclusionConfigs() { */ private boolean validateQueryConfigs() { String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); - org.apache.kafka.common.config.types.Password queryMasked = + Password queryMasked = config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); boolean hasQuery = query != null && !query.isEmpty(); @@ -322,8 +341,7 @@ private boolean validateQueryConfigs() { if (hasQuery && hasQueryMasked) { String msg = "Both 'query' and 'query.masked' configs cannot be set at the same time. " - + "Please use only one of them. Use 'query.masked' (Type.PASSWORD) if you want to hide " - + "the query value from being visible, or use 'query' (Type.STRING) for regular usage."; + + "Please use only one of them."; addConfigError(JdbcSourceConnectorConfig.QUERY_CONFIG, msg); addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg); @@ -332,6 +350,20 @@ private boolean validateQueryConfigs() { return false; } + if ((hasQuery || hasQueryMasked) && (isUsingLegacyConfigs() || isUsingNewConfigs())) { + String msg = + "Do not specify table filtering configs with 'query' or 'query.masked'. " + + "Remove table.whitelist / table.blacklist / table.include.list / " + + "table.exclude.list."; + addConfigError(JdbcSourceConnectorConfig.QUERY_CONFIG, msg); + addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg); + addConfigError(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, msg); + addConfigError(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG, msg); + addConfigError(JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG, msg); + addConfigError(JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG, msg); + return false; + } + return true; } From b8d712ab1635d6499ad11798d402861e95b49d41 Mon Sep 17 00:00:00 2001 From: KGaneshDatta Date: Fri, 14 Nov 2025 09:39:13 +0530 Subject: [PATCH 4/7] Fix UTs with Validation errors --- .../jdbc/validation/JdbcSourceConnectorValidationTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java b/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java index 443e74723..942200139 100644 --- a/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java +++ b/src/test/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidationTest.java @@ -799,7 +799,6 @@ public void validate_withBothQueryAndQueryMasked_setsError() { @Test public void validate_withOnlyQuery_noErrors() { props.put(MODE_CONFIG, MODE_BULK); - props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); props.put(QUERY_CONFIG, "SELECT * FROM users WHERE active = true"); validate(); @@ -810,7 +809,6 @@ public void validate_withOnlyQuery_noErrors() { @Test public void validate_withOnlyQueryMasked_noErrors() { props.put(MODE_CONFIG, MODE_BULK); - props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); props.put(QUERY_MASKED_CONFIG, "SELECT * FROM users WHERE active = true"); validate(); @@ -821,7 +819,6 @@ public void validate_withOnlyQueryMasked_noErrors() { @Test public void validate_withQueryMaskedContainingComplexQuery_noErrors() { props.put(MODE_CONFIG, MODE_BULK); - props.put(TABLE_WHITELIST_CONFIG, "table1,table2"); // Test with a complex query containing multiple joins String complexQuery = "SELECT a.id, a.name, b.email, c.address, d.phone " + "FROM users a " + From 3a22709f3f1749127cf04aedf3f771f3ae5d7619 Mon Sep 17 00:00:00 2001 From: KGaneshDatta Date: Fri, 14 Nov 2025 14:35:46 +0530 Subject: [PATCH 5/7] Update the documentation of new config --- .../source/JdbcSourceConnectorConfig.java | 22 ++++++------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 2d4954f01..334304532 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -335,20 +335,12 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { + "to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must " + "handle incremental queries itself."; public static final String QUERY_DEFAULT = ""; - private static final String QUERY_DISPLAY = "Query (Deprecated)"; + private static final String QUERY_DISPLAY = "Query"; public static final String QUERY_MASKED_CONFIG = "query.masked"; private static final String QUERY_MASKED_DOC = - "If specified, the query to perform to select new or updated rows. This is the same as " - + "'query' config but with Type.PASSWORD to mask the query value from being visible " - + "to users. Use this setting when your query contains sensitive information. " - + "If used, this connector will only copy data using this query -- whole-table " - + "copying will be disabled. Different query modes may still be used for " - + "incremental updates, but in order to properly construct the incremental query," - + " it must be possible to append a WHERE clause to this query (i.e. no " - + "WHERE clauses may be used). If you use a WHERE clause, it must handle " - + "incremental queries itself. Note: Only one of 'query' or 'query.masked' " - + "should be set, not both."; + "Same as 'query' configuration but the query string is masked" + + "Use this config to prevent sensitive information from being logged."; private static final String QUERY_MASKED_DISPLAY = "Query (Masked)"; public static final String TOPIC_PREFIX_CONFIG = "topic.prefix"; @@ -439,12 +431,12 @@ public class JdbcSourceConnectorConfig extends AbstractConfig { new ConfigDef.Recommender() { @Override public java.util.List validValues( - String name, java.util.Map parsedConfig) { + String name, Map config) { return java.util.Collections.emptyList(); } @Override - public boolean visible(String name, java.util.Map parsedConfig) { + public boolean visible(String name, Map config) { return false; } }; @@ -978,11 +970,11 @@ private static int defineQueryAndQuoteConfigs(ConfigDef config, int orderInGroup Type.PASSWORD, QUERY_DEFAULT, Importance.MEDIUM, - QUERY_DOC, + QUERY_MASKED_DOC, MODE_GROUP, ++orderInGroup, Width.SHORT, - QUERY_DISPLAY, + QUERY_MASKED_DISPLAY, HIDDEN_RECOMMENDER ).define( QUOTE_SQL_IDENTIFIERS_CONFIG, From 9330777b19c1fa855bf599b562372de92f05318c Mon Sep 17 00:00:00 2001 From: KGaneshDatta Date: Fri, 21 Nov 2025 15:00:40 +0530 Subject: [PATCH 6/7] Update getQuery() method to return optional --- .../connect/jdbc/JdbcSourceConnector.java | 8 +- .../source/JdbcSourceConnectorConfig.java | 31 +++-- .../connect/jdbc/source/JdbcSourceTask.java | 9 +- .../jdbc/source/TableQuerierProcessor.java | 3 +- .../JdbcSourceConnectorValidation.java | 107 ++++++++---------- .../source/JdbcSourceConnectorConfigTest.java | 21 ++-- 6 files changed, 81 insertions(+), 98 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java index a460ad08f..9cfa4a792 100644 --- a/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java +++ b/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java @@ -117,8 +117,7 @@ public void start(Map properties) throws ConnectException { List excludeList = config.tableExcludeListRegexes(); Set excludeListSet = excludeList.isEmpty() ? null : new HashSet<>(excludeList); - String query = config.getQuery(); - if (!query.isEmpty()) { + if (config.getQuery().isPresent()) { if (whitelistSet != null || blacklistSet != null || includeListSet != null || excludeListSet != null) { log.error( @@ -145,7 +144,7 @@ public void start(Map properties) throws ConnectException { excludeListSet, // New Time.SYSTEM ); - if (query.isEmpty()) { + if (!config.getQuery().isPresent()) { tableMonitorThread.start(); log.info("Starting Table Monitor Thread"); } @@ -170,9 +169,8 @@ public Config validate(Map connectorConfigs) { @Override public List> taskConfigs(int maxTasks) { log.info("Starting with the task Configuration method."); - String query = config.getQuery(); List> taskConfigs; - if (!query.isEmpty()) { + if (config.getQuery().isPresent()) { log.info("Custom query provided, generating task configuration for the query"); Map taskProps = new HashMap<>(configProperties); taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, ""); diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 334304532..53feae5dc 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -19,14 +19,7 @@ import java.sql.Timestamp; import java.time.Duration; import java.time.ZoneId; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicReference; import com.microsoft.sqlserver.jdbc.SQLServerConnection; @@ -1487,19 +1480,23 @@ public List incrementingColMappingRegexes() { } /** - * Get query string from either query.masked (Type.PASSWORD) or query (Type.STRING) config. - * Prioritizes query.masked if set, otherwise falls back to query config for backward - * compatibility. - * - * @return The query string from whichever config is set, or empty string if neither is set. - */ - public String getQuery() { + * Get the query string from either query or query.masked config. + * Prioritizes query.masked over query if both are set (though validation should prevent this). + * + * @return Optional containing the query string if present, empty Optional otherwise. + */ + public Optional getQuery() { Password maskedQuery = getPassword(QUERY_MASKED_CONFIG); if (maskedQuery != null && maskedQuery.value() != null && !maskedQuery.value().isEmpty()) { - return maskedQuery.value(); + return Optional.of(maskedQuery.value()); + } + + String query = getString(QUERY_CONFIG); + if (query != null && !query.isEmpty()) { + return Optional.of(query); } - return getString(QUERY_CONFIG); + return Optional.empty(); } public boolean modeUsesTimestampColumn() { diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java index 402289233..093c6f7e4 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java @@ -95,10 +95,9 @@ public void start(Map properties) { List tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG); Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED); - String query = config.getQuery(); List tableType = config.getList(JdbcSourceConnectorConfig.TABLE_TYPE_CONFIG); - if ((tables.isEmpty() && query.isEmpty())) { + if ((tables.isEmpty() && !config.getQuery().isPresent())) { // We are still waiting for the tables call to complete. // Start task but do nothing. if (!tablesFetched) { @@ -115,7 +114,7 @@ public void start(Map properties) { + " table name."); } - if ((!tables.isEmpty() && !query.isEmpty())) { + if ((!tables.isEmpty() && config.getQuery().isPresent())) { throw new ConfigException("Invalid configuration: a JdbcSourceTask" + " cannot have both a table and a query assigned to it"); } @@ -147,10 +146,10 @@ public void start(Map properties) { ) ) ); - TableQuerier.QueryMode queryMode = !query.isEmpty() ? TableQuerier.QueryMode.QUERY : + TableQuerier.QueryMode queryMode = config.getQuery().isPresent() ? TableQuerier.QueryMode.QUERY : TableQuerier.QueryMode.TABLE; final List tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY - ? Collections.singletonList(query) : tables; + ? Collections.singletonList(config.getQuery().get()) : tables; String mode = config.getString(JdbcSourceTaskConfig.MODE_CONFIG); //used only in table mode diff --git a/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java b/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java index 7eb5924f1..3f5d2e97d 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java +++ b/src/main/java/io/confluent/connect/jdbc/source/TableQuerierProcessor.java @@ -95,8 +95,7 @@ private boolean isReadyToProcess() { // If the call to get tables has not completed we will not do anything. // This is only valid in table mode. Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED); - String query = config.getQuery(); - return !query.isEmpty() || tablesFetched; + return config.getQuery().isPresent() || tablesFetched; } private void processQuerier(RecordDestination destination, TableQuerier querier) diff --git a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java index 264cf1f6d..a99dd94fc 100644 --- a/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java +++ b/src/main/java/io/confluent/connect/jdbc/validation/JdbcSourceConnectorValidation.java @@ -21,7 +21,6 @@ import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode; import org.apache.kafka.common.config.Config; import org.apache.kafka.common.config.ConfigValue; -import org.apache.kafka.common.config.types.Password; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,17 +89,17 @@ && validateLegacyNewConfigCompatibility() } validationResult = validationResult && validatePluginSpecificNeeds(); - + if (!validationResult) { log.info("Validation failed"); } else { log.info("Validation succeeded"); } - + } catch (Exception e) { log.error("Error during validation", e); } - + return this.validationResult; } @@ -152,16 +151,6 @@ private boolean validateMultiConfigs() { return true; } - private boolean hasAnyQueryConfig() { - String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); - Password queryMasked = - config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); - boolean hasQuery = query != null && !query.trim().isEmpty(); - boolean hasQueryMasked = - queryMasked != null && queryMasked.value() != null && !queryMasked.value().trim().isEmpty(); - return hasQuery || hasQueryMasked; - } - /** * Validate legacy/new config compatibility and requirements. * Implements the pattern: legacyKeys vs newKeys with early returns. @@ -170,9 +159,8 @@ private boolean validateLegacyNewConfigCompatibility() { // Define legacy and new config keys boolean usingLegacyConfigs = isUsingLegacyConfigs(); boolean usingNewConfigs = isUsingNewConfigs(); - boolean hasQuery = hasAnyQueryConfig(); - if (hasQuery) { + if (config.getQuery().isPresent()) { return true; } @@ -196,18 +184,18 @@ private boolean isUsingLegacyConfigs() { Set blacklistSet = config.getTableBlacklistSet(); String incrementingColumnName = config.getIncrementingColumnName(); List timestampColumnName = config.getTimestampColumnName(); - + boolean hasWhitelist = !whitelistSet.isEmpty(); boolean hasBlacklist = !blacklistSet.isEmpty(); - boolean hasLegacyIncrementing = incrementingColumnName != null + boolean hasLegacyIncrementing = incrementingColumnName != null && !incrementingColumnName.trim().isEmpty(); - boolean hasLegacyTimestamp = timestampColumnName != null - && !timestampColumnName.isEmpty() + boolean hasLegacyTimestamp = timestampColumnName != null + && !timestampColumnName.isEmpty() && !timestampColumnName.get(0).trim().isEmpty(); - + return hasWhitelist || hasBlacklist || hasLegacyIncrementing || hasLegacyTimestamp; } - + /** * Check if any new config keys are being used. * New keys: table.include.list, table.exclude.list, incrementing.column.mapping, @@ -218,17 +206,17 @@ private boolean isUsingNewConfigs() { Set excludeListSet = config.getTableExcludeListSet(); List incrementingColumnMapping = config.getIncrementingColumnMapping(); List timestampColumnsMapping = config.getTimestampColumnMapping(); - + boolean hasIncludeList = !includeListSet.isEmpty(); boolean hasExcludeList = !excludeListSet.isEmpty(); - boolean hasNewIncrementing = incrementingColumnMapping != null + boolean hasNewIncrementing = incrementingColumnMapping != null && !incrementingColumnMapping.isEmpty(); - boolean hasNewTimestamp = timestampColumnsMapping != null + boolean hasNewTimestamp = timestampColumnsMapping != null && !timestampColumnsMapping.isEmpty(); - + return hasIncludeList || hasExcludeList || hasNewIncrementing || hasNewTimestamp; } - + /** * Validate conflict between legacy and new configs. * Only add errors to configs that are actually present and conflicting. @@ -240,57 +228,57 @@ private boolean addConfigErrorsForLegacyAndNewConfigConflict() { + "(table.include.list, table.exclude.list, timestamp.columns.mapping, " + "incrementing.column.mapping). Please choose one approach: either use all legacy " + "configurations or all new configurations."; - + // Only add errors to configs that are actually present and non-empty Set whitelistSet = config.getTableWhitelistSet(); if (!whitelistSet.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, msg); } - + Set blacklistSet = config.getTableBlacklistSet(); if (!blacklistSet.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG, msg); } - + Set includeListSet = config.getTableIncludeListSet(); if (!includeListSet.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG, msg); } - + Set excludeListSet = config.getTableExcludeListSet(); if (!excludeListSet.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG, msg); } - + List timestampColumnName = config.getTimestampColumnName(); - if (timestampColumnName != null && !timestampColumnName.isEmpty() + if (timestampColumnName != null && !timestampColumnName.isEmpty() && !timestampColumnName.get(0).trim().isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_NAME_CONFIG, msg); } - + List timestampColumnsMapping = config.getTimestampColumnMapping(); if (timestampColumnsMapping != null && !timestampColumnsMapping.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.TIMESTAMP_COLUMN_MAPPING_CONFIG, msg); } - + String incrementingColumnName = config.getIncrementingColumnName(); if (incrementingColumnName != null && !incrementingColumnName.trim().isEmpty()) { addConfigError(JdbcSourceConnectorConfig.INCREMENTING_COLUMN_NAME_CONFIG, msg); } - + List incrementingColumnMapping = config.getIncrementingColumnMapping(); if (incrementingColumnMapping != null && !incrementingColumnMapping.isEmpty()) { addConfigError(JdbcSourceConnectorConfig.INCREMENTING_COLUMN_MAPPING_CONFIG, msg); } - + return false; } - + /** * Validate that at least one configuration is provided. */ private boolean addConfigErrorsForNoConfigProvided() { - if (hasAnyQueryConfig()) { + if (config.getQuery().isPresent()) { return true; } String msg = "At least one table filtering configuration is required. " @@ -298,30 +286,30 @@ private boolean addConfigErrorsForNoConfigProvided() { + JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG + ", " + JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG + ", or " + JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG + "."; - + addConfigError(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, msg); addConfigError(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG, msg); addConfigError(JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG, msg); addConfigError(JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG, msg); return false; } - + /** * Validate new config requirements (when using new configs only). */ private boolean validateTableInclusionConfigs() { Set includeListSet = config.getTableIncludeListSet(); Set excludeListSet = config.getTableExcludeListSet(); - + // Validate that exclude list requires include list if (!excludeListSet.isEmpty() && includeListSet.isEmpty()) { - String msg = JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG + String msg = JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG + " cannot be used without " + JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG + ". Exclude list only applies to tables that match the include list."; addConfigError(JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG, msg); return false; } - + return true; } @@ -331,13 +319,15 @@ private boolean validateTableInclusionConfigs() { */ private boolean validateQueryConfigs() { String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG); - Password queryMasked = + String queryMaskedValue = null; + org.apache.kafka.common.config.types.Password queryMasked = config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG); + if (queryMasked != null && queryMasked.value() != null) { + queryMaskedValue = queryMasked.value(); + } boolean hasQuery = query != null && !query.isEmpty(); - boolean hasQueryMasked = queryMasked != null - && queryMasked.value() != null - && !queryMasked.value().isEmpty(); + boolean hasQueryMasked = queryMaskedValue != null && !queryMaskedValue.isEmpty(); if (hasQuery && hasQueryMasked) { String msg = "Both 'query' and 'query.masked' configs cannot be set at the same time. " @@ -350,11 +340,12 @@ private boolean validateQueryConfigs() { return false; } - if ((hasQuery || hasQueryMasked) && (isUsingLegacyConfigs() || isUsingNewConfigs())) { + if (config.getQuery().isPresent() && (isUsingLegacyConfigs() || isUsingNewConfigs())) { String msg = "Do not specify table filtering configs with 'query' or 'query.masked'. " + "Remove table.whitelist / table.blacklist / table.include.list / " - + "table.exclude.list."; + + "table.exclude.list when using query mode" + + " or 'query' / 'query.masked' when using table filtering mode."; addConfigError(JdbcSourceConnectorConfig.QUERY_CONFIG, msg); addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg); addConfigError(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, msg); @@ -397,7 +388,7 @@ private boolean validateTsColProvidedWhenRequired() { List timestampColumnsMapping = config.getTimestampColumnMapping(); boolean hasNewTimestampConfig = timestampColumnsMapping != null && !timestampColumnsMapping.isEmpty(); - + if (!hasNewTimestampConfig) { String msg = String.format( "Timestamp column configuration must be provided when using mode '%s' or '%s'. " @@ -420,10 +411,10 @@ private boolean validateTsColProvidedWhenRequired() { private boolean validateTsColNotProvidedWhenNotRequired() { if (!config.modeUsesTimestampColumn()) { List timestampColumnsMapping = config.getTimestampColumnMapping(); - + boolean hasNewTimestampConfig = timestampColumnsMapping != null && !timestampColumnsMapping.isEmpty(); - + if (hasNewTimestampConfig) { String msg = String.format( "Timestamp column configurations should not be provided if mode is not '%s' or '%s'. " @@ -448,10 +439,10 @@ private boolean validateTsColNotProvidedWhenNotRequired() { private boolean validateIncrColProvidedWhenRequired() { if (config.modeUsesIncrementingColumn()) { List incrementingColumnMapping = config.getIncrementingColumnMapping(); - + boolean hasNewIncrementingConfig = incrementingColumnMapping != null && !incrementingColumnMapping.isEmpty(); - + if (!hasNewIncrementingConfig) { String msg = String.format( "Incrementing column configuration must be provided when using mode '%s' or '%s'. " @@ -474,10 +465,10 @@ private boolean validateIncrColProvidedWhenRequired() { private boolean validateIncrColumnNotProvidedWhenNotRequired() { if (!config.modeUsesIncrementingColumn()) { List incrementingColumnMapping = config.getIncrementingColumnMapping(); - + boolean hasNewIncrementingConfig = incrementingColumnMapping != null && !incrementingColumnMapping.isEmpty(); - + if (hasNewIncrementingConfig) { String msg = String.format( "Incrementing column configurations " @@ -504,4 +495,4 @@ protected void addConfigError(String configName, String errorMessage) { .ifPresent(cv -> cv.addErrorMessage(errorMessage)); } -} +} \ No newline at end of file diff --git a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java index c2ea1f4b0..f90877b25 100644 --- a/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java +++ b/src/test/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfigTest.java @@ -465,8 +465,8 @@ public void testGetQueryStringReturnsQueryWhenOnlyQuerySet() { JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - String actualQuery = config.getQuery(); - assertEquals(expectedQuery, actualQuery); + assertTrue(config.getQuery().isPresent()); + assertEquals(expectedQuery, config.getQuery().get()); } @Test @@ -479,9 +479,8 @@ public void testGetQueryStringReturnsEmptyWhenNeitherSet() { JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - // getQueryString() should return empty string - String actualQuery = config.getQuery(); - assertEquals("", actualQuery); + // getQuery() should return empty Optional + assertFalse(config.getQuery().isPresent()); } @Test @@ -509,8 +508,8 @@ public void testQueryMaskedSupportsComplexQueryWithMultipleJoins() { JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - String retrievedQuery = config.getQuery(); - assertEquals(complexQuery, retrievedQuery); + assertTrue(config.getQuery().isPresent()); + assertEquals(complexQuery, config.getQuery().get()); } @Test @@ -532,8 +531,8 @@ public void testQueryMaskedSupportsSpecialCharacters() { JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); // Verify special characters are preserved - String retrievedQuery = config.getQuery(); - assertEquals(queryWithSpecialChars, retrievedQuery); + assertTrue(config.getQuery().isPresent()); + assertEquals(queryWithSpecialChars, config.getQuery().get()); } @Test @@ -547,7 +546,7 @@ public void testQueryMaskedWithEmptyStringBehavior() { JdbcSourceConnectorConfig config = new JdbcSourceConnectorConfig(props); - String actualQuery = config.getQuery(); - assertEquals("", actualQuery); + // Empty string should return empty Optional + assertFalse(config.getQuery().isPresent()); } } From 48350cad7a3c27c3a3bf7c965ce5a383b868b826 Mon Sep 17 00:00:00 2001 From: KGaneshDatta Date: Fri, 21 Nov 2025 15:40:56 +0530 Subject: [PATCH 7/7] Minor checkstyle fixes --- .../connect/jdbc/source/JdbcSourceConnectorConfig.java | 10 +++++++++- .../confluent/connect/jdbc/source/JdbcSourceTask.java | 4 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java index 53feae5dc..53ff94717 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceConnectorConfig.java @@ -19,7 +19,15 @@ import java.sql.Timestamp; import java.time.Duration; import java.time.ZoneId; -import java.util.*; +import java.util.Arrays; +import java.util.HashSet; +import java.util.HashMap; +import java.util.Map; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.Optional; +import java.util.Locale; import java.util.concurrent.atomic.AtomicReference; import com.microsoft.sqlserver.jdbc.SQLServerConnection; diff --git a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java index 093c6f7e4..0068ba4ec 100644 --- a/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java +++ b/src/main/java/io/confluent/connect/jdbc/source/JdbcSourceTask.java @@ -146,8 +146,8 @@ public void start(Map properties) { ) ) ); - TableQuerier.QueryMode queryMode = config.getQuery().isPresent() ? TableQuerier.QueryMode.QUERY : - TableQuerier.QueryMode.TABLE; + TableQuerier.QueryMode queryMode = + config.getQuery().isPresent() ? TableQuerier.QueryMode.QUERY : TableQuerier.QueryMode.TABLE; final List tablesOrQuery = queryMode == TableQuerier.QueryMode.QUERY ? Collections.singletonList(config.getQuery().get()) : tables;