Skip to content

Commit 7ce1a40

Browse files
authored
Merge pull request #8 from itERRatOR/master
Suggested changes
2 parents df052f5 + 71f13a3 commit 7ce1a40

File tree

8 files changed

+58
-62
lines changed

8 files changed

+58
-62
lines changed

config/worker.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# from more information, visit: http://docs.confluent.io/3.2.0/connect/userguide.html#common-worker-configs
1+
# for more information, visit: http://docs.confluent.io/3.2.0/connect/userguide.html#common-worker-configs
22
bootstrap.servers=127.0.0.1:9092
33
key.converter=org.apache.kafka.connect.json.JsonConverter
44
key.converter.schemas.enable=true

run.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,11 @@ elif hash connect-standalone 2>/dev/null; then
1111
connect-standalone config/worker.properties config/GitHubSourceConnectorExample.properties
1212
elif [[ -z $KAFKA_HOME ]]; then
1313
# for people who installed kafka vanilla
14-
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/etc/schema-registry/connect-avro-standalone.properties config/MySourceConnector.properties
14+
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/etc/schema-registry/connect-avro-standalone.properties config/GitHubSourceConnectorExample.properties
1515
elif [[ -z $CONFLUENT_HOME ]]; then
1616
# for people who installed kafka confluent flavour
17-
$CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties config/MySourceConnector.properties
17+
$CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone.properties config/GitHubSourceConnectorExample.properties
1818
else
1919
printf "Couldn't find a suitable way to run kafka connect for you.\n \
2020
Please install Docker, or download the kafka binaries and set the variable KAFKA_HOME."
21-
fi;
21+
fi;

src/main/java/com/simplesteph/kafka/GitHubSchemas.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,62 +6,62 @@
66

