@@ -36,23 +36,6 @@ def get_group_protocol_type(a, group_id):
3636 raise
3737
3838
39- def list_offsets (a , topic , no_of_partitions ):
40- topic_partition_offsets = {}
41- for partition in range (no_of_partitions ):
42- topic_partition = TopicPartition (topic , partition )
43- topic_partition_offsets [topic_partition ] = OffsetSpec .latest ()
44- futmap = a .list_offsets (topic_partition_offsets , isolation_level = IsolationLevel .READ_COMMITTED , request_timeout = 30 )
45- for partition , fut in futmap .items ():
46- try :
47- result = fut .result ()
48- print ("Topicname : {} Partition_Index : {} Offset : {} Timestamp : {}"
49- .format (partition .topic , partition .partition , result .offset ,
50- result .timestamp ))
51- except KafkaException as e :
52- print ("Topicname : {} Partition_Index : {} Error : {}"
53- .format (partition .topic , partition .partition , e ))
54-
55-
5639def check_consumer (kafka_cluster , consumers , admin_client , topic , expected_protocol ):
5740 total_msg_read = 0
5841 expected_partitions_per_consumer = number_of_partitions // len (consumers )
@@ -71,7 +54,6 @@ def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_proto
7154
7255 # Produce some messages to the topic
7356 kafka_cluster .seed_topic (topic )
74- list_offsets (admin_client , topic , number_of_partitions )
7557
7658 while total_msg_read < 100 :
7759 for consumer in consumers :
0 commit comments