Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions docs/docs/en/getting-started/subscription/dynamic.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ However, the framework still allows you to do so in a suitable manner.
async with TestKafkaBroker(broker) as br:
subscriber = br.subscriber("test-topic", persistent=False)

await subscriber.start()
message = await subscriber.get_one() # does not work
await subscriber.stop()
async with subscriber:
message = await subscriber.get_one() # does not work
```
=== "Confluent"
```python linenums="1"
Expand All @@ -28,9 +27,9 @@ However, the framework still allows you to do so in a suitable manner.
async with TestKafkaBroker(broker) as br:
subscriber = br.subscriber("test-topic", persistent=False)

await subscriber.start()
message = await subscriber.get_one() # does not work
await subscriber.stop()
async with subscriber:
message = await subscriber.get_one() # does not work

```
=== "RabbitMQ"
```python linenums="1"
Expand All @@ -39,9 +38,8 @@ However, the framework still allows you to do so in a suitable manner.
async with TestRabbitBroker(broker) as br:
subscriber = br.subscriber("test-queue", persistent=False)

await subscriber.start()
message = await subscriber.get_one() # does not work
await subscriber.stop()
async with subscriber:
message = await subscriber.get_one() # does not work
```
=== "NATS"
```python linenums="1"
Expand All @@ -50,9 +48,8 @@ However, the framework still allows you to do so in a suitable manner.
async with TestNatsBroker(broker) as br:
subscriber = br.subscriber("test-subject", persistent=False)

await subscriber.start()
message = await subscriber.get_one() # does not work
await subscriber.stop()
async with subscriber:
message = await subscriber.get_one() # does not work
```
=== "Redis"
```python linenums="1"
Expand All @@ -61,9 +58,8 @@ However, the framework still allows you to do so in a suitable manner.
async with TestRedisBroker(broker) as br:
subscriber = br.subscriber("test-channel", persistent=False)

await subscriber.start()
message = await subscriber.get_one() # does not work
await subscriber.stop()
async with subscriber:
message = await subscriber.get_one() # does not work
```

## Consuming a Single Message
Expand All @@ -81,6 +77,10 @@ To process a single message, you should create a subscriber and call the appropr
```python linenums="1" hl_lines="1 5"
{!> docs_src/getting_started/subscription/kafka/dynamic.py [ln:6-10] !}
```
Or so
```python linenums="1" hl_lines="1"
{!> docs_src/getting_started/subscription/kafka/dynamic.py [ln:12-13] !}
```

=== "Confluent"
```python linenums="1" hl_lines="8"
Expand All @@ -93,6 +93,11 @@ To process a single message, you should create a subscriber and call the appropr
```python linenums="1" hl_lines="1 5"
{!> docs_src/getting_started/subscription/confluent/dynamic.py [ln:6-10] !}
```
Or so
```python linenums="1" hl_lines="1"
{!> docs_src/getting_started/subscription/confluent/dynamic.py [ln:12-13] !}
```


=== "RabbitMQ"
```python linenums="1" hl_lines="8"
Expand All @@ -105,6 +110,10 @@ To process a single message, you should create a subscriber and call the appropr
```python linenums="1" hl_lines="1 5"
{!> docs_src/getting_started/subscription/rabbit/dynamic.py [ln:6-10] !}
```
Or so
```python linenums="1" hl_lines="1"
{!> docs_src/getting_started/subscription/rabbit/dynamic.py [ln:12-13] !}
```

=== "NATS"
```python linenums="1" hl_lines="8"
Expand All @@ -117,6 +126,10 @@ To process a single message, you should create a subscriber and call the appropr
```python linenums="1" hl_lines="1 5"
{!> docs_src/getting_started/subscription/nats/dynamic.py [ln:6-10] !}
```
Or so
```python linenums="1" hl_lines="1"
{!> docs_src/getting_started/subscription/nats/dynamic.py [ln:12-13] !}
```

=== "Redis"
```python linenums="1" hl_lines="8"
Expand All @@ -129,6 +142,10 @@ To process a single message, you should create a subscriber and call the appropr
```python linenums="1" hl_lines="1 5"
{!> docs_src/getting_started/subscription/redis/dynamic.py [ln:6-10] !}
```
Or so
```python linenums="1" hl_lines="1"
{!> docs_src/getting_started/subscription/redis/dynamic.py [ln:12-13] !}
```

## Iteration over messages

Expand Down
5 changes: 2 additions & 3 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -2202,9 +2202,8 @@ subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
async with subscriber:
...
```

