@@ -3,98 +3,29 @@ use crate::endpoint::{Endpoint, InputReceiver, OutputSender};
33use bytes:: Bytes ;
44use futures:: future:: BoxFuture ;
55use futures:: { FutureExt , TryStreamExt } ;
6+ use http:: header:: CONTENT_TYPE ;
7+ use http:: { response, Request , Response } ;
68use http_body_util:: { BodyExt , Either , Full } ;
79use hyper:: body:: { Body , Frame , Incoming } ;
8- use hyper:: header:: CONTENT_TYPE ;
9- use hyper:: http:: response;
10- use hyper:: server:: conn:: http2;
1110use hyper:: service:: Service ;
12- use hyper:: { Request , Response } ;
13- use hyper_util:: rt:: { TokioExecutor , TokioIo } ;
1411use restate_sdk_shared_core:: ResponseHead ;
1512use std:: convert:: Infallible ;
16- use std:: future:: { ready, Future , Ready } ;
17- use std:: net:: SocketAddr ;
13+ use std:: future:: { ready, Ready } ;
1814use std:: ops:: Deref ;
1915use std:: pin:: Pin ;
2016use std:: task:: { ready, Context , Poll } ;
21- use std:: time:: Duration ;
22- use tokio:: net:: TcpListener ;
2317use tokio:: sync:: mpsc;
24- use tracing:: { info , warn} ;
18+ use tracing:: warn;
2519
26- pub struct HyperServer {
27- endpoint : Endpoint ,
28- }
29-
30- impl From < Endpoint > for HyperServer {
31- fn from ( endpoint : Endpoint ) -> Self {
32- Self { endpoint }
33- }
34- }
20+ #[ derive( Clone ) ]
21+ pub struct HyperEndpoint ( Endpoint ) ;
3522
36- impl HyperServer {
23+ impl HyperEndpoint {
3724 pub fn new ( endpoint : Endpoint ) -> Self {
38- Self { endpoint }
39- }
40-
41- pub async fn listen_and_serve ( self , addr : SocketAddr ) {
42- let listener = TcpListener :: bind ( addr) . await . expect ( "listener can bind" ) ;
43- self . serve ( listener) . await ;
44- }
45-
46- pub async fn serve ( self , listener : TcpListener ) {
47- self . serve_with_cancel ( listener, tokio:: signal:: ctrl_c ( ) . map ( |_| ( ) ) )
48- . await ;
49- }
50-
51- pub async fn serve_with_cancel ( self , listener : TcpListener , cancel_signal_future : impl Future ) {
52- let endpoint = HyperEndpoint ( self . endpoint ) ;
53- let graceful = hyper_util:: server:: graceful:: GracefulShutdown :: new ( ) ;
54-
55- // when this signal completes, start shutdown
56- let mut signal = std:: pin:: pin!( cancel_signal_future) ;
57-
58- info ! ( "Starting listening on {}" , listener. local_addr( ) . unwrap( ) ) ;
59-
60- // Our server accept loop
61- loop {
62- tokio:: select! {
63- Ok ( ( stream, remote) ) = listener. accept( ) => {
64- let endpoint = endpoint. clone( ) ;
65-
66- let conn = http2:: Builder :: new( TokioExecutor :: default ( ) )
67- . serve_connection( TokioIo :: new( stream) , endpoint) ;
68-
69- let fut = graceful. watch( conn) ;
70-
71- tokio:: spawn( async move {
72- if let Err ( e) = fut. await {
73- warn!( "Error serving connection {remote}: {:?}" , e) ;
74- }
75- } ) ;
76- } ,
77- _ = & mut signal => {
78- info!( "Shutting down" ) ;
79- // stop the accept loop
80- break ;
81- }
82- }
83- }
84-
85- // Wait graceful shutdown
86- tokio:: select! {
87- _ = graceful. shutdown( ) => { } ,
88- _ = tokio:: time:: sleep( Duration :: from_secs( 10 ) ) => {
89- warn!( "Timed out waiting for all connections to close" ) ;
90- }
91- }
25+ Self ( endpoint)
9226 }
9327}
9428
95- #[ derive( Clone ) ]
96- struct HyperEndpoint ( Endpoint ) ;
97-
9829impl Service < Request < Incoming > > for HyperEndpoint {
9930 type Response = Response < Either < Full < Bytes > , BidiStreamRunner > > ;
10031 type Error = endpoint:: Error ;
@@ -155,8 +86,7 @@ fn response_builder_from_response_head(response_head: ResponseHead) -> response:
15586 response_builder
15687}
15788
158- // TODO use pin_project
159- struct BidiStreamRunner {
89+ pub struct BidiStreamRunner {
16090 fut : Option < BoxFuture < ' static , Result < ( ) , endpoint:: Error > > > ,
16191 output_rx : mpsc:: UnboundedReceiver < Bytes > ,
16292 end_stream : bool ,
0 commit comments