diff --git a/opentracing-kafka-client/pom.xml b/opentracing-kafka-client/pom.xml
index 2ac937c..190dbc6 100644
--- a/opentracing-kafka-client/pom.xml
+++ b/opentracing-kafka-client/pom.xml
@@ -32,6 +32,11 @@
kafka-clients
provided
+
+ org.slf4j
+ slf4j-api
+ 1.7.30
+
\ No newline at end of file
diff --git a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java
index e00a363..416b9e4 100644
--- a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java
+++ b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaConsumer.java
@@ -22,6 +22,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
@@ -224,7 +225,7 @@ public Map committed(Set part
public Map committed(Set partitions,
final Duration timeout) {
return consumer.committed(partitions, timeout);
- }
+ }
@Override
public Map metrics() {
@@ -300,6 +301,11 @@ public Map endOffsets(Collection collectio
return consumer.endOffsets(collection, duration);
}
+ @Override
+ public OptionalLong currentLag(final TopicPartition topicPartition) {
+ return consumer.currentLag(topicPartition);
+ }
+
@Override
public ConsumerGroupMetadata groupMetadata() {
return consumer.groupMetadata();
@@ -311,14 +317,13 @@ public void enforceRebalance() {
}
@Override
- public void close() {
- consumer.close();
+ public void enforceRebalance(final String s) {
+ consumer.enforceRebalance(s);
}
@Override
- @Deprecated
- public void close(long l, TimeUnit timeUnit) {
- consumer.close(l, timeUnit);
+ public void close() {
+ consumer.close();
}
@Override
diff --git a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java
index 3447d68..e9b7750 100644
--- a/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java
+++ b/opentracing-kafka-client/src/main/java/io/opentracing/contrib/kafka/TracingKafkaProducer.java
@@ -166,9 +166,4 @@ public void close(Duration duration) {
producer.close(duration);
}
- @Override
- public void close(long timeout, TimeUnit timeUnit) {
- producer.close(timeout, timeUnit);
- }
-
}
diff --git a/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java b/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java
index a59aabc..1ae5e8d 100644
--- a/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java
+++ b/opentracing-kafka-spring/src/test/java/io/opentracing/contrib/kafka/spring/TracingSpringKafkaTest.java
@@ -60,9 +60,10 @@ public void test() {
await().atMost(15, TimeUnit.SECONDS).until(reportedSpansSize(), greaterThanOrEqualTo(3));
List spans = mockTracer.finishedSpans();
+ spans.forEach(s -> System.out.println("Span Operation: " + s.operationName()));
assertThat(spans, contains(
- new SpanMatcher("To_spring"),
new SpanMatcher("From_spring"),
+ new SpanMatcher("To_spring"),
new SpanMatcher("KafkaListener_spring")));
}
diff --git a/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java b/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java
index a2c4eff..f3e4bff 100644
--- a/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java
+++ b/opentracing-kafka-streams/src/main/java/io/opentracing/contrib/kafka/streams/TracingKafkaClientSupplier.java
@@ -25,6 +25,8 @@
import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;
+
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -95,6 +97,11 @@ public AdminClient getAdminClient(final Map config) {
return AdminClient.create(config);
}
+ @Override
+ public Admin getAdmin(final Map config) {
+ return this.getAdminClient(config);
+ }
+
@Override
public Producer getProducer(Map config) {
return new TracingKafkaProducerBuilder<>(
diff --git a/pom.xml b/pom.xml
index c1289af..3678659 100644
--- a/pom.xml
+++ b/pom.xml
@@ -62,13 +62,13 @@
- 1.8
+ 11
UTF-8
UTF-8
- 0.33.0
- 2.6.0
- 2.6.1
+ 0.32.0
+ 3.2.1
+ 2.9.0
5.2.7.RELEASE
4.3.0
0.8.5
@@ -128,7 +128,7 @@
org.apache.kafka
- kafka_2.11
+ kafka_2.12
org.apache.kafka
@@ -268,7 +268,7 @@
org.apache.maven.plugins
maven-source-plugin
- 3.2.0
+ 3.2.1
attach-sources
@@ -282,7 +282,7 @@
org.apache.maven.plugins
maven-javadoc-plugin
- 3.1.1
+ 3.4.1
false