Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void start(Map<String, String> properties) throws ConnectException {
List<String> excludeList = config.tableExcludeListRegexes();
Set<String> excludeListSet = excludeList.isEmpty() ? null : new HashSet<>(excludeList);

String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
String query = config.getQuery();
if (!query.isEmpty()) {
if (whitelistSet != null || blacklistSet != null
|| includeListSet != null || excludeListSet != null) {
Expand Down Expand Up @@ -170,7 +170,7 @@ public Config validate(Map<String, String> connectorConfigs) {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
log.info("Starting with the task Configuration method.");
String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
String query = config.getQuery();
List<Map<String, String>> taskConfigs;
if (!query.isEmpty()) {
log.info("Custom query provided, generating task configuration for the query");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.kafka.common.config.ConfigDef.Validator;
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -336,6 +337,12 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
public static final String QUERY_DEFAULT = "";
private static final String QUERY_DISPLAY = "Query";

public static final String QUERY_MASKED_CONFIG = "query.masked";
private static final String QUERY_MASKED_DOC =
"Same as 'query' configuration but the query string is masked"
+ "Use this config to prevent sensitive information from being logged.";
private static final String QUERY_MASKED_DISPLAY = "Query (Masked)";

public static final String TOPIC_PREFIX_CONFIG = "topic.prefix";
private static final String TOPIC_PREFIX_DOC =
"Prefix to prepend to table names to generate the name of the Kafka topic to publish data "
Expand Down Expand Up @@ -420,6 +427,20 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
private static final EnumRecommender QUOTE_METHOD_RECOMMENDER =
EnumRecommender.in(QuoteMethod.values());

private static final ConfigDef.Recommender HIDDEN_RECOMMENDER =
new ConfigDef.Recommender() {
@Override
public java.util.List<Object> validValues(
String name, Map<String, Object> config) {
return java.util.Collections.emptyList();
}

@Override
public boolean visible(String name, Map<String, Object> config) {
return false;
}
};

public static final String DATABASE_GROUP = "Database";
public static final String MODE_GROUP = "Mode";
public static final String CONNECTOR_GROUP = "Connector";
Expand Down Expand Up @@ -825,6 +846,13 @@ private static void addSchemaAndDialectOptions(ConfigDef config, int orderInGrou

private static final void addModeOptions(ConfigDef config) {
int orderInGroup = 0;
orderInGroup = defineModeConfig(config, orderInGroup);
orderInGroup = defineIncrementTimestampConfigs(config, orderInGroup);
orderInGroup = defineQueryAndQuoteConfigs(config, orderInGroup);
defineTransactionAndRetryConfigs(config, orderInGroup);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extend this Options method, because of falling to checkstyle error of exceeding 128 lines in a method.


private static int defineModeConfig(ConfigDef config, int orderInGroup) {
config.define(
MODE_CONFIG,
Type.STRING,
Expand All @@ -849,7 +877,12 @@ private static final void addModeOptions(ConfigDef config) {
TIMESTAMP_COLUMN_MAPPING_CONFIG,
VALIDATE_NON_NULL_CONFIG
)
).define(
);
return orderInGroup;
}

private static int defineIncrementTimestampConfigs(ConfigDef config, int orderInGroup) {
config.define(
INCREMENTING_COLUMN_NAME_CONFIG,
Type.STRING,
INCREMENTING_COLUMN_NAME_DEFAULT,
Expand Down Expand Up @@ -917,7 +950,12 @@ private static final void addModeOptions(ConfigDef config) {
Width.SHORT,
VALIDATE_NON_NULL_DISPLAY,
MODE_DEPENDENTS_RECOMMENDER
).define(
);
return orderInGroup;
}

private static int defineQueryAndQuoteConfigs(ConfigDef config, int orderInGroup) {
config.define(
QUERY_CONFIG,
Type.STRING,
QUERY_DEFAULT,
Expand All @@ -927,6 +965,17 @@ private static final void addModeOptions(ConfigDef config) {
++orderInGroup,
Width.SHORT,
QUERY_DISPLAY
).define(
QUERY_MASKED_CONFIG,
Type.PASSWORD,
QUERY_DEFAULT,
Importance.MEDIUM,
QUERY_MASKED_DOC,
MODE_GROUP,
++orderInGroup,
Width.SHORT,
QUERY_MASKED_DISPLAY,
HIDDEN_RECOMMENDER
).define(
QUOTE_SQL_IDENTIFIERS_CONFIG,
Type.STRING,
Expand All @@ -948,7 +997,12 @@ private static final void addModeOptions(ConfigDef config) {
++orderInGroup,
Width.MEDIUM,
QUERY_SUFFIX_DISPLAY
).define(
);
return orderInGroup;
}

private static void defineTransactionAndRetryConfigs(ConfigDef config, int orderInGroup) {
config.define(
TRANSACTION_ISOLATION_MODE_CONFIG,
Type.STRING,
TRANSACTION_ISOLATION_MODE_DEFAULT,
Expand Down Expand Up @@ -1432,6 +1486,22 @@ public List<String> incrementingColMappingRegexes() {
.collect(java.util.stream.Collectors.toList());
}

/**
* Get query string from either query.masked (Type.PASSWORD) or query (Type.STRING) config.
* Prioritizes query.masked if set, otherwise falls back to query config for backward
* compatibility.
*
* @return The query string from whichever config is set, or empty string if neither is set.
*/
public String getQuery() {
Password maskedQuery = getPassword(QUERY_MASKED_CONFIG);
if (maskedQuery != null && maskedQuery.value() != null && !maskedQuery.value().isEmpty()) {
return maskedQuery.value();
}

return getString(QUERY_CONFIG);
}

public boolean modeUsesTimestampColumn() {
String mode = getString(MODE_CONFIG);
return Arrays.asList(MODE_TIMESTAMP, MODE_TIMESTAMP_INCREMENTING).contains(mode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void start(Map<String, String> properties) {

List<String> tables = config.getList(JdbcSourceTaskConfig.TABLES_CONFIG);
Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED);
String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
String query = config.getQuery();
List<String> tableType = config.getList(JdbcSourceConnectorConfig.TABLE_TYPE_CONFIG);

if ((tables.isEmpty() && query.isEmpty())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private boolean isReadyToProcess() {
// If the call to get tables has not completed we will not do anything.
// This is only valid in table mode.
Boolean tablesFetched = config.getBoolean(JdbcSourceTaskConfig.TABLES_FETCHED);
String query = config.getString(JdbcSourceTaskConfig.QUERY_CONFIG);
String query = config.getQuery();
return !query.isEmpty() || tablesFetched;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig.TransactionIsolationMode;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.config.types.Password;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -80,7 +81,8 @@ public Config validate() {
}

boolean validationResult = validateMultiConfigs()
&& validateLegacyNewConfigCompatibility();
&& validateLegacyNewConfigCompatibility()
&& validateQueryConfigs();

if (validationResult && isUsingNewConfigs()) {
validationResult = validateTableInclusionConfigs()
Expand Down Expand Up @@ -150,6 +152,16 @@ private boolean validateMultiConfigs() {
return true;
}

private boolean hasAnyQueryConfig() {
String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
Password queryMasked =
config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you make config.getQuery return optional, then this hasAnyQueryConfig method will not be needed.

boolean hasQuery = query != null && !query.trim().isEmpty();
boolean hasQueryMasked =
queryMasked != null && queryMasked.value() != null && !queryMasked.value().trim().isEmpty();
return hasQuery || hasQueryMasked;
}

/**
* Validate legacy/new config compatibility and requirements.
* Implements the pattern: legacyKeys vs newKeys with early returns.
Expand All @@ -158,6 +170,11 @@ private boolean validateLegacyNewConfigCompatibility() {
// Define legacy and new config keys
boolean usingLegacyConfigs = isUsingLegacyConfigs();
boolean usingNewConfigs = isUsingNewConfigs();
boolean hasQuery = hasAnyQueryConfig();

if (hasQuery) {
return true;
}

if (usingLegacyConfigs && usingNewConfigs) {
return addConfigErrorsForLegacyAndNewConfigConflict();
Expand Down Expand Up @@ -273,6 +290,9 @@ private boolean addConfigErrorsForLegacyAndNewConfigConflict() {
* Validate that at least one configuration is provided.
*/
private boolean addConfigErrorsForNoConfigProvided() {
if (hasAnyQueryConfig()) {
return true;
}
String msg = "At least one table filtering configuration is required. "
+ "Provide one of: " + JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG + ", "
+ JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG + ", "
Expand Down Expand Up @@ -305,6 +325,48 @@ private boolean validateTableInclusionConfigs() {
return true;
}

/**
* Validate that only one of query or query.masked configs is set at a time.
* Both configs should not be set simultaneously to avoid ambiguity.
*/
private boolean validateQueryConfigs() {
String query = config.getString(JdbcSourceConnectorConfig.QUERY_CONFIG);
Password queryMasked =
config.getPassword(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG);

boolean hasQuery = query != null && !query.isEmpty();
boolean hasQueryMasked = queryMasked != null
&& queryMasked.value() != null
&& !queryMasked.value().isEmpty();

if (hasQuery && hasQueryMasked) {
String msg = "Both 'query' and 'query.masked' configs cannot be set at the same time. "
+ "Please use only one of them.";

addConfigError(JdbcSourceConnectorConfig.QUERY_CONFIG, msg);
addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg);

log.error("Validation failed: Both query and query.masked configs are set");
return false;
}

if ((hasQuery || hasQueryMasked) && (isUsingLegacyConfigs() || isUsingNewConfigs())) {
String msg =
"Do not specify table filtering configs with 'query' or 'query.masked'. "
+ "Remove table.whitelist / table.blacklist / table.include.list / "
+ "table.exclude.list.";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either of table.filtering.options or query should be provided as the connector would work in table.mode or query.mode and not both.

addConfigError(JdbcSourceConnectorConfig.QUERY_CONFIG, msg);
addConfigError(JdbcSourceConnectorConfig.QUERY_MASKED_CONFIG, msg);
addConfigError(JdbcSourceConnectorConfig.TABLE_WHITELIST_CONFIG, msg);
addConfigError(JdbcSourceConnectorConfig.TABLE_BLACKLIST_CONFIG, msg);
addConfigError(JdbcSourceConnectorConfig.TABLE_INCLUDE_LIST_CONFIG, msg);
addConfigError(JdbcSourceConnectorConfig.TABLE_EXCLUDE_LIST_CONFIG, msg);
return false;
}

return true;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a validation in order to limit using of either of query or query.masked config.

/**
* Validate plugin-specific needs. This method can be overridden by specific
* connector implementations to add their own validation logic.
Expand Down
Loading