Skip to content

Commit 7fa7655

Browse files
authored
Add context manager support for Producer, Consumer, and AdminClient (#2114)
1 parent 1206050 commit 7fa7655

File tree

11 files changed

+875
-40
lines changed

11 files changed

+875
-40
lines changed

examples/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The scripts in this directory provide various examples of using the Confluent Py
88

99
- [producer.py](producer.py): Read lines from stdin and send them to a Kafka topic.
1010
- [consumer.py](consumer.py): Read messages from a Kafka topic.
11+
- [context_manager_example.py](context_manager_example.py): **Demonstrates context manager (`with` statement) usage for Producer, Consumer, and AdminClient** - shows automatic resource cleanup when exiting the `with` block.
1112

1213
## AsyncIO Examples
1314

examples/avro_producer.py

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -114,43 +114,42 @@ def main(args):
114114
# If Confluent Cloud SR credentials are provided, add to config
115115
if args.sr_api_key and args.sr_api_secret:
116116
schema_registry_conf['basic.auth.user.info'] = f"{args.sr_api_key}:{args.sr_api_secret}"
117-
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
118-
119-
avro_serializer = AvroSerializer(schema_registry_client,
120-
schema_str,
121-
user_to_dict)
122-
123-
string_serializer = StringSerializer('utf_8')
124-
125-
producer_conf = {'bootstrap.servers': args.bootstrap_servers}
126-
127-
producer = Producer(producer_conf)
128-
129-
print("Producing user records to topic {}. ^C to exit.".format(topic))
130-
while True:
131-
# Serve on_delivery callbacks from previous calls to produce()
132-
producer.poll(0.0)
133-
try:
134-
user_name = input("Enter name: ")
135-
user_address = input("Enter address: ")
136-
user_favorite_number = int(input("Enter favorite number: "))
137-
user_favorite_color = input("Enter favorite color: ")
138-
user = User(name=user_name,
139-
address=user_address,
140-
favorite_color=user_favorite_color,
141-
favorite_number=user_favorite_number)
142-
producer.produce(topic=topic,
143-
key=string_serializer(str(uuid4())),
144-
value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
145-
on_delivery=delivery_report)
146-
except KeyboardInterrupt:
147-
break
148-
except ValueError:
149-
print("Invalid input, discarding record...")
150-
continue
151-
152-
print("\nFlushing records...")
153-
producer.flush()
117+
118+
# Use context manager for SchemaRegistryClient to ensure proper cleanup
119+
with SchemaRegistryClient(schema_registry_conf) as schema_registry_client:
120+
avro_serializer = AvroSerializer(schema_registry_client,
121+
schema_str,
122+
user_to_dict)
123+
124+
string_serializer = StringSerializer('utf_8')
125+
126+
producer_conf = {'bootstrap.servers': args.bootstrap_servers}
127+
128+
# Use context manager for Producer to ensure proper cleanup
129+
with Producer(producer_conf) as producer:
130+
print("Producing user records to topic {}. ^C to exit.".format(topic))
131+
while True:
132+
# Serve on_delivery callbacks from previous calls to produce()
133+
producer.poll(0.0)
134+
try:
135+
user_name = input("Enter name: ")
136+
user_address = input("Enter address: ")
137+
user_favorite_number = int(input("Enter favorite number: "))
138+
user_favorite_color = input("Enter favorite color: ")
139+
user = User(name=user_name,
140+
address=user_address,
141+
favorite_color=user_favorite_color,
142+
favorite_number=user_favorite_number)
143+
producer.produce(topic=topic,
144+
key=string_serializer(str(uuid4())),
145+
value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
146+
on_delivery=delivery_report)
147+
except KeyboardInterrupt:
148+
break
149+
except ValueError:
150+
print("Invalid input, discarding record...")
151+
continue
152+
# Producer will automatically flush on context exit
154153

155154

156155
if __name__ == '__main__':
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright 2016 Confluent Inc.
4+
#
5+
# Licensed under the Apache License, Version 2.0 (the "License");
6+
# you may not use this file except in compliance with the License.
7+
# You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
#
19+
# Example demonstrating context manager usage for Producer, Consumer, and AdminClient.
20+
# Context managers ensure proper cleanup of resources when exiting the 'with' block.
21+
#
22+
23+
from confluent_kafka import Producer, Consumer, KafkaError
24+
from confluent_kafka.admin import AdminClient, NewTopic
25+
import sys
26+
27+
28+
def main():
29+
if len(sys.argv) < 2:
30+
sys.stderr.write('Usage: %s <bootstrap-brokers>\n' % sys.argv[0])
31+
sys.exit(1)
32+
33+
broker = sys.argv[1]
34+
topic = 'context-manager-example'
35+
36+
# Example 1: AdminClient with context manager
37+
# Automatically destroys the admin client when exiting the 'with' block
38+
print("=== AdminClient Context Manager Example ===")
39+
admin_conf = {'bootstrap.servers': broker}
40+
41+
with AdminClient(admin_conf) as admin:
42+
# Create a topic using AdminClient
43+
topic_obj = NewTopic(topic, num_partitions=1, replication_factor=1)
44+
futures = admin.create_topics([topic_obj])
45+
46+
# Wait for the operation to complete
47+
for topic_name, future in futures.items():
48+
try:
49+
future.result() # The result itself is None
50+
print(f"Topic '{topic_name}' created successfully")
51+
except Exception as e:
52+
print(f"Failed to create topic '{topic_name}': {e}")
53+
54+
# Poll to ensure callbacks are processed
55+
admin.poll(timeout=1.0)
56+
57+
# AdminClient is automatically destroyed here, no need for manual cleanup
58+
59+
# Example 2: Producer with context manager
60+
# Automatically flushes pending messages and destroys the producer
61+
print("\n=== Producer Context Manager Example ===")
62+
producer_conf = {'bootstrap.servers': broker}
63+
64+
def delivery_callback(err, msg):
65+
if err:
66+
print(f'Message failed delivery: {err}')
67+
else:
68+
print(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}')
69+
70+
with Producer(producer_conf) as producer:
71+
# Produce some messages
72+
for i in range(5):
73+
value = f'Message {i} from context manager example'
74+
producer.produce(
75+
topic,
76+
key=f'key-{i}',
77+
value=value.encode('utf-8'),
78+
callback=delivery_callback
79+
)
80+
# Poll for delivery callbacks
81+
producer.poll(0)
82+
83+
print(f"Produced 5 messages to topic '{topic}'")
84+
85+
# Producer automatically flushes all pending messages and destroys here
86+
# No need to call producer.flush() or manually clean up
87+
88+
# Example 3: Consumer with context manager
89+
# Automatically closes the consumer (leaves consumer group, commits offsets)
90+
print("\n=== Consumer Context Manager Example ===")
91+
consumer_conf = {
92+
'bootstrap.servers': broker,
93+
'group.id': 'context-manager-example-group',
94+
'auto.offset.reset': 'earliest'
95+
}
96+
97+
with Consumer(consumer_conf) as consumer:
98+
# Subscribe to the topic
99+
consumer.subscribe([topic])
100+
101+
# Consume messages
102+
msg_count = 0
103+
try:
104+
while msg_count < 5:
105+
msg = consumer.poll(timeout=1.0)
106+
if msg is None:
107+
continue
108+
109+
if msg.error():
110+
if msg.error().code() == KafkaError._PARTITION_EOF:
111+
# End of partition, try next message
112+
continue
113+
else:
114+
print(f'Consumer error: {msg.error()}')
115+
break
116+
117+
print(f'Consumed message: key={msg.key().decode("utf-8")}, '
118+
f'value={msg.value().decode("utf-8")}, '
119+
f'partition={msg.partition()}, offset={msg.offset()}')
120+
msg_count += 1
121+
except KeyboardInterrupt:
122+
print('Consumer interrupted by user')
123+
124+
# Consumer automatically calls close() here (leaves group, commits offsets)
125+
# No need to manually call consumer.close()
126+
127+
print("\n=== All examples completed successfully! ===")
128+
129+
130+
if __name__ == '__main__':
131+
main()

src/confluent_kafka/cimpl.pyi

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,8 @@ class Producer:
157157
def set_sasl_credentials(self, username: str, password: str) -> None: ...
158158
def __len__(self) -> int: ...
159159
def __bool__(self) -> bool: ...
160+
def __enter__(self) -> "Producer": ...
161+
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
160162

161163
class Consumer:
162164
def __init__(self, config: Dict[str, Union[str, int, float, bool, None]]) -> None: ...
@@ -208,6 +210,8 @@ class Consumer:
208210
timeout: float = -1
209211
) -> List[TopicPartition]: ...
210212
def close(self) -> None: ...
213+
def __enter__(self) -> "Consumer": ...
214+
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
211215
def list_topics(self, topic: Optional[str] = None, timeout: float = -1) -> Any: ...
212216
def offsets_for_times(
213217
self,
@@ -223,6 +227,8 @@ class Consumer:
223227

224228
class _AdminClientImpl:
225229
def __init__(self, config: Dict[str, Union[str, int, float, bool]]) -> None: ...
230+
def __enter__(self) -> "_AdminClientImpl": ...
231+
def __exit__(self, exc_type: Any, exc_value: Any, exc_traceback: Any) -> Optional[bool]: ...
226232
def create_topics(
227233
self,
228234
topics: List['NewTopic'],

src/confluent_kafka/src/Admin.c

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,11 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api,
137137
rd_kafka_error_t *err_obj = NULL;
138138
char errstr[512];
139139

140+
if (!self->rk) {
141+
PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed");
142+
return NULL;
143+
}
144+
140145
c_options = rd_kafka_AdminOptions_new(self->rk, for_api);
141146
if (!c_options) {
142147
PyErr_Format(PyExc_RuntimeError,
@@ -3245,6 +3250,11 @@ static PyObject *Admin_poll (Handle *self, PyObject *args,
32453250
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "d", kws, &tmout))
32463251
return NULL;
32473252

3253+
if (!self->rk) {
3254+
PyErr_SetString(PyExc_RuntimeError, "AdminClient has been closed");
3255+
return NULL;
3256+
}
3257+
32483258
r = Admin_poll0(self, (int)(tmout * 1000));
32493259
if (r == -1)
32503260
return NULL;
@@ -3253,6 +3263,36 @@ static PyObject *Admin_poll (Handle *self, PyObject *args,
32533263
}
32543264

32553265

3266+
static PyObject *Admin_enter (Handle *self) {
3267+
Py_INCREF(self);
3268+
return (PyObject *)self;
3269+
}
3270+
3271+
static PyObject *Admin_exit (Handle *self, PyObject *args) {
3272+
PyObject *exc_type, *exc_value, *exc_traceback;
3273+
CallState cs;
3274+
3275+
/* __exit__ always receives 3 arguments: exception type, exception value, and traceback.
3276+
* All three will be None if the with block completed without an exception.
3277+
* We unpack them here but don't need to use them - we just clean up regardless. */
3278+
if (!PyArg_UnpackTuple(args, "__exit__", 3, 3,
3279+
&exc_type, &exc_value, &exc_traceback))
3280+
return NULL;
3281+
3282+
/* Cleanup: destroy admin client */
3283+
if (self->rk) {
3284+
CallState_begin(self, &cs);
3285+
3286+
rd_kafka_destroy(self->rk);
3287+
self->rk = NULL;
3288+
3289+
if (!CallState_end(self, &cs))
3290+
return NULL;
3291+
}
3292+
3293+
Py_RETURN_NONE;
3294+
}
3295+
32563296

32573297
static PyMethodDef Admin_methods[] = {
32583298
{ "create_topics", (PyCFunction)Admin_create_topics,
@@ -3384,12 +3424,18 @@ static PyMethodDef Admin_methods[] = {
33843424
{ "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS,
33853425
Admin_elect_leaders_doc
33863426
},
3427+
{ "__enter__", (PyCFunction)Admin_enter, METH_NOARGS,
3428+
"Context manager entry." },
3429+
{ "__exit__", (PyCFunction)Admin_exit, METH_VARARGS,
3430+
"Context manager exit. Automatically destroys the admin client." },
33873431

33883432
{ NULL }
33893433
};
33903434

33913435

33923436
static Py_ssize_t Admin__len__ (Handle *self) {
3437+
if (!self->rk)
3438+
return 0;
33933439
return rd_kafka_outq_len(self->rk);
33943440
}
33953441

src/confluent_kafka/src/Consumer.c

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,6 +1147,29 @@ static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
11471147
Py_RETURN_NONE;
11481148
}
11491149

1150+
static PyObject *Consumer_enter (Handle *self) {
1151+
Py_INCREF(self);
1152+
return (PyObject *)self;
1153+
}
1154+
1155+
static PyObject *Consumer_exit (Handle *self, PyObject *args) {
1156+
PyObject *exc_type, *exc_value, *exc_traceback;
1157+
1158+
if (!PyArg_UnpackTuple(args, "__exit__", 3, 3,
1159+
&exc_type, &exc_value, &exc_traceback))
1160+
return NULL;
1161+
1162+
/* Cleanup: call close() */
1163+
if (self->rk) {
1164+
PyObject *result = Consumer_close(self, NULL);
1165+
if (!result)
1166+
return NULL;
1167+
Py_DECREF(result);
1168+
}
1169+
1170+
Py_RETURN_NONE;
1171+
}
1172+
11501173
static PyObject *
11511174
Consumer_consumer_group_metadata (Handle *self, PyObject *ignore) {
11521175
rd_kafka_consumer_group_metadata_t *cgmd;
@@ -1527,8 +1550,10 @@ static PyMethodDef Consumer_methods[] = {
15271550
{ "set_sasl_credentials", (PyCFunction)set_sasl_credentials, METH_VARARGS|METH_KEYWORDS,
15281551
set_sasl_credentials_doc
15291552
},
1530-
1531-
1553+
{ "__enter__", (PyCFunction)Consumer_enter, METH_NOARGS,
1554+
"Context manager entry." },
1555+
{ "__exit__", (PyCFunction)Consumer_exit, METH_VARARGS,
1556+
"Context manager exit. Automatically closes the consumer." },
15321557
{ NULL }
15331558
};
15341559

src/confluent_kafka/src/Metadata.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,11 @@ list_topics (Handle *self, PyObject *args, PyObject *kwargs) {
370370
&topic, &tmout))
371371
return NULL;
372372

373+
if (!self->rk) {
374+
PyErr_SetString(PyExc_RuntimeError, "Handle has been closed");
375+
return NULL;
376+
}
377+
373378
if (topic != NULL) {
374379
if (!(only_rkt = rd_kafka_topic_new(self->rk,
375380
topic, NULL))) {
@@ -605,6 +610,11 @@ list_groups (Handle *self, PyObject *args, PyObject *kwargs) {
605610
&group, &tmout))
606611
return NULL;
607612

613+
if (!self->rk) {
614+
PyErr_SetString(PyExc_RuntimeError, "Handle has been closed");
615+
return NULL;
616+
}
617+
608618
CallState_begin(self, &cs);
609619

610620
err = rd_kafka_list_groups(self->rk, group, &group_list,

0 commit comments

Comments
 (0)