Skip to content

Commit 1469110

Browse files
committed
Added online upgrade and downgrade test
1 parent 9ad4e3a commit 1469110

File tree

3 files changed

+159
-12
lines changed

3 files changed

+159
-12
lines changed

tests/common/__init__.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,25 @@ def use_group_protocol_consumer():
5555

5656
@staticmethod
def update_conf_group_protocol(conf=None):
    """Switch *conf* to the `consumer` group protocol when it is eligible."""
    if not TestUtils.can_upgrade_group_protocol_to_consumer(conf):
        return
    conf['group.protocol'] = 'consumer'

61+
@staticmethod
def can_upgrade_group_protocol_to_consumer(conf):
    """Return True when *conf* names a group, does not already pin a
    protocol, and the `consumer` protocol is enabled for this test run."""
    if conf is None:
        return False
    if 'group.id' not in conf:
        return False
    if 'group.protocol' in conf:
        return False
    return TestUtils.use_group_protocol_consumer()
64+
6165
@staticmethod
def remove_forbidden_conf_group_protocol_consumer(conf):
    """Drop configuration properties that the `CONSUMER` group protocol
    forbids, mutating *conf* in place.

    A missing 'group.protocol' key is treated as 'consumer' here, so the
    cleanup also applies to configs upgraded by update_conf_group_protocol.
    """
    if conf is None or not TestUtils.use_group_protocol_consumer() or conf.get('group.protocol', 'consumer') != 'consumer':
        return
    for prop in ("session.timeout.ms",
                 "partition.assignment.strategy",
                 "heartbeat.interval.ms",
                 "group.protocol.type"):
        if prop in conf:
            print(f"Skipping setting forbidden configuration {prop} for `CONSUMER` protocol")
            del conf[prop]
7477

7578

7679
class TestConsumer(Consumer):

tests/integration/cluster_fixture.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,19 @@ def create_topic(self, prefix, conf=None, **create_topic_kwargs):
233233

234234
future_topic.get(name).result()
235235
return name
236+
237+
def delete_topic(self, topic):
    """
    Deletes a topic with this cluster.

    Best-effort: failures are reported on stdout, never raised, so test
    teardown keeps going.

    :param str topic: topic name
    """
    futures = self.admin().delete_topics([topic])
    try:
        futures.get(topic).result()
        print("Topic {} deleted".format(topic))
    except Exception as e:
        print("Failed to delete topic {}: {}".format(topic, e))
236249

