|
16 | 16 | # limitations under the License. |
17 | 17 |
|
18 | 18 | import pytest |
19 | | -from confluent_kafka import ConsumerGroupType, IsolationLevel, KafkaException, TopicPartition |
20 | | -from confluent_kafka.admin import OffsetSpec |
| 19 | +from confluent_kafka import ConsumerGroupType, KafkaException |
21 | 20 | from tests.common import TestUtils |
22 | 21 |
|
23 | 22 | topic_prefix = "test_consumer_upgrade_downgrade_" |
@@ -76,20 +75,25 @@ def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(k |
76 | 75 | }) |
77 | 76 | admin_client = kafka_cluster.admin() |
78 | 77 |
|
79 | | - # Create a consumer with the latest version |
80 | 78 | consumer_conf = {'group.id': topic, |
81 | | - 'auto.offset.reset': 'earliest', |
82 | | - 'group.protocol': 'classic'} |
83 | | - consumer_conf['partition.assignment.strategy'] = partition_assignment_strategy |
84 | | - consumer = kafka_cluster.consumer(consumer_conf) |
| 79 | + 'auto.offset.reset': 'earliest'} |
| 80 | + consumer_conf_classic = { |
| 81 | + 'group.protocol': 'classic', |
| 82 | + 'partition.assignment.strategy': partition_assignment_strategy, |
| 83 | + **consumer_conf |
| 84 | + } |
| 85 | + consumer_conf_consumer = { |
| 86 | + 'group.protocol': 'consumer', |
| 87 | + **consumer_conf |
| 88 | + } |
| 89 | + |
| 90 | + consumer = kafka_cluster.consumer(consumer_conf_classic) |
85 | 91 | assert consumer is not None |
86 | 92 | consumer.subscribe([topic]) |
87 | 93 | check_consumer(kafka_cluster, [consumer], admin_client, topic, ConsumerGroupType.CLASSIC) |
88 | | - del consumer_conf['partition.assignment.strategy'] |
89 | 94 |
|
90 | 95 | # Now simulate an upgrade by creating a new consumer with 'consumer' protocol |
91 | | - consumer_conf['group.protocol'] = 'consumer' |
92 | | - consumer2 = kafka_cluster.consumer(consumer_conf) |
| 96 | + consumer2 = kafka_cluster.consumer(consumer_conf_consumer) |
93 | 97 | assert consumer2 is not None |
94 | 98 | consumer2.subscribe([topic]) |
95 | 99 | check_consumer(kafka_cluster, [consumer, consumer2], admin_client, topic, ConsumerGroupType.CONSUMER) |
|
0 commit comments