Skip to content

Commit 08011a7

Browse files
Refactor module instance code (#3605)
# Description of Changes The main thing this does is implement `WasmInstance` for `V8Instance`, so we can now change all the `vm_call*: impl FnOnce()` args on `InstanceCommon` to instead just take `&mut impl WasmInstance`. (Obviously our javascript instance is not a wasm instance, but `wasmtime::module_host_actor` is desperately in need of a rebrand in general, so I just left `WasmInstance`'s as is.) This vastly simplifies the callsites, and also allows @joshua-spacetime to call views from inside of `call_reducer_with_tx`, which was the main motivating factor behind this. If the last commit is a bit much, I'm happy to split it off - I just don't get explicitly asked to refactor the rough edges of the codebase very often :3 # Expected complexity level and risk 2 - it's a refactor, but overall pretty straightforward, and Rust lets you do fearless refactoring etc etc # Testing n/a - internal code change --------- Signed-off-by: Noa <coolreader18@gmail.com> Co-authored-by: joshua-spacetime <josh@clockworklabs.io>
1 parent edac806 commit 08011a7

File tree

3 files changed

+308
-288
lines changed

3 files changed

+308
-288
lines changed

crates/core/src/host/v8/mod.rs

Lines changed: 84 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ use self::syscall::{
1212
use super::module_common::{build_common_module_from_raw, run_describer, ModuleCommon};
1313
use super::module_host::{CallProcedureParams, CallReducerParams, Module, ModuleInfo, ModuleRuntime};
1414
use super::UpdateDatabaseResult;
15-
use crate::host::instance_env::{ChunkPool, InstanceEnv};
15+
use crate::host::instance_env::{ChunkPool, InstanceEnv, TxSlot};
1616
use crate::host::module_host::{CallViewParams, Instance, ViewCallResult};
1717
use crate::host::v8::error::{ErrorOrException, ExceptionThrown};
1818
use crate::host::wasm_common::instrumentation::CallTimes;
1919
use crate::host::wasm_common::module_host_actor::{
20-
DescribeError, ExecutionStats, ExecutionTimings, InstanceCommon, ReducerExecuteResult, ViewExecuteResult,
20+
AnonymousViewOp, DescribeError, ExecutionError, ExecutionResult, ExecutionStats, ExecutionTimings, InstanceCommon,
21+
InstanceOp, ProcedureExecuteResult, ProcedureOp, ReducerExecuteResult, ReducerOp, ViewExecuteResult, ViewOp,
22+
WasmInstance,
2123
};
2224
use crate::host::wasm_common::{RowIters, TimingSpanSet};
2325
use crate::host::{ReducerCallResult, Scheduler};
@@ -28,7 +30,7 @@ use anyhow::Context as _;
2830
use core::str;
2931
use itertools::Either;
3032
use spacetimedb_client_api_messages::energy::FunctionBudget;
31-
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId, ViewCall};
33+
use spacetimedb_datastore::locking_tx_datastore::{FuncCallType, MutTxId};
3234
use spacetimedb_datastore::traits::Program;
3335
use spacetimedb_lib::{RawModuleDef, Timestamp};
3436
use spacetimedb_schema::auto_migrate::MigrationPolicy;
@@ -439,6 +441,12 @@ fn spawn_instance_worker(
439441
let instance_env = InstanceEnv::new(replica_ctx.clone(), scheduler);
440442
scope.set_slot(JsInstanceEnv::new(instance_env));
441443

444+
let mut inst = V8Instance {
445+
scope,
446+
replica_ctx,
447+
hooks: &hooks,
448+
};
449+
442450
// Process requests to the worker.
443451
//
444452
// The loop is terminated when a `JsInstance` is dropped.
@@ -466,7 +474,7 @@ fn spawn_instance_worker(
466474
// but rather let this happen by `return_instance` using `JsInstance::trapped`
467475
// which will cause `JsInstance` to be dropped,
468476
// which in turn results in the loop being terminated.
469-
let res = call_reducer(&mut instance_common, replica_ctx, scope, &hooks, tx, params);
477+
let res = instance_common.call_reducer_with_tx(tx, params, &mut inst);
470478

471479
// Reply to `JsInstance::call_reducer`.
472480
if let Err(e) = call_reducer_response_tx.send(res) {
@@ -476,7 +484,7 @@ fn spawn_instance_worker(
476484
}
477485
}
478486
JsWorkerRequest::CallView { tx, params } => {
479-
let res = call_view(&mut instance_common, replica_ctx, scope, &hooks, tx, params);
487+
let res = instance_common.call_view_with_tx(tx, params, &mut inst);
480488

481489
if let Err(e) = call_view_response_tx.send(res) {
482490
unreachable!("should have receiver for `call_view` response, {e}");
@@ -603,43 +611,83 @@ fn call_free_fun<'scope>(
603611
fun.call(scope, receiver, args).ok_or_else(exception_already_thrown)
604612
}
605613

606-
#[allow(clippy::too_many_arguments)]
607-
fn common_call<'scope, R>(
614+
struct V8Instance<'a, 'scope, 'isolate> {
615+
scope: &'a mut PinScope<'scope, 'isolate>,
616+
replica_ctx: &'a Arc<ReplicaContext>,
617+
hooks: &'a HookFunctions<'a>,
618+
}
619+
620+
impl WasmInstance for V8Instance<'_, '_, '_> {
621+
fn extract_descriptions(&mut self) -> Result<RawModuleDef, DescribeError> {
622+
extract_description(self.scope, self.hooks, self.replica_ctx)
623+
}
624+
625+
fn replica_ctx(&self) -> &Arc<ReplicaContext> {
626+
self.replica_ctx
627+
}
628+
629+
fn tx_slot(&self) -> TxSlot {
630+
self.scope.get_slot::<JsInstanceEnv>().unwrap().instance_env.tx.clone()
631+
}
632+
633+
fn call_reducer(&mut self, op: ReducerOp<'_>, budget: FunctionBudget) -> ReducerExecuteResult {
634+
let ExecutionResult { stats, call_result } = common_call(self.scope, budget, op, |scope, op| {
635+
Ok(call_call_reducer(scope, self.hooks, op)?)
636+
});
637+
let call_result = call_result.and_then(|res| res.map_err(ExecutionError::User));
638+
ExecutionResult { stats, call_result }
639+
}
640+
641+
fn call_view(&mut self, op: ViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult {
642+
common_call(self.scope, budget, op, |scope, op| {
643+
call_call_view(scope, self.hooks, op)
644+
})
645+
}
646+
647+
fn call_view_anon(&mut self, op: AnonymousViewOp<'_>, budget: FunctionBudget) -> ViewExecuteResult {
648+
common_call(self.scope, budget, op, |scope, op| {
649+
call_call_view_anon(scope, self.hooks, op)
650+
})
651+
}
652+
653+
fn log_traceback(&self, func_type: &str, func: &str, trap: &anyhow::Error) {
654+
log_traceback(self.replica_ctx, func_type, func, trap)
655+
}
656+
657+
async fn call_procedure(&mut self, _op: ProcedureOp, _budget: FunctionBudget) -> ProcedureExecuteResult {
658+
todo!("JS/TS module procedure support")
659+
}
660+
}
661+
662+
fn common_call<'scope, R, O, F>(
608663
scope: &mut PinScope<'scope, '_>,
609-
tx: MutTxId,
610-
name: &str,
611-
timestamp: Timestamp,
612664
budget: FunctionBudget,
613-
func_type: FuncCallType,
614-
trapped: &mut bool,
615-
call: impl FnOnce(&mut PinScope<'scope, '_>) -> Result<R, ErrorOrException<ExceptionThrown>>,
616-
) -> (MutTxId, ExecutionStats, anyhow::Result<R>) {
665+
op: O,
666+
call: F,
667+
) -> ExecutionResult<Result<R, ExecutionError>>
668+
where
669+
O: InstanceOp,
670+
F: FnOnce(&mut PinScope<'scope, '_>, O) -> Result<R, ErrorOrException<ExceptionThrown>>,
671+
{
617672
// TODO(v8): Start the budget timeout and long-running logger.
618673
let env = env_on_isolate_unwrap(scope);
619-
let mut tx_slot = env.instance_env.tx.clone();
620674

621675
// Start the timer.
622676
// We'd like this tightly around `call`.
623-
env.start_funcall(name, timestamp, func_type);
624-
625-
// Call the function with `tx` provided.
626-
// It should not be available before.
627-
let (tx, call_result) = tx_slot.set(tx, || {
628-
catch_exception(scope, call).map_err(|(e, can_continue)| {
629-
// Convert `can_continue` to whether the isolate has "trapped".
630-
// Also cancel execution termination if needed,
631-
// that can occur due to terminating long running reducers.
632-
*trapped = match can_continue {
633-
CanContinue::No => false,
634-
CanContinue::Yes => true,
635-
CanContinue::YesCancelTermination => {
636-
scope.cancel_terminate_execution();
637-
true
638-
}
639-
};
640-
641-
anyhow::Error::from(e)
642-
})
677+
env.start_funcall(op.name(), op.timestamp(), op.call_type());
678+
679+
let call_result = catch_exception(scope, |scope| call(scope, op)).map_err(|(e, can_continue)| {
680+
// Convert `can_continue` to whether the isolate has "trapped".
681+
// Also cancel execution termination if needed,
682+
// that can occur due to terminating long running reducers.
683+
match can_continue {
684+
CanContinue::No => ExecutionError::Trap(e.into()),
685+
CanContinue::Yes => ExecutionError::Recoverable(e.into()),
686+
CanContinue::YesCancelTermination => {
687+
scope.cancel_terminate_execution();
688+
ExecutionError::Trap(e.into())
689+
}
690+
}
643691
});
644692

645693
// Finish timings.
@@ -657,73 +705,7 @@ fn common_call<'scope, R>(
657705
timings,
658706
memory_allocation,
659707
};
660-
(tx, stats, call_result)
661-
}
662-
663-
fn call_reducer<'scope>(
664-
instance_common: &mut InstanceCommon,
665-
replica_ctx: &ReplicaContext,
666-
scope: &mut PinScope<'scope, '_>,
667-
hooks: &HookFunctions<'_>,
668-
tx: Option<MutTxId>,
669-
params: CallReducerParams,
670-
) -> (super::ReducerCallResult, bool) {
671-
let mut trapped = false;
672-
673-
let (res, _) = instance_common.call_reducer_with_tx(
674-
replica_ctx,
675-
tx,
676-
params,
677-
move |a, b, c| log_traceback(replica_ctx, a, b, c),
678-
|tx, op, budget| {
679-
let func = FuncCallType::Reducer;
680-
let (tx, stats, call_result) =
681-
common_call(scope, tx, op.name, op.timestamp, budget, func, &mut trapped, |scope| {
682-
let res = call_call_reducer(scope, hooks, op)?;
683-
Ok(res)
684-
});
685-
(tx, ReducerExecuteResult { stats, call_result })
686-
},
687-
);
688-
689-
(res, trapped)
690-
}
691-
692-
fn call_view<'scope>(
693-
instance_common: &mut InstanceCommon,
694-
replica_ctx: &ReplicaContext,
695-
scope: &mut PinScope<'scope, '_>,
696-
hooks: &HookFunctions<'_>,
697-
tx: MutTxId,
698-
params: CallViewParams,
699-
) -> (ViewCallResult, bool) {
700-
let mut trapped = false;
701-
702-
let is_anonymous = params.is_anonymous;
703-
let (res, _) = instance_common.call_view_with_tx(
704-
replica_ctx,
705-
tx,
706-
params,
707-
move |a, b, c| log_traceback(replica_ctx, a, b, c),
708-
|tx, op, budget| {
709-
let func = FuncCallType::View(if is_anonymous {
710-
ViewCall::anonymous(op.db_id, op.args.get_bsatn().clone())
711-
} else {
712-
ViewCall::with_identity(*op.caller_identity, op.db_id, op.args.get_bsatn().clone())
713-
});
714-
let (tx, stats, call_result) =
715-
common_call(scope, tx, op.name, op.timestamp, budget, func, &mut trapped, |scope| {
716-
Ok(if is_anonymous {
717-
call_call_view_anon(scope, hooks, op.into())?
718-
} else {
719-
call_call_view(scope, hooks, op)?
720-
})
721-
});
722-
(tx, ViewExecuteResult { stats, call_result })
723-
},
724-
);
725-
726-
(res, trapped)
708+
ExecutionResult { stats, call_result }
727709
}
728710

729711
/// Extracts the raw module def by running the registered `__describe_module__` hook.

0 commit comments

Comments
 (0)