Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.8-alpine
FROM python:3.11-alpine
WORKDIR /code
# ENV FLASK_APP app.py
RUN apk add --no-cache gcc musl-dev linux-headers make python3-dev openssl-dev libffi-dev git
Expand Down
32 changes: 28 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,48 @@
# redis-streams-fastapi-chat
A simple demo of Redis Streams backed Chat app using Websockets, Python Asyncio and FastAPI/Starlette.

Requires Python version >= 3.6 and Redis
Requires Python version >= 3.11 and Redis 7+

# Overview
This project has been created to help understand some related concepts. Python standard library asyncio, websockets (which are often cited as a classic use case for async python code), also Redis Streams. It is very much inteded to be an intentionally simple starting point rather than a usable product as is.

# Installation

## Local Development

```shell
$ pip install -r requirements.txt
```

Make sure you have Redis running locally:
```shell
$ redis-server
```

# Usage

## Local Development

```shell
$ python chat.py
```

# Docker compose
If you don't have redis installed you can use the docker-compose.yml file to set up a
working environment.
Then open http://localhost:9080 in your browser.

## Docker Compose

The easiest way to run the application with all dependencies:

```shell
$ docker-compose up
```

This will start both the chat application and Redis in containers. The app will be available at http://localhost:9080

## Environment Variables

The following environment variables can be configured:

- `REDIS_HOST` - Redis server hostname (default: `localhost`, set to `redis` in Docker)
- `REDIS_PORT` - Redis server port (default: `6379`)

128 changes: 71 additions & 57 deletions chat.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
import os
import asyncio
import aioredis
from redis import asyncio as aioredis
import uvloop
import socket
import uuid
import contextvars
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends, Request
from starlette.staticfiles import StaticFiles
from starlette.templating import Jinja2Templates
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.websockets import WebSocket, WebSocketDisconnect

from websockets.exceptions import ConnectionClosedError, ConnectionClosedOK
from aioredis.errors import ConnectionClosedError as ServerConnectionClosedError
from redis.exceptions import ConnectionError as ServerConnectionClosedError

REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_HOST = os.getenv('REDIS_HOST', 'localhost')
REDIS_PORT = int(os.getenv('REDIS_PORT', '6379'))
XREAD_TIMEOUT = 0
XREAD_COUNT = 100
NUM_PREVIOUS = 30
Expand Down Expand Up @@ -43,7 +44,32 @@ async def dispatch(self, request, call_next):


asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
app = FastAPI()

@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
try:
redis_url = f"redis://{REDIS_HOST}:{REDIS_PORT}"
pool = await aioredis.from_url(
redis_url,
encoding='utf-8',
decode_responses=True,
max_connections=20
)
cvar_redis.set(pool)
print("Connected to Redis on ", REDIS_HOST, REDIS_PORT)
except ConnectionRefusedError as e:
print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT)

yield

# Shutdown
redis = cvar_redis.get()
if redis:
await redis.aclose()
print("closed connection Redis on ", REDIS_HOST, REDIS_PORT)

app = FastAPI(lifespan=lifespan)
app.add_middleware(CustomHeaderMiddleware)
templates = Jinja2Templates(directory="templates")

Expand Down Expand Up @@ -73,8 +99,12 @@ def get_local_ip():

async def get_redis_pool():
try:
pool = await aioredis.create_redis_pool(
(REDIS_HOST, REDIS_PORT), encoding='utf-8')
redis_url = f"redis://{REDIS_HOST}:{REDIS_PORT}"
pool = await aioredis.from_url(
redis_url,
encoding='utf-8',
decode_responses=True
)
return pool
except ConnectionRefusedError as e:
print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT)
Expand All @@ -97,21 +127,20 @@ async def ws_send_moderator(websocket: WebSocket, chat_info: dict):
"""
pool = await get_redis_pool()
streams = chat_info['room'].split(',')
latest_ids = ['$' for i in streams]
latest_ids = {stream: '$' for stream in streams}
ws_connected = True
print(streams, latest_ids)
while pool and ws_connected:
try:
events = await pool.xread(
streams=streams,
streams=latest_ids,
count=XREAD_COUNT,
timeout=XREAD_TIMEOUT,
latest_ids=latest_ids
block=5000 # Block for 5 seconds waiting for new messages
)
for _, e_id, e in events:
e['e_id'] = e_id
await websocket.send_json(e)
#latest_ids = [e_id]
for stream, messages in events:
for e_id, e in messages:
e['e_id'] = e_id
await websocket.send_json(e)
except ConnectionClosedError:
ws_connected = False

Expand All @@ -130,18 +159,19 @@ async def ws_send(websocket: WebSocket, chat_info: dict):
:type chat_info:
"""
pool = await get_redis_pool()
latest_ids = ['$']
stream_key = cvar_tenant.get() + ":stream"
latest_ids = {stream_key: '$'}
ws_connected = True
first_run = True
while pool and ws_connected:
try:
if first_run:
# fetch some previous chat history
events = await pool.xrevrange(
stream=cvar_tenant.get() + ":stream",
name=stream_key,
count=NUM_PREVIOUS,
start='+',
stop='-'
min='-',
max='+'
)
first_run = False
events.reverse()
Expand All @@ -150,15 +180,15 @@ async def ws_send(websocket: WebSocket, chat_info: dict):
await websocket.send_json(e)
else:
events = await pool.xread(
streams=[cvar_tenant.get() + ":stream"],
streams=latest_ids,
count=XREAD_COUNT,
timeout=XREAD_TIMEOUT,
latest_ids=latest_ids
block=5000 # Block for 5 seconds waiting for new messages
)
for _, e_id, e in events:
e['e_id'] = e_id
await websocket.send_json(e)
latest_ids = [e_id]
for stream, messages in events:
for e_id, e in messages:
e['e_id'] = e_id
await websocket.send_json(e)
latest_ids = {stream_key: e_id}
#print('################contextvar ', cvar_tenant.get())
except ConnectionClosedError:
ws_connected = False
Expand All @@ -169,7 +199,7 @@ async def ws_send(websocket: WebSocket, chat_info: dict):
except ServerConnectionClosedError:
print('redis server connection closed')
return
pool.close()
await pool.aclose()


