1- use std:: sync:: Arc ;
2-
3- use flume:: Receiver ;
1+ use tokio:: sync:: { broadcast, watch} ;
42use tokio_util:: sync:: CancellationToken ;
53
4+ #[ derive( Clone ) ]
65pub struct WSFrame {
76 pub data : Vec < u8 > ,
87 pub width : u32 ,
98 pub height : u32 ,
109 pub stride : u32 ,
1110}
1211
13- pub async fn create_frame_ws ( frame_rx : Receiver < WSFrame > ) -> ( u16 , CancellationToken ) {
12+ pub async fn create_watch_frame_ws (
13+ frame_rx : watch:: Receiver < Option < WSFrame > > ,
14+ ) -> ( u16 , CancellationToken ) {
1415 use axum:: {
1516 extract:: {
1617 State ,
@@ -19,9 +20,8 @@ pub async fn create_frame_ws(frame_rx: Receiver<WSFrame>) -> (u16, CancellationT
1920 response:: IntoResponse ,
2021 routing:: get,
2122 } ;
22- use tokio:: sync:: Mutex ;
2323
24- type RouterState = Arc < Mutex < Receiver < WSFrame > > > ;
24+ type RouterState = watch :: Receiver < Option < WSFrame > > ;
2525
2626 #[ axum:: debug_handler]
2727 async fn ws_handler (
@@ -31,19 +31,136 @@ pub async fn create_frame_ws(frame_rx: Receiver<WSFrame>) -> (u16, CancellationT
3131 ws. on_upgrade ( move |socket| handle_socket ( socket, state) )
3232 }
3333
34- async fn handle_socket ( mut socket : WebSocket , state : RouterState ) {
35- let camera_rx = state. lock ( ) . await ;
34+ async fn handle_socket ( mut socket : WebSocket , mut camera_rx : RouterState ) {
3635 println ! ( "socket connection established" ) ;
3736 tracing:: info!( "Socket connection established" ) ;
3837 let now = std:: time:: Instant :: now ( ) ;
3938
39+ // Send the current frame immediately upon connection (if one exists)
40+ // This ensures the client doesn't wait for the next config change to see the image
41+ {
42+ let frame_opt = camera_rx. borrow ( ) . clone ( ) ;
43+ if let Some ( mut frame) = frame_opt {
44+ frame. data . extend_from_slice ( & frame. stride . to_le_bytes ( ) ) ;
45+ frame. data . extend_from_slice ( & frame. height . to_le_bytes ( ) ) ;
46+ frame. data . extend_from_slice ( & frame. width . to_le_bytes ( ) ) ;
47+
48+ if let Err ( e) = socket. send ( Message :: Binary ( frame. data ) ) . await {
49+ tracing:: error!( "Failed to send initial frame to socket: {:?}" , e) ;
50+ return ;
51+ }
52+ }
53+ }
54+
4055 loop {
4156 tokio:: select! {
42- _ = socket. recv( ) => {
43- tracing:: info!( "Received message from socket" ) ;
44- break ;
57+ msg = socket. recv( ) => {
58+ match msg {
59+ Some ( Ok ( Message :: Close ( _) ) ) | None => {
60+ tracing:: info!( "WebSocket closed" ) ;
61+ break ;
62+ }
63+ Some ( Ok ( _) ) => {
64+ tracing:: info!( "Received message from socket (ignoring)" ) ;
65+ }
66+ Some ( Err ( e) ) => {
67+ tracing:: error!( "WebSocket error: {:?}" , e) ;
68+ break ;
69+ }
70+ }
4571 } ,
46- incoming_frame = camera_rx. recv_async( ) => {
72+ res = camera_rx. changed( ) => {
73+ if res. is_err( ) {
74+ tracing:: error!( "Camera channel closed" ) ;
75+ break ;
76+ }
77+ let frame_opt = camera_rx. borrow( ) . clone( ) ;
78+ if let Some ( mut frame) = frame_opt {
79+ frame. data. extend_from_slice( & frame. stride. to_le_bytes( ) ) ;
80+ frame. data. extend_from_slice( & frame. height. to_le_bytes( ) ) ;
81+ frame. data. extend_from_slice( & frame. width. to_le_bytes( ) ) ;
82+
83+ if let Err ( e) = socket. send( Message :: Binary ( frame. data) ) . await {
84+ tracing:: error!( "Failed to send frame to socket: {:?}" , e) ;
85+ break ;
86+ }
87+ }
88+ }
89+ }
90+ }
91+
92+ let elapsed = now. elapsed ( ) ;
93+ println ! ( "Websocket closing after {elapsed:.2?}" ) ;
94+ tracing:: info!( "Websocket closing after {elapsed:.2?}" ) ;
95+ }
96+
97+ let router = axum:: Router :: new ( )
98+ . route ( "/" , get ( ws_handler) )
99+ . with_state ( frame_rx) ;
100+
101+ let listener = tokio:: net:: TcpListener :: bind ( "127.0.0.1:0" ) . await . unwrap ( ) ;
102+ let port = listener. local_addr ( ) . unwrap ( ) . port ( ) ;
103+ tracing:: info!( "WebSocket server listening on port {}" , port) ;
104+
105+ let cancel_token = CancellationToken :: new ( ) ;
106+ let cancel_token_child = cancel_token. child_token ( ) ;
107+ tokio:: spawn ( async move {
108+ let server = axum:: serve ( listener, router. into_make_service ( ) ) ;
109+ tokio:: select! {
110+ _ = server => { } ,
111+ _ = cancel_token. cancelled( ) => {
112+ println!( "WebSocket server shutting down" ) ;
113+ }
114+ }
115+ } ) ;
116+
117+ ( port, cancel_token_child)
118+ }
119+
120+ pub async fn create_frame_ws ( frame_tx : broadcast:: Sender < WSFrame > ) -> ( u16 , CancellationToken ) {
121+ use axum:: {
122+ extract:: {
123+ State ,
124+ ws:: { Message , WebSocket , WebSocketUpgrade } ,
125+ } ,
126+ response:: IntoResponse ,
127+ routing:: get,
128+ } ;
129+
130+ type RouterState = broadcast:: Sender < WSFrame > ;
131+
132+ #[ axum:: debug_handler]
133+ async fn ws_handler (
134+ ws : WebSocketUpgrade ,
135+ State ( state) : State < RouterState > ,
136+ ) -> impl IntoResponse {
137+ let rx = state. subscribe ( ) ;
138+ ws. on_upgrade ( move |socket| handle_socket ( socket, rx) )
139+ }
140+
141+ async fn handle_socket ( mut socket : WebSocket , mut camera_rx : broadcast:: Receiver < WSFrame > ) {
142+ println ! ( "socket connection established" ) ;
143+ tracing:: info!( "Socket connection established" ) ;
144+ let now = std:: time:: Instant :: now ( ) ;
145+
146+ loop {
147+ tokio:: select! {
148+ msg = socket. recv( ) => {
149+ match msg {
150+ Some ( Ok ( Message :: Close ( _) ) ) | None => {
151+ tracing:: info!( "WebSocket closed" ) ;
152+ break ;
153+ }
154+ Some ( Ok ( _) ) => {
155+ tracing:: info!( "Received message from socket (ignoring)" ) ;
156+ }
157+ Some ( Err ( e) ) => {
158+ tracing:: error!( "WebSocket error: {:?}" , e) ;
159+ break ;
160+ }
161+ }
162+ } ,
163+ incoming_frame = camera_rx. recv( ) => {
47164 match incoming_frame {
48165 Ok ( mut frame) => {
49166 frame. data. extend_from_slice( & frame. stride. to_le_bytes( ) ) ;
@@ -55,13 +172,16 @@ pub async fn create_frame_ws(frame_rx: Receiver<WSFrame>) -> (u16, CancellationT
55172 break ;
56173 }
57174 }
58- Err ( e ) => {
175+ Err ( broadcast :: error :: RecvError :: Closed ) => {
59176 tracing:: error!(
60- "Connection has been lost! Shutting down websocket server: {:?}" ,
61- e
177+ "Connection has been lost! Shutting down websocket server"
62178 ) ;
63179 break ;
64180 }
181+ Err ( broadcast:: error:: RecvError :: Lagged ( skipped) ) => {
182+ tracing:: warn!( "Missed {skipped} frames on websocket receiver" ) ;
183+ continue ;
184+ }
65185 }
66186 }
67187 }
@@ -74,7 +194,7 @@ pub async fn create_frame_ws(frame_rx: Receiver<WSFrame>) -> (u16, CancellationT
74194
75195 let router = axum:: Router :: new ( )
76196 . route ( "/" , get ( ws_handler) )
77- . with_state ( Arc :: new ( Mutex :: new ( frame_rx ) ) ) ;
197+ . with_state ( frame_tx ) ;
78198
79199 let listener = tokio:: net:: TcpListener :: bind ( "127.0.0.1:0" ) . await . unwrap ( ) ;
80200 let port = listener. local_addr ( ) . unwrap ( ) . port ( ) ;
0 commit comments