77
public class GitHubSchemas {
88

9-
public static String NEXT_PAGE_FIELD = "next_page";
9+
public static final String NEXT_PAGE_FIELD = "next_page";
1010

1111
// Issue fields
12-
public static String OWNER_FIELD = "owner";
13-
public static String REPOSITORY_FIELD = "repository";
14-
public static String CREATED_AT_FIELD = "created_at";
15-
public static String UPDATED_AT_FIELD = "updated_at";
16-
public static String NUMBER_FIELD = "number";
17-
public static String URL_FIELD = "url";
18-
public static String HTML_URL_FIELD = "html_url";
19-
public static String TITLE_FIELD = "title";
20-
public static String STATE_FIELD = "state";
12+
public static final String OWNER_FIELD = "owner";
13+
public static final String REPOSITORY_FIELD = "repository";
14+
public static final String CREATED_AT_FIELD = "created_at";
15+
public static final String UPDATED_AT_FIELD = "updated_at";
16+
public static final String NUMBER_FIELD = "number";
17+
public static final String URL_FIELD = "url";
18+
public static final String HTML_URL_FIELD = "html_url";
19+
public static final String TITLE_FIELD = "title";
20+
public static final String STATE_FIELD = "state";
2121

2222
// User fields
23-
public static String USER_FIELD = "user";
24-
public static String USER_URL_FIELD = "url";
25-
public static String USER_HTML_URL_FIELD = "html_url";
26-
public static String USER_ID_FIELD = "id";
27-
public static String USER_LOGIN_FIELD = "login";
23+
public static final String USER_FIELD = "user";
24+
public static final String USER_URL_FIELD = "url";
25+
public static final String USER_HTML_URL_FIELD = "html_url";
26+
public static final String USER_ID_FIELD = "id";
27+
public static final String USER_LOGIN_FIELD = "login";
2828

2929
// PR fields
30-
public static String PR_FIELD = "pull_request";
31-
public static String PR_URL_FIELD = "url";
32-
public static String PR_HTML_URL_FIELD = "html_url";
30+
public static final String PR_FIELD = "pull_request";
31+
public static final String PR_URL_FIELD = "url";
32+
public static final String PR_HTML_URL_FIELD = "html_url";
3333

3434
// Schema names
35-
public static String SCHEMA_KEY = "com.simplesteph.kafka.connect.github.IssueKey";
36-
public static String SCHEMA_VALUE_ISSUE = "com.simplesteph.kafka.connect.github.IssueValue";
37-
public static String SCHEMA_VALUE_USER = "com.simplesteph.kafka.connect.github.UserValue";
38-
public static String SCHEMA_VALUE_PR = "com.simplesteph.kafka.connect.github.PrValue";
35+
public static final String SCHEMA_KEY = "com.simplesteph.kafka.connect.github.IssueKey";
36+
public static final String SCHEMA_VALUE_ISSUE = "com.simplesteph.kafka.connect.github.IssueValue";
37+
public static final String SCHEMA_VALUE_USER = "com.simplesteph.kafka.connect.github.UserValue";
38+
public static final String SCHEMA_VALUE_PR = "com.simplesteph.kafka.connect.github.PrValue";
3939

4040
// Key Schema
41-
public static Schema KEY_SCHEMA = SchemaBuilder.struct().name(SCHEMA_KEY)
41+
public static final Schema KEY_SCHEMA = SchemaBuilder.struct().name(SCHEMA_KEY)
4242
.version(1)
4343
.field(OWNER_FIELD, Schema.STRING_SCHEMA)
4444
.field(REPOSITORY_FIELD, Schema.STRING_SCHEMA)
4545
.field(NUMBER_FIELD, Schema.INT32_SCHEMA)
4646
.build();
4747

4848
// Value Schema
49-
public static Schema USER_SCHEMA = SchemaBuilder.struct().name(SCHEMA_VALUE_USER)
49+
public static final Schema USER_SCHEMA = SchemaBuilder.struct().name(SCHEMA_VALUE_USER)
5050
.version(1)
5151
.field(USER_URL_FIELD, Schema.STRING_SCHEMA)
5252
.field(USER_ID_FIELD, Schema.INT32_SCHEMA)
5353
.field(USER_LOGIN_FIELD, Schema.STRING_SCHEMA)
5454
.build();
5555

5656
// optional schema
57-
public static Schema PR_SCHEMA = SchemaBuilder.struct().name(SCHEMA_VALUE_PR)
57+
public static final Schema PR_SCHEMA = SchemaBuilder.struct().name(SCHEMA_VALUE_PR)
5858
.version(1)
5959
.field(PR_URL_FIELD, Schema.STRING_SCHEMA)
6060
.field(PR_HTML_URL_FIELD, Schema.STRING_SCHEMA)
6161
.optional()
6262
.build();
6363

64-
public static Schema VALUE_SCHEMA = SchemaBuilder.struct().name(SCHEMA_VALUE_ISSUE)
64+
public static final Schema VALUE_SCHEMA = SchemaBuilder.struct().name(SCHEMA_VALUE_ISSUE)
6565
.version(2)
6666
.field(URL_FIELD, Schema.STRING_SCHEMA)
6767
.field(TITLE_FIELD, Schema.STRING_SCHEMA)

src/main/java/com/simplesteph/kafka/Validators/BatchSizeValidator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ public class BatchSizeValidator implements ConfigDef.Validator {
88
@Override
99
public void ensureValid(String name, Object value) {
1010
Integer batchSize = (Integer) value;
11-
if (!(1 <= batchSize && batchSize <=100)){
11+
if (!(1 <= batchSize && batchSize <= 100)){
1212
throw new ConfigException(name, value, "Batch Size must be a positive integer that's less or equal to 100");
1313
}
1414
}
15-
}
15+
}

src/main/java/com/simplesteph/kafka/model/User.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11

22
package com.simplesteph.kafka.model;
33

4-
import org.apache.kafka.connect.data.Struct;
54
import org.json.JSONObject;
65

76
import java.util.HashMap;

src/test/java/com/simplesteph/kafka/GitHubSourceConnectorConfigTest.java

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import org.apache.kafka.common.config.ConfigDef;
44
import org.apache.kafka.common.config.ConfigValue;
5+
import org.junit.Before;
56
import org.junit.Test;
67
import java.util.HashMap;
78
import java.util.Map;
@@ -11,71 +12,64 @@
1112
public class GitHubSourceConnectorConfigTest {
1213

1314
private ConfigDef configDef = GitHubSourceConnectorConfig.conf();
14-
15-
private Map<String, String> initialConfig() {
16-
Map<String, String> baseProps = new HashMap<>();
17-
baseProps.put(OWNER_CONFIG, "foo");
18-
baseProps.put(REPO_CONFIG, "bar");
19-
baseProps.put(SINCE_CONFIG, "2017-04-26T01:23:45Z");
20-
baseProps.put(BATCH_SIZE_CONFIG, "100");
21-
baseProps.put(TOPIC_CONFIG, "github-issues");
22-
return baseProps;
15+
private Map<String, String> config;
16+
17+
@Before
18+
public void setUpInitialConfig() {
19+
config = new HashMap<>();
20+
config.put(OWNER_CONFIG, "foo");
21+
config.put(REPO_CONFIG, "bar");
22+
config.put(SINCE_CONFIG, "2017-04-26T01:23:45Z");
23+
config.put(BATCH_SIZE_CONFIG, "100");
24+
config.put(TOPIC_CONFIG, "github-issues");
2325
}
2426

25-
2627
@Test
2728
public void doc() {
2829
System.out.println(GitHubSourceConnectorConfig.conf().toRst());
2930
}
3031

3132
@Test
3233
public void initialConfigIsValid() {
33-
assert (configDef.validate(initialConfig())
34+
assert (configDef.validate(config)
3435
.stream()
3536
.allMatch(configValue -> configValue.errorMessages().size() == 0));
3637
}
3738

3839
@Test
3940
public void canReadConfigCorrectly() {
40-
GitHubSourceConnectorConfig config = new GitHubSourceConnectorConfig(initialConfig());
41+
GitHubSourceConnectorConfig config = new GitHubSourceConnectorConfig(this.config);
4142
config.getAuthPassword();
4243

4344
}
4445

45-
4646
@Test
4747
public void validateSince() {
48-
Map<String, String> config = initialConfig();
4948
config.put(SINCE_CONFIG, "not-a-date");
5049
ConfigValue configValue = configDef.validateAll(config).get(SINCE_CONFIG);
5150
assert (configValue.errorMessages().size() > 0);
5251
}
5352

5453
@Test
5554
public void validateBatchSize() {
56-
Map<String, String> config = initialConfig();
5755
config.put(BATCH_SIZE_CONFIG, "-1");
5856
ConfigValue configValue = configDef.validateAll(config).get(BATCH_SIZE_CONFIG);
5957
assert (configValue.errorMessages().size() > 0);
6058

61-
config = initialConfig();
6259
config.put(BATCH_SIZE_CONFIG, "101");
6360
configValue = configDef.validateAll(config).get(BATCH_SIZE_CONFIG);
6461
assert (configValue.errorMessages().size() > 0);
65-
6662
}
6763

6864
@Test
6965
public void validateUsername() {
70-
Map<String, String> config = initialConfig();
7166
config.put(AUTH_USERNAME_CONFIG, "username");
7267
ConfigValue configValue = configDef.validateAll(config).get(AUTH_USERNAME_CONFIG);
7368
assert (configValue.errorMessages().size() == 0);
7469
}
7570

7671
@Test
7772
public void validatePassword() {
78-
Map<String, String> config = initialConfig();
7973
config.put(AUTH_PASSWORD_CONFIG, "password");
8074
ConfigValue configValue = configDef.validateAll(config).get(AUTH_PASSWORD_CONFIG);
8175
assert (configValue.errorMessages().size() == 0);

src/test/java/com/simplesteph/kafka/GitHubSourceTaskTest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
import java.util.Set;
1414

1515
import static com.simplesteph.kafka.GitHubSourceConnectorConfig.*;
16+
import static org.junit.Assert.*;
1617

1718
public class GitHubSourceTaskTest {
1819

19-
GitHubSourceTask gitHubSourceTask = new GitHubSourceTask();
20+
private GitHubSourceTask gitHubSourceTask = new GitHubSourceTask();
2021
private Integer batchSize = 10;
2122

2223
private Map<String, String> initialConfig() {
@@ -40,17 +41,18 @@ public void test() throws UnirestException {
4041
System.out.println(url);
4142
HttpResponse<JsonNode> httpResponse = gitHubSourceTask.gitHubHttpAPIClient.getNextIssuesAPI(gitHubSourceTask.nextPageToVisit, gitHubSourceTask.nextQuerySince);
4243
if (httpResponse.getStatus() != 403) {
43-
assert (httpResponse.getStatus() == 200);
44+
assertEquals(200, httpResponse.getStatus());
4445
Set<String> headers = httpResponse.getHeaders().keySet();
45-
assert (headers.contains("ETag"));
46-
assert (headers.contains("X-RateLimit-Limit"));
47-
assert (headers.contains("X-RateLimit-Remaining"));
48-
assert (headers.contains("X-RateLimit-Reset"));
49-
assert (httpResponse.getBody().getArray().length() == 10);
46+
assertTrue(headers.contains("ETag"));
47+
assertTrue(headers.contains("X-RateLimit-Limit"));
48+
assertTrue(headers.contains("X-RateLimit-Remaining"));
49+
assertTrue(headers.contains("X-RateLimit-Reset"));
50+
assertEquals(batchSize.intValue(), httpResponse.getBody().getArray().length());
5051
JSONObject jsonObject = (JSONObject) httpResponse.getBody().getArray().get(0);
5152
Issue issue = Issue.fromJson(jsonObject);
52-
assert (issue != null);
53-
assert (issue.getNumber() == 2072);
53+
assertNotNull(issue);
54+
assertNotNull(issue.getNumber());
55+
assertEquals(2072, issue.getNumber().intValue());
5456
}
5557
}
5658
}

src/test/java/com/simplesteph/kafka/model/IssueTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.simplesteph.kafka.model;
22

3+
import com.simplesteph.kafka.GitHubSchemas;
34
import com.simplesteph.kafka.GitHubSourceTask;
45
import org.apache.kafka.connect.data.Struct;
56
import org.json.JSONObject;
@@ -93,7 +94,7 @@ public void convertsToStruct(){
9394
// issue
9495
Issue issue = Issue.fromJson(issueJson);
9596
Struct struct = new GitHubSourceTask().buildRecordValue(issue);
96-
assert struct.get("created_at").getClass() == Date.class;
97+
assert struct.get(GitHubSchemas.CREATED_AT_FIELD).getClass() == Date.class;
9798
}
9899

99100
}

0 commit comments

Comments
 (0)