Skip to content

Commit e71cf8c

Browse files
authored
Add handling for backpressure
1 parent fa9fd6d commit e71cf8c

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

docs/howto/patterns.rst

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,13 @@ In-process queue
9393
For simple applications that need to be able to consume and act on multiple
9494
messages at one, an in-process queue can be leveraged::
9595

96-
queue = asyncio.Queue()
96+
MAX_WORKERS = os.cpu_count()
97+
98+
queue = asyncio.Queue(MAX_WORKERS)
9799

98100
async def consumer_handler(websocket):
99101
for message in websocket:
100-
queue.put(message)
102+
await queue.put(message)
101103

102104
async def worker():
103105
while True:
@@ -108,7 +110,7 @@ messages at one, an in-process queue can be leveraged::
108110
async def handler(websocket):
109111
async with asyncio.TaskGroup() as tg:
110112
tg.create_task(consumer_handler(websocket))
111-
for _ in range(os.cpu_count()):
113+
for _ in range(MAX_WORKERS):
112114
tg.create_task(worker())
113115

114116
Registration

0 commit comments

Comments
 (0)