Skip to content

Commit b2a2179

Browse files
committed
Minor refactoring related to the PR comments
* Update topic prefix to include partition.assignment.strategy * Add test to check uniqueness after each rebalance * Add test to check all partitions are assigned after each rebalance * Extracted variable for repetitive code part
1 parent d2a44bb commit b2a2179

File tree

1 file changed

+10
-13
lines changed

1 file changed

+10
-13
lines changed

tests/integration/consumer/test_consumer_upgrade_downgrade.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,19 @@ def list_offsets(a, topic, no_of_partitions):
5353
.format(partition.topic, partition.partition, e))
5454

5555

56-
# def produce_messages(producer, topic, partitions, num_messages):
57-
# for i in range(num_messages):
58-
# key = "key-{}".format(i)
59-
# value = "value-{}".format(i)
60-
# partition = i % partitions
61-
# producer.produce(topic, key=key, value=value, partition=partition)
62-
# producer.flush()
63-
64-
6556
def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol):
6657
total_msg_read = 0
67-
while len(consumers[-1].assignment()) != number_of_partitions // len(consumers):
58+
expected_partitions_per_consumer = number_of_partitions // len(consumers)
59+
while len(consumers[-1].assignment()) != expected_partitions_per_consumer:
6860
for consumer in consumers:
6961
consumer.poll(0.1)
7062

63+
all_assignments = set()
7164
for consumer in consumers:
72-
assert len(consumer.assignment()) == number_of_partitions // len(consumers)
65+
assignment = consumer.assignment()
66+
all_assignments.update(assignment)
67+
assert len(assignment) == expected_partitions_per_consumer
68+
assert len(all_assignments) == number_of_partitions
7369

7470
assert get_group_protocol_type(admin_client, topic) == expected_protocol
7571

@@ -91,7 +87,8 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k
9187
"""
9288
Test consumer upgrade and downgrade.
9389
"""
94-
topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
90+
topic_name_prefix = f"{topic_prefix}_{partition_assignment_strategy}"
91+
topic = kafka_cluster.create_topic_and_wait_propogation(topic_name_prefix,
9592
{
9693
"num_partitions": number_of_partitions
9794
})
@@ -114,7 +111,7 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k
114111
assert consumer2 is not None
115112
consumer2.subscribe([topic])
116113
check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER)
117-
114+
118115
# Now simulate a downgrade by deleting the second consumer and keeping only 'classic' consumer
119116
consumer2.close()
120117
check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC)

0 commit comments

Comments
 (0)