237250
def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kwargs):
238251
"""
@@ -273,7 +286,7 @@ def seed_topic(self, topic, value_source=None, key_source=None, header_source=No
273286
value_source = ['test-data{}'.format(i) for i in range(0, 100)]
274287

275288
if key_source is None:
276-
key_source = [None]
289+
key_source = ['test-key{}'.format(i) for i in range(0, 100)]
277290

278291
KafkaClusterFixture._produce(self._producer, topic, value_source, key_source, header_source)
279292

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2025 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
import pytest
19+
from confluent_kafka import ConsumerGroupType, IsolationLevel, KafkaException, TopicPartition
20+
from confluent_kafka.admin import OffsetSpec
21+
from tests.common import TestUtils
22+
23+
topic_prefix = "test_consumer_upgrade_downgrade_"
24+
number_of_partitions = 10
25+
26+
27+
def get_group_protocol_type(a, group_id):
    """Return the group type of *group_id*, or None when describe fails.

    :param a: AdminClient used to describe the group
    :param str group_id: consumer group id to describe
    :returns: the described group's ``type`` attribute, or implicitly None
        when the describe call raises a KafkaException
    """
    future_map = a.describe_consumer_groups([group_id])
    try:
        group = future_map[group_id].result()
        return group.type
    except KafkaException as e:
        # Best-effort: report and fall through to an implicit None so the
        # caller's equality assertion fails with a readable comparison.
        # (The original `except Exception: raise` clause was a no-op —
        # unhandled exceptions propagate anyway — so it has been removed.)
        print("Error while describing group id '{}': {}".format(group_id, e))
37+
38+
39+
def list_offsets(a, topic, no_of_partitions):
    """Print the latest offset of each of *topic*'s partitions.

    :param a: AdminClient used for the list_offsets call
    :param str topic: topic name
    :param int no_of_partitions: number of partitions to query (0..n-1)
    """
    requested = {TopicPartition(topic, p): OffsetSpec.latest()
                 for p in range(no_of_partitions)}
    futmap = a.list_offsets(requested, isolation_level=IsolationLevel.READ_COMMITTED, request_timeout=30)
    for partition, fut in futmap.items():
        try:
            result = fut.result()
            print("Topicname : {} Partition_Index : {} Offset : {} Timestamp : {}"
                  .format(partition.topic, partition.partition, result.offset,
                          result.timestamp))
        except KafkaException as e:
            print("Topicname : {} Partition_Index : {} Error : {}"
                  .format(partition.topic, partition.partition, e))
54+
55+
56+
# def produce_messages(producer, topic, partitions, num_messages):
57+
# for i in range(num_messages):
58+
# key = "key-{}".format(i)
59+
# value = "value-{}".format(i)
60+
# partition = i % partitions
61+
# producer.produce(topic, key=key, value=value, partition=partition)
62+
# producer.flush()
63+
64+
65+
def check_consumer(kafka_cluster, consumers, admin_client, topic, expected_protocol):
    """Verify partition balance, group protocol type, and message delivery.

    :param kafka_cluster: cluster fixture used to seed the topic
    :param list consumers: all members of the group; the last one is the
        most recently joined
    :param admin_client: AdminClient for describe/list_offsets calls
    :param str topic: topic (and group id) under test
    :param expected_protocol: ConsumerGroupType the group must report
    """
    # Poll until the newest member owns its even share of the partitions,
    # i.e. the rebalance triggered by the last join/leave has completed.
    # NOTE(review): no timeout — a stuck rebalance hangs the test here.
    expected_assignment = number_of_partitions // len(consumers)
    while len(consumers[-1].assignment()) != expected_assignment:
        for consumer in consumers:
            consumer.poll(0.1)

    for consumer in consumers:
        assert len(consumer.assignment()) == expected_assignment

    assert get_group_protocol_type(admin_client, topic) == expected_protocol

    # Produce some messages to the topic (seed_topic writes exactly 100).
    kafka_cluster.seed_topic(topic)
    list_offsets(admin_client, topic, number_of_partitions)

    total_msg_read = 0
    while total_msg_read < 100:
        for consumer in consumers:
            # Poll for messages
            msg = consumer.poll(0.1)
            if msg is not None:
                total_msg_read += 1
                # Stop as soon as 100 is reached: without this break the
                # remaining consumers in this pass could still deliver
                # messages, overshooting 100 and failing the assert below.
                if total_msg_read == 100:
                    break

    assert total_msg_read == 100, "Expected to read 100 messages, but read {}".format(total_msg_read)
88+
89+
90+
def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, partition_assignment_strategy):
    """
    Test consumer upgrade and downgrade.

    Runs one classic-protocol consumer, upgrades the group by adding a
    `consumer`-protocol member, then downgrades it again by closing that
    member, asserting the reported group type at each phase.
    """
    topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
                                                            {
                                                                "num_partitions": number_of_partitions
                                                            })
    admin_client = kafka_cluster.admin()

    # Phase 1: a single member using the classic protocol and the given
    # assignment strategy — the group must report CLASSIC.
    consumer_conf = {'group.id': topic,
                     'auto.offset.reset': 'earliest',
                     'group.protocol': 'classic',
                     'partition.assignment.strategy': partition_assignment_strategy}
    classic_consumer = kafka_cluster.consumer(consumer_conf)
    assert classic_consumer is not None
    classic_consumer.subscribe([topic])
    check_consumer(kafka_cluster, [classic_consumer], admin_client, topic, ConsumerGroupType.CLASSIC)

    # Phase 2 (upgrade): a second member joins with the 'consumer' protocol
    # (which forbids an assignment strategy), moving the group to CONSUMER.
    del consumer_conf['partition.assignment.strategy']
    consumer_conf['group.protocol'] = 'consumer'
    upgraded_consumer = kafka_cluster.consumer(consumer_conf)
    assert upgraded_consumer is not None
    upgraded_consumer.subscribe([topic])
    check_consumer(kafka_cluster, [classic_consumer, upgraded_consumer], admin_client, topic, ConsumerGroupType.CONSUMER)

    # Phase 3 (downgrade): only the classic member remains, so the group
    # must revert to CLASSIC.
    upgraded_consumer.close()
    check_consumer(kafka_cluster, [classic_consumer], admin_client, topic, ConsumerGroupType.CLASSIC)

    classic_consumer.close()
    kafka_cluster.delete_topic(topic)
124+
125+
126+
@pytest.mark.skipif(not TestUtils.use_group_protocol_consumer(),
                    reason="Skipping test as group protocol consumer is not enabled")
def test_consumer_upgrade_downgrade(kafka_cluster):
    """Run the upgrade/downgrade scenario under each classic assignment strategy."""
    for strategy in ('roundrobin', 'range', 'cooperative-sticky'):
        perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, strategy)

0 commit comments

Comments
 (0)