22//! module documentation in `future_support.rs`.
33
44use crate :: async_support:: waitable:: { WaitableOp , WaitableOperation } ;
5- use crate :: async_support:: { AbiBuffer , ReturnCode } ;
5+ use crate :: async_support:: { AbiBuffer , ReturnCode , DROPPED } ;
66use {
77 crate :: Cleanup ,
88 std:: {
@@ -85,12 +85,17 @@ pub unsafe fn stream_new<T>(
8585pub struct StreamWriter < T : ' static > {
8686 handle : u32 ,
8787 vtable : & ' static StreamVtable < T > ,
88+ done : bool ,
8889}
8990
9091impl < T > StreamWriter < T > {
9192 #[ doc( hidden) ]
9293 pub unsafe fn new ( handle : u32 , vtable : & ' static StreamVtable < T > ) -> Self {
93- Self { handle, vtable }
94+ Self {
95+ handle,
96+ vtable,
97+ done : false ,
98+ }
9499 }
95100
96101 /// Initiate a write of the `values` provided into this stream.
@@ -239,6 +244,10 @@ where
239244 type Cancel = ( StreamResult , AbiBuffer < T > ) ;
240245
241246 fn start ( ( writer, buf) : Self :: Start ) -> ( u32 , Self :: InProgress ) {
247+ if writer. done {
248+ return ( DROPPED , ( writer, buf) ) ;
249+ }
250+
242251 let ( ptr, len) = buf. abi_ptr_and_len ( ) ;
243252 // SAFETY: sure hope this is safe, everything in this module and
244253 // `AbiBuffer` is trying to make this safe.
@@ -262,9 +271,14 @@ where
262271 ReturnCode :: Blocked => Err ( ( writer, buf) ) ,
263272 ReturnCode :: Dropped ( 0 ) => Ok ( ( StreamResult :: Dropped , buf) ) ,
264273 ReturnCode :: Cancelled ( 0 ) => Ok ( ( StreamResult :: Cancelled , buf) ) ,
265- ReturnCode :: Completed ( amt) | ReturnCode :: Dropped ( amt) | ReturnCode :: Cancelled ( amt) => {
274+ code @ ( ReturnCode :: Completed ( amt)
275+ | ReturnCode :: Dropped ( amt)
276+ | ReturnCode :: Cancelled ( amt) ) => {
266277 let amt = amt. try_into ( ) . unwrap ( ) ;
267278 buf. advance ( amt) ;
279+ if let ReturnCode :: Dropped ( _) = code {
280+ writer. done = true ;
281+ }
268282 Ok ( ( StreamResult :: Complete ( amt) , buf) )
269283 }
270284 }
@@ -321,6 +335,7 @@ impl<'a, T: 'static> StreamWrite<'a, T> {
321335pub struct StreamReader < T : ' static > {
322336 handle : AtomicU32 ,
323337 vtable : & ' static StreamVtable < T > ,
338+ done : bool ,
324339}
325340
326341impl < T > fmt:: Debug for StreamReader < T > {
@@ -337,6 +352,7 @@ impl<T> StreamReader<T> {
337352 Self {
338353 handle : AtomicU32 :: new ( handle) ,
339354 vtable,
355+ done : false ,
340356 }
341357 }
342358
@@ -444,6 +460,10 @@ where
444460 type Cancel = ( StreamResult , Vec < T > ) ;
445461
446462 fn start ( ( reader, mut buf) : Self :: Start ) -> ( u32 , Self :: InProgress ) {
463+ if reader. done {
464+ return ( DROPPED , ( reader, buf, None ) ) ;
465+ }
466+
447467 let cap = buf. spare_capacity_mut ( ) ;
448468 let ptr;
449469 let cleanup;
@@ -493,7 +513,9 @@ where
493513 // the read allocation?
494514 ReturnCode :: Cancelled ( 0 ) => Ok ( ( StreamResult :: Cancelled , buf) ) ,
495515
496- ReturnCode :: Completed ( amt) | ReturnCode :: Dropped ( amt) | ReturnCode :: Cancelled ( amt) => {
516+ code @ ( ReturnCode :: Completed ( amt)
517+ | ReturnCode :: Dropped ( amt)
518+ | ReturnCode :: Cancelled ( amt) ) => {
497519 let amt = usize:: try_from ( amt) . unwrap ( ) ;
498520 let cur_len = buf. len ( ) ;
499521 assert ! ( amt <= buf. capacity( ) - cur_len) ;
@@ -523,6 +545,9 @@ where
523545 // Intentionally dispose of `cleanup` here as, if it was used, all
524546 // allocations have been read from it and appended to `buf`.
525547 drop ( cleanup) ;
548+ if let ReturnCode :: Dropped ( _) = code {
549+ reader. done = true ;
550+ }
526551 Ok ( ( StreamResult :: Complete ( amt) , buf) )
527552 }
528553 }
0 commit comments