Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/bindings-typescript/src/server/sys.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ declare module 'spacetime:sys@1.0' {

declare module 'spacetime:sys@1.1' {
export type ModuleHooks = {
__call_view__(id: u32, sender: u256, args: Uint8Array): Uint8Array;
__call_view_anon__(id: u32, args: Uint8Array): Uint8Array;
__call_view__(id: u32, sender: u256, args: Uint8Array): Uint8Array | object;
__call_view_anon__(id: u32, args: Uint8Array): Uint8Array | object;
};

export function register_hooks(hooks: ModuleHooks);
Expand Down
2 changes: 2 additions & 0 deletions crates/codegen/examples/regen-typescript-moduledef.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use fs_err as fs;
use regex::Regex;
use spacetimedb_codegen::{generate, typescript, OutputFile};
use spacetimedb_lib::db::raw_def::v9::ViewResultHeader;
use spacetimedb_lib::{RawModuleDef, RawModuleDefV8};
use spacetimedb_schema::def::ModuleDef;
use std::path::Path;
Expand All @@ -22,6 +23,7 @@ macro_rules! regex_replace {
fn main() -> anyhow::Result<()> {
let module = RawModuleDefV8::with_builder(|module| {
module.add_type::<RawModuleDef>();
module.add_type::<ViewResultHeader>();
module.add_type::<spacetimedb_lib::http::Request>();
module.add_type::<spacetimedb_lib::http::Response>();
});
Expand Down
164 changes: 59 additions & 105 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::subscription::ExecutionCounters;
use crate::util::{asyncify, spawn_rayon};
use crate::worker_metrics::WORKER_METRICS;
use anyhow::{anyhow, Context};
use bytes::Bytes;
use enum_map::EnumMap;
use fs2::FileExt;
use spacetimedb_commitlog::repo::OnNewSegmentFn;
Expand Down Expand Up @@ -38,17 +37,14 @@ use spacetimedb_durability as durability;
use spacetimedb_lib::bsatn::ToBsatn;
use spacetimedb_lib::db::auth::StAccess;
use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql};
use spacetimedb_lib::de::DeserializeSeed;
use spacetimedb_lib::st_var::StVarValue;
use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Identity;
use spacetimedb_lib::{bsatn, ConnectionId};
use spacetimedb_paths::server::{CommitLogDir, ReplicaDir, SnapshotsPath};
use spacetimedb_primitives::*;
use spacetimedb_sats::algebraic_type::fmt::fmt_algebraic_type;
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::{
AlgebraicType, AlgebraicTypeRef, AlgebraicValue, ProductType, ProductValue, Typespace, WithTypespace,
};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
use spacetimedb_schema::def::{ModuleDef, TableDef, ViewDef};
use spacetimedb_schema::schema::{
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
Expand Down Expand Up @@ -1593,29 +1589,24 @@ impl RelationalDB {
})
}

/// Write `bytes` into a (sender) view's backing table.
/// Write `rows` into a (sender) view's backing table.
///
/// # Process
/// 1. Delete all rows for `sender` from the view's backing table
/// 2. Deserialize `bytes`
/// 3. Insert the new rows into the backing table
/// 2. Insert the new rows into the backing table
///
/// # Arguments
/// * `tx` - Mutable transaction context
/// * `table_id` - The id of the view's backing table
/// * `sender` - The calling identity of the view being updated
/// * `row_type` - Expected return type of the view
/// * `bytes` - An array of product values (bsatn encoded)
/// * `typespace` - Type information for deserialization
/// * `rows` - Product values to insert
#[allow(clippy::too_many_arguments)]
pub fn materialize_view(
&self,
tx: &mut MutTxId,
table_id: TableId,
sender: Identity,
row_type: AlgebraicTypeRef,
bytes: Bytes,
typespace: &Typespace,
rows: Vec<ProductValue>,
) -> Result<(), DBError> {
// Delete rows for `sender` from the backing table
let rows_to_delete = self
Expand All @@ -1624,91 +1615,69 @@ impl RelationalDB {
.collect::<Vec<_>>();
self.delete(tx, table_id, rows_to_delete);

// Deserialize the return rows.
// The return type is expected to be an array of products.
let row_type = typespace.resolve(row_type);
let ret_type = AlgebraicType::array(row_type.ty().clone());
let seed = WithTypespace::new(typespace, &ret_type);
let rows = seed
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;

// Insert new rows into the backing table
for product in rows
.into_array()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?
.into_iter()
{
let product = product
.into_product()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?;
self.insert(
tx,
table_id,
&ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements))
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}
self.write_view_rows(tx, table_id, rows, Some(sender))?;

Ok(())
}

/// Write `bytes` into an anonymous view's backing table.
/// Write `rows` into an anonymous view's backing table.
///
/// # Process
/// 1. Clear the view's backing table
/// 2. Deserialize `bytes`
/// 3. Insert the new rows into the backing table
/// 2. Insert the new rows into the backing table
///
/// # Arguments
/// * `tx` - Mutable transaction context
/// * `table_id` - The id of the view's backing table
/// * `row_type` - Expected return type of the view
/// * `bytes` - An array of product values (bsatn encoded)
/// * `typespace` - Type information for deserialization
/// * `rows` - Product values to insert
#[allow(clippy::too_many_arguments)]
pub fn materialize_anonymous_view(
&self,
tx: &mut MutTxId,
table_id: TableId,
row_type: AlgebraicTypeRef,
bytes: Bytes,
typespace: &Typespace,
rows: Vec<ProductValue>,
) -> Result<(), DBError> {
// Clear entire backing table
self.clear_table(tx, table_id)?;

// Deserialize the return rows.
// The return type is expected to be an array of products.
let row_type = typespace.resolve(row_type);
let ret_type = AlgebraicType::array(row_type.ty().clone());
let seed = WithTypespace::new(typespace, &ret_type);
let rows = seed
.deserialize(bsatn::Deserializer::new(&mut &bytes[..]))
.map_err(|e| DatastoreError::from(ViewError::DeserializeReturn(e.to_string())))?;

// Insert new rows into the backing table
for product in rows
.into_array()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?
.into_iter()
{
self.insert(
tx,
table_id,
&product
.into_product()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
self.write_view_rows(tx, table_id, rows, None)?;

Ok(())
}

fn write_view_rows(
&self,
tx: &mut MutTxId,
table_id: TableId,
rows: Vec<ProductValue>,
sender: Option<Identity>,
) -> Result<(), DBError> {
match sender {
Some(sender) => {
for product in rows {
let value = ProductValue::from_iter(std::iter::once(sender.into()).chain(product.elements));
self.insert(
tx,
table_id,
&value
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}
}
None => {
for product in rows {
self.insert(
tx,
table_id,
&product
.to_bsatn_vec()
.map_err(|_| ViewError::SerializeRow)
.map_err(DatastoreError::from)?,
)?;
}
}
}

Ok(())
Expand Down Expand Up @@ -2431,6 +2400,7 @@ mod tests {
begin_tx, insert, make_snapshot, with_auto_commit, with_read_only, TestDB,
};
use anyhow::bail;
use bytes::Bytes;
use commitlog::payload::txdata;
use commitlog::Commitlog;
use durability::EmptyHistory;
Expand Down Expand Up @@ -2592,24 +2562,12 @@ mod tests {
Ok((view_id, table_id, module_def.clone(), view_def.clone()))
}

fn insert_view_row(
stdb: &TestDB,
view_id: ViewId,
table_id: TableId,
typespace: &Typespace,
row_type: AlgebraicTypeRef,
sender: Identity,
v: u8,
) -> ResultTest<()> {
let to_bsatn = |pv: &ProductValue| {
Bytes::from(bsatn::to_vec(&AlgebraicValue::Array([pv.clone()].into())).expect("bstan serialization failed"))
};

fn insert_view_row(stdb: &TestDB, view_id: ViewId, table_id: TableId, sender: Identity, v: u8) -> ResultTest<()> {
let row_pv = |v: u8| product![v];

let mut tx = begin_mut_tx(stdb);
tx.subscribe_view(view_id, ArgId::SENTINEL, sender)?;
stdb.materialize_view(&mut tx, table_id, sender, row_type, to_bsatn(&row_pv(v)), typespace)?;
stdb.materialize_view(&mut tx, table_id, sender, vec![row_pv(v)])?;
stdb.commit_tx(tx)?;

Ok(())
Expand All @@ -2633,13 +2591,11 @@ mod tests {
fn test_view_tables_are_ephemeral() -> ResultTest<()> {
let stdb = TestDB::durable()?;

let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?;
let row_type = view_def.product_type_ref;
let typespace = module_def.typespace();
let (view_id, table_id, _, _) = setup_view(&stdb)?;

// Write some rows (reusing the same helper)
insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ONE, 10)?;
insert_view_row(&stdb, view_id, table_id, typespace, row_type, Identity::ZERO, 20)?;
insert_view_row(&stdb, view_id, table_id, Identity::ONE, 10)?;
insert_view_row(&stdb, view_id, table_id, Identity::ZERO, 20)?;

assert!(
!project_views(&stdb, table_id, Identity::ZERO).is_empty(),
Expand Down Expand Up @@ -2668,14 +2624,12 @@ mod tests {
fn test_views() -> ResultTest<()> {
let stdb = TestDB::durable()?;

let (view_id, table_id, module_def, view_def) = setup_view(&stdb)?;
let row_type = view_def.product_type_ref;
let typespace = module_def.typespace();
let (view_id, table_id, _, _) = setup_view(&stdb)?;

let sender1 = Identity::ONE;

// Sender 1 insert
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender1, 42)?;
insert_view_row(&stdb, view_id, table_id, sender1, 42)?;

assert_eq!(
project_views(&stdb, table_id, sender1)[0],
Expand All @@ -2686,7 +2640,7 @@ mod tests {
// Sender 2 insert
let sender2 = Identity::ZERO;
let before_sender2 = Instant::now();
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 84)?;
insert_view_row(&stdb, view_id, table_id, sender2, 84)?;

assert_eq!(
project_views(&stdb, table_id, sender2)[0],
Expand All @@ -2712,7 +2666,7 @@ mod tests {
stdb.commit_tx(tx)?;

// Reinsert after restart
insert_view_row(&stdb, view_id, table_id, typespace, row_type, sender2, 91)?;
insert_view_row(&stdb, view_id, table_id, sender2, 91)?;
assert_eq!(
project_views(&stdb, table_id, sender2)[0],
product![91u8],
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/host/v8/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ fn common_call<'scope, R, O, F>(
budget: FunctionBudget,
op: O,
call: F,
) -> ExecutionResult<Result<R, ExecutionError>>
) -> ExecutionResult<R, ExecutionError>
where
O: InstanceOp,
F: FnOnce(&mut PinScope<'scope, '_>, O) -> Result<R, ErrorOrException<ExceptionThrown>>,
Expand Down
8 changes: 5 additions & 3 deletions crates/core/src/host/v8/syscall/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use v8::{callback_scope, Context, FixedArray, Local, Module, PinScope};
use crate::host::v8::de::scratch_buf;
use crate::host::v8::error::{ErrorOrException, ExcResult, ExceptionThrown, Throwable, TypeError};
use crate::host::wasm_common::abi::parse_abi_version;
use crate::host::wasm_common::module_host_actor::{AnonymousViewOp, ProcedureOp, ReducerOp, ReducerResult, ViewOp};
use crate::host::wasm_common::module_host_actor::{
AnonymousViewOp, ProcedureOp, ReducerOp, ReducerResult, ViewOp, ViewReturnData,
};

mod hooks;
mod v1;
Expand Down Expand Up @@ -85,7 +87,7 @@ pub(super) fn call_call_view(
scope: &mut PinScope<'_, '_>,
hooks: &HookFunctions<'_>,
op: ViewOp<'_>,
) -> Result<Bytes, ErrorOrException<ExceptionThrown>> {
) -> Result<ViewReturnData, ErrorOrException<ExceptionThrown>> {
match hooks.abi {
AbiVersion::V1 => v1::call_call_view(scope, hooks, op),
}
Expand All @@ -98,7 +100,7 @@ pub(super) fn call_call_view_anon(
scope: &mut PinScope<'_, '_>,
hooks: &HookFunctions<'_>,
op: AnonymousViewOp<'_>,
) -> Result<Bytes, ErrorOrException<ExceptionThrown>> {
) -> Result<ViewReturnData, ErrorOrException<ExceptionThrown>> {
match hooks.abi {
AbiVersion::V1 => v1::call_call_view_anon(scope, hooks, op),
}
Expand Down
Loading
Loading