async def ws_recieve(websocket: WebSocket, chat_info: dict):
Expand Down Expand Up @@ -205,10 +235,12 @@ async def ws_recieve(websocket: WebSocket, chat_info: dict):
'type': 'comment',
'room': chat_info['room']
}
await pool.xadd(stream=cvar_tenant.get() + ":stream",
fields=fields,
message_id=b'*',
max_len=STREAM_MAX_LEN)
await pool.xadd(
name=cvar_tenant.get() + ":stream",
fields=fields,
id='*',
maxlen=STREAM_MAX_LEN
)
#print('################contextvar ', cvar_tenant.get())
except WebSocketDisconnect:
await remove_room_user(chat_info, pool)
Expand All @@ -223,7 +255,7 @@ async def ws_recieve(websocket: WebSocket, chat_info: dict):
print('redis server connection closed')
return

pool.close()
await pool.aclose()


async def add_room_user(chat_info: dict, pool):
Expand Down Expand Up @@ -259,10 +291,12 @@ async def announce(pool, chat_info: dict, action: str):
}
#print(fields)

await pool.xadd(stream=cvar_tenant.get() + ":stream",
fields=fields,
message_id=b'*',
max_len=STREAM_MAX_LEN)
await pool.xadd(
name=cvar_tenant.get() + ":stream",
fields=fields,
id='*',
maxlen=STREAM_MAX_LEN
)


async def chat_info_vars(username: str = None, room: str = None):
Expand Down Expand Up @@ -355,30 +389,10 @@ async def verify_user_for_room(chat_info):
# whitelist rooms
if not chat_info['room'] in ALLOWED_ROOMS:
verified = False
pool.close()
await pool.aclose()
return verified


@app.on_event("startup")
async def handle_startup():
try:
pool = await aioredis.create_redis_pool(
(REDIS_HOST, REDIS_PORT), encoding='utf-8', maxsize=20)
cvar_redis.set(pool)
print("Connected to Redis on ", REDIS_HOST, REDIS_PORT)
except ConnectionRefusedError as e:
print('cannot connect to redis on:', REDIS_HOST, REDIS_PORT)
return


@app.on_event("shutdown")
async def handle_shutdown():
redis = cvar_redis.get()
redis.close()
await redis.wait_closed()
print("closed connection Redis on ", REDIS_HOST, REDIS_PORT)


if __name__ == "__main__":
import uvicorn
print(dir(app))
Expand Down
9 changes: 6 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ services:
- "8082:8082"
volumes:
- .:/code
links:
- "redis"
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
depends_on:
- redis
redis:
image: "redis:6.0-rc2-alpine3.11"
image: "redis:7-alpine"
ports:
- 6379:6379
23 changes: 11 additions & 12 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
uvicorn==0.12.1
websockets==9.1
fastapi==0.65.2
aioredis==1.3.1
redis==4.5.4
uvloop==0.15.2
jinja2==2.11.3
aiofiles==0.6.0
httpx==0.23.0
itsdangerous==1.1.0
databases[sqlite]==0.4.3
sqlalchemy==1.3.0
uvicorn==0.30.6
websockets==12.0
fastapi==0.115.0
redis==5.0.8
uvloop==0.20.0
jinja2==3.1.4
aiofiles==24.1.0
httpx==0.27.2
itsdangerous==2.2.0
databases[sqlite]==0.9.0
sqlalchemy==2.0.35
Loading