diff --git a/docs/docs/en/getting-started/subscription/dynamic.md b/docs/docs/en/getting-started/subscription/dynamic.md index f0a144c021..61dca76ac6 100644 --- a/docs/docs/en/getting-started/subscription/dynamic.md +++ b/docs/docs/en/getting-started/subscription/dynamic.md @@ -17,9 +17,8 @@ However, the framework still allows you to do so in a suitable manner. async with TestKafkaBroker(broker) as br: subscriber = br.subscriber("test-topic", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work ``` === "Confluent" ```python linenums="1" @@ -28,9 +27,9 @@ However, the framework still allows you to do so in a suitable manner. async with TestKafkaBroker(broker) as br: subscriber = br.subscriber("test-topic", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work + ``` === "RabbitMQ" ```python linenums="1" @@ -39,9 +38,8 @@ However, the framework still allows you to do so in a suitable manner. async with TestRabbitBroker(broker) as br: subscriber = br.subscriber("test-queue", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work ``` === "NATS" ```python linenums="1" @@ -50,9 +48,8 @@ However, the framework still allows you to do so in a suitable manner. async with TestNatsBroker(broker) as br: subscriber = br.subscriber("test-subject", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work ``` === "Redis" ```python linenums="1" @@ -61,9 +58,8 @@ However, the framework still allows you to do so in a suitable manner. async with TestRedisBroker(broker) as br: subscriber = br.subscriber("test-channel", persistent=False) - await subscriber.start() - message = await subscriber.get_one() # does not work - await subscriber.stop() + async with subscriber: + message = await subscriber.get_one() # does not work ``` ## Consuming a Single Message @@ -81,6 +77,10 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/kafka/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/kafka/dynamic.py [ln:12-13] !} + ``` === "Confluent" ```python linenums="1" hl_lines="8" @@ -93,6 +93,11 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/confluent/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/confluent/dynamic.py [ln:12-13] !} + ``` + === "RabbitMQ" ```python linenums="1" hl_lines="8" @@ -105,6 +110,10 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/rabbit/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/rabbit/dynamic.py [ln:12-13] !} + ``` === "NATS" ```python linenums="1" hl_lines="8" @@ -117,6 +126,10 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/nats/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/nats/dynamic.py [ln:12-13] !} + ``` === "Redis" ```python linenums="1" hl_lines="8" @@ -129,6 +142,10 @@ To process a single message, you should create a subscriber and call the appropr ```python linenums="1" hl_lines="1 5" {!> docs_src/getting_started/subscription/redis/dynamic.py [ln:6-10] !} ``` + Or so + ```python linenums="1" hl_lines="1" + {!> docs_src/getting_started/subscription/redis/dynamic.py [ln:12-13] !} + ``` ## Iteration over messages diff --git a/docs/docs/en/release.md b/docs/docs/en/release.md index 84af613802..3b23e2bb75 100644 --- a/docs/docs/en/release.md +++ b/docs/docs/en/release.md @@ -2590,9 +2590,8 @@ subscriber = broker.subscriber("dynamic") subscriber(handler_method) ... broker.setup_subscriber(subscriber) -await subscriber.start() -... -await subscriber.close() +async with subscriber: + ... ``` 10. `faststream[docs]` distribution is removed. diff --git a/docs/docs_src/getting_started/subscription/confluent/dynamic.py b/docs/docs_src/getting_started/subscription/confluent/dynamic.py index a31e57081a..5983b063b3 100644 --- a/docs/docs_src/getting_started/subscription/confluent/dynamic.py +++ b/docs/docs_src/getting_started/subscription/confluent/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: KafkaMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/confluent/dynamic_iter.py b/docs/docs_src/getting_started/subscription/confluent/dynamic_iter.py index 0d300d1922..4cccf4b712 100644 --- a/docs/docs_src/getting_started/subscription/confluent/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/confluent/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.confluent import KafkaBroker, KafkaMessage +from faststream.confluent import KafkaBroker async def main(): async with KafkaBroker() as broker: subscriber = broker.subscriber("test-topic", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is KafkaMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is KafkaMessage type + ... # do message process diff --git a/docs/docs_src/getting_started/subscription/kafka/dynamic.py b/docs/docs_src/getting_started/subscription/kafka/dynamic.py index 2e1d4f5097..64e44a9036 100644 --- a/docs/docs_src/getting_started/subscription/kafka/dynamic.py +++ b/docs/docs_src/getting_started/subscription/kafka/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: KafkaMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/kafka/dynamic_iter.py b/docs/docs_src/getting_started/subscription/kafka/dynamic_iter.py index be9efcd7cd..31a3247115 100644 --- a/docs/docs_src/getting_started/subscription/kafka/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/kafka/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.kafka import KafkaBroker, KafkaMessage +from faststream.kafka import KafkaBroker async def main(): async with KafkaBroker() as broker: subscriber = broker.subscriber("test-topic", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is KafkaMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is KafkaMessage type + ... # do message process diff --git a/docs/docs_src/getting_started/subscription/nats/dynamic.py b/docs/docs_src/getting_started/subscription/nats/dynamic.py index 876fca8787..3cee2dbcad 100644 --- a/docs/docs_src/getting_started/subscription/nats/dynamic.py +++ b/docs/docs_src/getting_started/subscription/nats/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: NatsMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/nats/dynamic_iter.py b/docs/docs_src/getting_started/subscription/nats/dynamic_iter.py index 69fe1df908..2d6fe5ad9d 100644 --- a/docs/docs_src/getting_started/subscription/nats/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/nats/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.nats import NatsBroker, NatsMessage +from faststream.nats import NatsBroker async def main(): async with NatsBroker() as broker: subscriber = broker.subscriber("test-subject", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is NatsMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is NatsMessage type + ... # do message process diff --git a/docs/docs_src/getting_started/subscription/rabbit/dynamic.py b/docs/docs_src/getting_started/subscription/rabbit/dynamic.py index 891752eb51..f001f6a3d5 100644 --- a/docs/docs_src/getting_started/subscription/rabbit/dynamic.py +++ b/docs/docs_src/getting_started/subscription/rabbit/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: RabbitMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/rabbit/dynamic_iter.py b/docs/docs_src/getting_started/subscription/rabbit/dynamic_iter.py index 7a62951e7b..57c0c2a91c 100644 --- a/docs/docs_src/getting_started/subscription/rabbit/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/rabbit/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.rabbit import RabbitBroker, RabbitMessage +from faststream.rabbit import RabbitBroker async def main(): async with RabbitBroker() as broker: subscriber = broker.subscriber("test-queue", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is RabbitMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is RabbitMessage type + ... # do message process diff --git a/docs/docs_src/getting_started/subscription/redis/dynamic.py b/docs/docs_src/getting_started/subscription/redis/dynamic.py index 4c0ce1db0e..b9c5ddeb63 100644 --- a/docs/docs_src/getting_started/subscription/redis/dynamic.py +++ b/docs/docs_src/getting_started/subscription/redis/dynamic.py @@ -9,4 +9,7 @@ async def main(): await subscriber.stop() + async with subscriber: + message: RedisChannelMessage | None = await subscriber.get_one(timeout=3.0) + return message diff --git a/docs/docs_src/getting_started/subscription/redis/dynamic_iter.py b/docs/docs_src/getting_started/subscription/redis/dynamic_iter.py index ddc25cfae6..1118652faf 100644 --- a/docs/docs_src/getting_started/subscription/redis/dynamic_iter.py +++ b/docs/docs_src/getting_started/subscription/redis/dynamic_iter.py @@ -1,11 +1,9 @@ -from faststream.redis import RedisBroker, RedisMessage +from faststream.redis import RedisBroker async def main(): async with RedisBroker() as broker: subscriber = broker.subscriber("test-channel", persistent=False) - await subscriber.start() - async for msg in subscriber: # msg is RedisMessage type - ... # do message process - - await subscriber.stop() + async with subscriber: + async for msg in subscriber: # msg is RedisMessage type + ... # do message process diff --git a/faststream/_internal/endpoint/subscriber/usecase.py b/faststream/_internal/endpoint/subscriber/usecase.py index 5b3ed60bb8..de47aed3ef 100644 --- a/faststream/_internal/endpoint/subscriber/usecase.py +++ b/faststream/_internal/endpoint/subscriber/usecase.py @@ -2,6 +2,7 @@ from collections.abc import AsyncIterator, Callable, Iterable, Sequence from contextlib import AbstractContextManager, AsyncExitStack from itertools import chain +from types import TracebackType from typing import ( TYPE_CHECKING, Annotated, @@ -107,6 +108,18 @@ def __init__( def _broker_middlewares(self) -> Sequence["BrokerMiddleware[MsgType]"]: return self._outer_config.broker_middlewares + async def __aenter__(self) -> Self: + await self.start() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + await self.stop() + async def start(self) -> None: """Private method to start subscriber by broker.""" self.lock = MultiLock()