@@ -35,12 +35,30 @@ pub struct StreamVtable<T> {
3535 future : u32 ,
3636 values : & mut [ MaybeUninit < T > ] ,
3737 ) -> Pin < Box < dyn Future < Output = Option < Result < usize , ErrorContext > > > + ' _ > > ,
38- pub cancel_write : fn ( future : u32 ) ,
39- pub cancel_read : fn ( future : u32 ) ,
40- pub close_writable : fn ( future : u32 , err_ctx : u32 ) ,
41- pub close_readable : fn ( future : u32 ) ,
38+ pub cancel_write : unsafe extern "C" fn ( future : u32 ) -> u32 ,
39+ pub cancel_read : unsafe extern "C" fn ( future : u32 ) -> u32 ,
40+ pub close_writable : unsafe extern "C" fn ( future : u32 , err_ctx : u32 ) ,
41+ pub close_readable : unsafe extern "C" fn ( future : u32 ) ,
42+ pub new : unsafe extern "C" fn ( ) -> u32 ,
4243}
4344
45+ /// Helper function to create a new read/write pair for a component model
46+ /// stream.
47+ pub unsafe fn stream_new < T > (
48+ vtable : & ' static StreamVtable < T > ,
49+ ) -> ( StreamWriter < T > , StreamReader < T > ) {
50+ let handle = unsafe { ( vtable. new ) ( ) } ;
51+ super :: with_entry ( handle, |entry| match entry {
52+ Entry :: Vacant ( entry) => {
53+ entry. insert ( Handle :: LocalOpen ) ;
54+ }
55+ Entry :: Occupied ( _) => unreachable ! ( ) ,
56+ } ) ;
57+ (
58+ StreamWriter :: new ( handle, vtable) ,
59+ StreamReader :: new ( handle, vtable) ,
60+ )
61+ }
4462struct CancelWriteOnDrop < T : ' static > {
4563 handle : Option < u32 > ,
4664 vtable : & ' static StreamVtable < T > ,
@@ -60,7 +78,11 @@ impl<T> Drop for CancelWriteOnDrop<T> {
6078 Handle :: LocalReady ( ..) => {
6179 entry. insert ( Handle :: LocalOpen ) ;
6280 }
63- Handle :: Write => ( self . vtable . cancel_write ) ( handle) ,
81+ Handle :: Write => unsafe {
82+ // TODO: spec-wise this can return `BLOCKED` which seems
83+ // bad?
84+ ( self . vtable . cancel_write ) ( handle) ;
85+ } ,
6486 } ,
6587 } ) ;
6688 }
@@ -221,18 +243,20 @@ impl<T> Drop for StreamWriter<T> {
221243 entry. insert ( Handle :: LocalClosed ) ;
222244 }
223245 Handle :: Read => unreachable ! ( ) ,
224- Handle :: Write | Handle :: LocalClosed => {
246+ Handle :: Write | Handle :: LocalClosed => unsafe {
225247 entry. remove ( ) ;
226248 ( self . vtable . close_writable ) ( self . handle , 0 ) ;
227- }
249+ } ,
228250 Handle :: WriteClosedErr ( _) => match entry. remove ( ) {
229251 // Care is taken to avoid dropping the ErrorContext before close_writable is called.
230252 // If the error context is dropped prematurely, the component may garbage collect
231253 // the error context before it can be used/referenced by close_writable().
232- Handle :: WriteClosedErr ( Some ( e) ) => {
254+ Handle :: WriteClosedErr ( Some ( e) ) => unsafe {
233255 ( self . vtable . close_writable ) ( self . handle , e. handle )
234- }
235- Handle :: WriteClosedErr ( None ) => ( self . vtable . close_writable ) ( self . handle , 0 ) ,
256+ } ,
257+ Handle :: WriteClosedErr ( None ) => unsafe {
258+ ( self . vtable . close_writable ) ( self . handle , 0 )
259+ } ,
236260 _ => unreachable ! ( ) ,
237261 } ,
238262 } ,
@@ -259,7 +283,11 @@ impl<T> Drop for CancelReadOnDrop<T> {
259283 Handle :: LocalWaiting ( _) => {
260284 entry. insert ( Handle :: LocalOpen ) ;
261285 }
262- Handle :: Read => ( self . vtable . cancel_read ) ( handle) ,
286+ Handle :: Read => unsafe {
287+ // TODO: spec-wise this can return `BLOCKED` which seems
288+ // bad?
289+ ( self . vtable . cancel_read ) ( handle) ;
290+ } ,
263291 } ,
264292 } ) ;
265293 }
@@ -457,10 +485,10 @@ impl<T> Drop for StreamReader<T> {
457485 Handle :: LocalOpen | Handle :: LocalWaiting ( _) => {
458486 entry. insert ( Handle :: LocalClosed ) ;
459487 }
460- Handle :: Read | Handle :: LocalClosed => {
488+ Handle :: Read | Handle :: LocalClosed => unsafe {
461489 entry. remove ( ) ;
462490 ( self . vtable . close_readable ) ( handle) ;
463- }
491+ } ,
464492 Handle :: Write | Handle :: WriteClosedErr ( _) => unreachable ! ( ) ,
465493 } ,
466494 } ) ;
0 commit comments