Skip to content

Commit 0e724fb

Browse files
committed
add more stream/future/resource tests
This addresses some gaps in the test coverage and fixes the issued uncovered (e.g. not handling drops of exported resources and not restoring payload ownership after a failed `future.write`). Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 03b9635 commit 0e724fb

File tree

8 files changed

+326
-15
lines changed

8 files changed

+326
-15
lines changed

bundled/componentize_py_async_support/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ class _FutureState:
2626
handles: list[asyncio.Handle]
2727
pending_count: int
2828

29+
class _ReturnCode:
30+
COMPLETED = 0
31+
DROPPED = 1
32+
CANCELLED = 2
33+
2934
class _CallbackCode:
3035
EXIT = 0
3136
YIELD = 1

bundled/componentize_py_async_support/futures.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
from typing import TypeVar, Generic, cast, Self, Any, Callable
66
from types import TracebackType
7+
from componentize_py_async_support import _ReturnCode
78

89
T = TypeVar('T')
910

@@ -13,7 +14,7 @@ def __init__(self, type_: int, handle: int):
1314
self.handle: int | None = handle
1415
self.finalizer = weakref.finalize(self, componentize_py_runtime.future_drop_readable, type_, handle)
1516

16-
async def read(self) -> T | None:
17+
async def read(self) -> T:
1718
self.finalizer.detach()
1819
handle = self.handle
1920
self.handle = None
@@ -57,15 +58,25 @@ def __init__(self, type_: int, handle: int, default: Callable[[], T]):
5758
self.default = default
5859
self.finalizer = weakref.finalize(self, write_default, type_, handle, default)
5960

60-
async def write(self, value: T) -> None:
61+
async def write(self, value: T) -> bool:
6162
self.finalizer.detach()
6263
handle = self.handle
6364
self.handle = None
6465
if handle is not None:
65-
await componentize_py_async_support.await_result(
66+
code, _ = await componentize_py_async_support.await_result(
6667
componentize_py_runtime.future_write(self.type_, handle, value)
6768
)
6869
componentize_py_runtime.future_drop_writable(self.type_, handle)
70+
match code:
71+
case _ReturnCode.COMPLETED:
72+
return True
73+
case _ReturnCode.DROPPED:
74+
return False
75+
case _ReturnCode.CANCELLED:
76+
# todo
77+
raise NotImplementedError
78+
case _:
79+
raise AssertionError
6980
else:
7081
raise AssertionError
7182

bundled/componentize_py_async_support/streams.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,7 @@
44

55
from typing import TypeVar, Generic, Self, cast
66
from types import TracebackType
7-
8-
class _ReturnCode:
9-
COMPLETED = 0
10-
DROPPED = 1
11-
CANCELLED = 2
7+
from componentize_py_async_support import _ReturnCode
128

139
class ByteStreamReader:
1410
def __init__(self, type_: int, handle: int):

bundled/componentize_py_runtime.pyi

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ def promise_get_result(event: int, promise: int) -> Any: ...
1919

2020
def future_read(ty: int, future: int) -> Result[Any, tuple[int, int]]: ...
2121

22-
def future_write(ty: int, future: int, value: Any) -> Result[None, tuple[int, int]]: ...
22+
def future_write(ty: int, future: int, value: Any) -> Result[tuple[int, int], tuple[int, int]]: ...
2323

2424
def future_drop_readable(ty: int, future: int) -> None: ...
2525

runtime/src/lib.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,7 @@ mod async_ {
310310
},
311311
FutureWrite {
312312
_call: MyCall<'static>,
313+
resources: Option<Vec<EmptyResource>>,
313314
},
314315
}
315316

