@@ -3,6 +3,15 @@ use alloc::boxed::Box;
33use anyhow:: Result ;
44use bytes:: Bytes ;
55
6+ /// `Pollable::ready()` for `InputStream` and `OutputStream` may return
7+ /// prematurely due to `io::ErrorKind::WouldBlock`.
8+ ///
9+ /// To ensure that `blocking_` functions return a valid non-empty result,
10+ /// we use a loop with a maximum iteration limit.
11+ ///
12+ /// This constant defines the maximum number of loop attempts allowed.
13+ const MAX_BLOCKING_ATTEMPTS : u8 = 10 ;
14+
615/// Host trait for implementing the `wasi:io/streams.input-stream` resource: A
716/// bytestream which can be read from.
817#[ async_trait:: async_trait]
@@ -24,8 +33,24 @@ pub trait InputStream: Pollable {
2433 /// Similar to `read`, except that it blocks until at least one byte can be
2534 /// read.
2635 async fn blocking_read ( & mut self , size : usize ) -> StreamResult < Bytes > {
27- self . ready ( ) . await ;
28- self . read ( size)
36+ if size == 0 {
37+ self . ready ( ) . await ;
38+ return self . read ( size) ;
39+ }
40+
41+ let mut i = 0 ;
42+ loop {
43+ // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
44+ self . ready ( ) . await ;
45+ let data = self . read ( size) ?;
46+ if !data. is_empty ( ) {
47+ return Ok ( data) ;
48+ }
49+ if i >= MAX_BLOCKING_ATTEMPTS {
50+ return Err ( StreamError :: trap ( "max blocking attempts exceeded" ) ) ;
51+ }
52+ i += 1 ;
53+ }
2954 }
3055
3156 /// Same as the `read` method except that bytes are skipped.
@@ -239,8 +264,19 @@ pub trait OutputStream: Pollable {
239264 /// Simultaneously waits for this stream to be writable and then returns how
240265 /// much may be written or the last error that happened.
241266 async fn write_ready ( & mut self ) -> StreamResult < usize > {
242- self . ready ( ) . await ;
243- self . check_write ( )
267+ let mut i = 0 ;
268+ loop {
269+ // This `ready` call may return prematurely due to `io::ErrorKind::WouldBlock`.
270+ self . ready ( ) . await ;
271+ let n = self . check_write ( ) ?;
272+ if n > 0 {
273+ return Ok ( n) ;
274+ }
275+ if i >= MAX_BLOCKING_ATTEMPTS {
276+ return Err ( StreamError :: trap ( "max blocking attempts exceeded" ) ) ;
277+ }
278+ i += 1 ;
279+ }
244280 }
245281
246282 /// Cancel any asynchronous work and wait for it to wrap up.
0 commit comments