10. `faststream[docs]` distribution is removed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ async def main():

await subscriber.stop()

async with subscriber:
message: KafkaMessage | None = await subscriber.get_one(timeout=3.0)

return message
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from faststream.confluent import KafkaBroker, KafkaMessage
from faststream.confluent import KafkaBroker

async def main():
async with KafkaBroker() as broker:
subscriber = broker.subscriber("test-topic", persistent=False)
await subscriber.start()

async for msg in subscriber: # msg is KafkaMessage type
... # do message process

await subscriber.stop()
async with subscriber:
async for msg in subscriber: # msg is KafkaMessage type
... # do message process
3 changes: 3 additions & 0 deletions docs/docs_src/getting_started/subscription/kafka/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ async def main():

await subscriber.stop()

async with subscriber:
message: KafkaMessage | None = await subscriber.get_one(timeout=3.0)

return message
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from faststream.kafka import KafkaBroker, KafkaMessage
from faststream.kafka import KafkaBroker

async def main():
async with KafkaBroker() as broker:
subscriber = broker.subscriber("test-topic", persistent=False)
await subscriber.start()

async for msg in subscriber: # msg is KafkaMessage type
... # do message process

await subscriber.stop()
async with subscriber:
async for msg in subscriber: # msg is KafkaMessage type
... # do message process
3 changes: 3 additions & 0 deletions docs/docs_src/getting_started/subscription/nats/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ async def main():

await subscriber.stop()

async with subscriber:
message: NatsMessage | None = await subscriber.get_one(timeout=3.0)

return message
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from faststream.nats import NatsBroker, NatsMessage
from faststream.nats import NatsBroker

async def main():
async with NatsBroker() as broker:
subscriber = broker.subscriber("test-subject", persistent=False)
await subscriber.start()

async for msg in subscriber: # msg is NatsMessage type
... # do message process

await subscriber.stop()
async with subscriber:
async for msg in subscriber: # msg is NatsMessage type
... # do message process
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ async def main():

await subscriber.stop()

async with subscriber:
message: RabbitMessage | None = await subscriber.get_one(timeout=3.0)

return message
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from faststream.rabbit import RabbitBroker, RabbitMessage
from faststream.rabbit import RabbitBroker

async def main():
async with RabbitBroker() as broker:
subscriber = broker.subscriber("test-queue", persistent=False)
await subscriber.start()

async for msg in subscriber: # msg is RabbitMessage type
... # do message process

await subscriber.stop()
async with subscriber:
async for msg in subscriber: # msg is RabbitMessage type
... # do message process
3 changes: 3 additions & 0 deletions docs/docs_src/getting_started/subscription/redis/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ async def main():

await subscriber.stop()

async with subscriber:
message: RedisChannelMessage | None = await subscriber.get_one(timeout=3.0)

return message
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
from faststream.redis import RedisBroker, RedisMessage
from faststream.redis import RedisBroker

async def main():
async with RedisBroker() as broker:
subscriber = broker.subscriber("test-channel", persistent=False)
await subscriber.start()

async for msg in subscriber: # msg is RedisMessage type
... # do message process

await subscriber.stop()
async with subscriber:
async for msg in subscriber: # msg is RedisMessage type
... # do message process
13 changes: 13 additions & 0 deletions faststream/_internal/endpoint/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from collections.abc import AsyncIterator, Callable, Iterable, Sequence
from contextlib import AbstractContextManager, AsyncExitStack
from itertools import chain
from types import TracebackType
from typing import (
TYPE_CHECKING,
Annotated,
Expand Down Expand Up @@ -105,6 +106,18 @@ def __init__(
def _broker_middlewares(self) -> Sequence["BrokerMiddleware[MsgType]"]:
return self._outer_config.broker_middlewares

async def __aenter__(self) -> Self:
await self.start()
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> None:
await self.stop()

async def start(self) -> None:
"""Private method to start subscriber by broker."""
self.lock = MultiLock()
Expand Down
Loading