@@ -391,9 +392,16 @@ mod async_ {
391392
}
392393
call.stack.pop().unwrap_or(py.None()).into_bound(py)
393394
}
394-
Promise::FutureWrite { .. } => {
395+
Promise::FutureWrite { ref resources, .. } => {
395396
let count = event >> 4;
396397
let code = event & 0xF;
398+
399+
if let (RETURN_CODE_DROPPED, Some(resources)) = (code, resources) {
400+
for resource in resources {
401+
resource.restore(py)
402+
}
403+
}
404+
397405
PyTuple::new(py, [code, count]).unwrap().into_any()
398406
}
399407
}
@@ -804,11 +812,16 @@ mod async_ {
804812
} else {
805813
unsafe { call.defer_deallocate(buffer, layout) };
806814

815+
call.resources = Some(Vec::new());
807816
let code = unsafe {
808817
ty.lower(&mut call, buffer);
809818

810819
ty.write()(handle, buffer.cast())
811820
};
821+
let resources = call
822+
.resources
823+
.take()
824+
.and_then(|v| if v.is_empty() { None } else { Some(v) });
812825

813826
Ok(if code == RETURN_CODE_BLOCKED {
814827
ERR_CONSTRUCTOR
@@ -822,6 +835,7 @@ mod async_ {
822835
usize::try_from(handle).unwrap(),
823836
Box::into_raw(Box::new(async_::Promise::FutureWrite {
824837
_call: call,
838+
resources,
825839
})) as usize,
826840
],
827841
)
@@ -831,6 +845,13 @@ mod async_ {
831845
} else {
832846
let count = code >> 4;
833847
let code = code & 0xF;
848+
849+
if let (RETURN_CODE_DROPPED, Some(resources)) = (code, &resources) {
850+
for resource in resources {
851+
resource.restore(py)
852+
}
853+
}
854+
834855
OK_CONSTRUCTOR
835856
.get()
836857
.unwrap()
@@ -1330,8 +1351,10 @@ impl Interpreter for MyInterpreter {
13301351
}
13311352

13321353
fn resource_dtor(ty: wit::Resource, handle: usize) {
1354+
// We don't currently include a `drop` function as part of the abstract
1355+
// base class we generate for an exported resource, so there's nothing
1356+
// to do here. If/when that changes, we'll want to call `drop` here.
13331357
_ = (ty, handle);
1334-
todo!()
13351358
}
13361359
}
13371360

src/test/python_source/app.py

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@
55
import resource_alias1
66
import resource_borrow_in_record
77
import componentize_py_async_support
8+
import streams_and_futures as my_streams_and_futures
89

910
from componentize_py_types import Result, Ok, Err
1011
from tests import exports, imports
1112
from tests.imports import resource_borrow_import
1213
from tests.imports import simple_import_and_export
1314
from tests.imports import simple_async_import_and_export
15+
from tests.imports import host_thing_interface
1416
from tests.exports import resource_alias2
1517
from tests.exports import streams_and_futures
1618
from typing import Tuple, List, Optional
@@ -147,6 +149,42 @@ async def pipe_things(rx: StreamReader[streams_and_futures.Thing], tx: StreamWri
147149
# Write the things all at once. The host will read them only one at a time,
148150
# forcing us to re-take ownership of any unwritten items between writes.
149151
await tx.write_all(things)
152+
153+
async def pipe_host_things(rx: StreamReader[host_thing_interface.HostThing], tx: StreamWriter[host_thing_interface.HostThing]):
154+
# Read the things one at a time, forcing the host to re-take ownership of
155+
# any unwritten items between writes.
156+
things = []
157+
while not rx.writer_dropped:
158+
things += await rx.read(1)
159+
160+
# Write the things all at once. The host will read them only one at a time,
161+
# forcing us to re-take ownership of any unwritten items between writes.
162+
await tx.write_all(things)
163+
164+
async def write_thing(thing: my_streams_and_futures.Thing,
165+
tx1: FutureWriter[streams_and_futures.Thing],
166+
tx2: FutureWriter[streams_and_futures.Thing]):
167+
# The host will drop the first reader without reading, which should give us
168+
# back ownership of `thing`.
169+
wrote = await tx1.write(thing)
170+
assert not wrote
171+
# The host will read from the second reader, though.
172+
wrote = await tx2.write(thing)
173+
assert wrote
174+
175+
async def write_host_thing(thing: host_thing_interface.HostThing,
176+
tx1: FutureWriter[host_thing_interface.HostThing],
177+
tx2: FutureWriter[host_thing_interface.HostThing]):
178+
# The host will drop the first reader without reading, which should give us
179+
# back ownership of `thing`.
180+
wrote = await tx1.write(thing)
181+
assert not wrote
182+
# The host will read from the second reader, though.
183+
wrote = await tx2.write(thing)
184+
assert wrote
185+
186+
def unreachable() -> str:
187+
raise AssertionError
150188

151189
class StreamsAndFutures(exports.StreamsAndFutures):
152190
async def echo_stream_u8(self, stream: ByteStreamReader) -> ByteStreamReader:
@@ -155,8 +193,6 @@ async def echo_stream_u8(self, stream: ByteStreamReader) -> ByteStreamReader:
155193
return rx
156194

157195
async def echo_future_string(self, future: FutureReader[str]) -> FutureReader[str]:
158-
def unreachable() -> str:
159-
raise AssertionError
160196
tx, rx = tests.string_future(unreachable)
161197
componentize_py_async_support.spawn(pipe_strings(future, tx))
162198
return rx
@@ -166,6 +202,23 @@ async def short_reads(self, stream: StreamReader[streams_and_futures.Thing]) ->
166202
componentize_py_async_support.spawn(pipe_things(stream, tx))
167203
return rx
168204

205+
async def short_reads_host(self, stream: StreamReader[host_thing_interface.HostThing]) -> StreamReader[host_thing_interface.HostThing]:
206+
tx, rx = tests.host_thing_interface_host_thing_stream()
207+
componentize_py_async_support.spawn(pipe_host_things(stream, tx))
208+
return rx
209+
210+
async def dropped_future_reader(self, value: str) -> tuple[FutureReader[streams_and_futures.Thing], FutureReader[streams_and_futures.Thing]]:
211+
tx1, rx1 = tests.streams_and_futures_thing_future(unreachable)
212+
tx2, rx2 = tests.streams_and_futures_thing_future(unreachable)
213+
componentize_py_async_support.spawn(write_thing(my_streams_and_futures.Thing(value), tx1, tx2))
214+
return (rx1, rx2)
215+
216+
async def dropped_future_reader_host(self, value: str) -> tuple[FutureReader[host_thing_interface.HostThing], FutureReader[host_thing_interface.HostThing]]:
217+
tx1, rx1 = tests.host_thing_interface_host_thing_future(unreachable)
218+
tx2, rx2 = tests.host_thing_interface_host_thing_future(unreachable)
219+
componentize_py_async_support.spawn(write_host_thing(host_thing_interface.HostThing(value), tx1, tx2))
220+
return (rx1, rx2)
221+
169222
class Tests(tests.Tests):
170223
def test_resource_borrow_import(self, v: int) -> int:
171224
return resource_borrow_import.foo(resource_borrow_import.Thing(v + 1)) + 4

0 commit comments

Comments
 (0)