Skip to content

Commit f7ef92b

Browse files
fix plan caching for client-specific views (#3672)
# Description of Changes Fixes a subscription plan caching bug related to client-specific views. Before this fix, you could define a client-specific view: ```rust fn my_view(ctx: &ViewContext) -> Option<Player> { ctx.db.player().identity().find(ctx.sender) } ``` And subscribe to it as follows: ```sql SELECT * FROM my_view ``` Note this view is implicitly parameterized by `:sender`, however when generating a query hash for this subscription, this fact would not be taken into account which would result in this query being cached and reused for all callers. After this fix, a query hash is generated for this subscription as though it were given as: ```sql SELECT * FROM my_view WHERE identity = :sender ``` # API and ABI breaking changes None **Note for CLI code owners:** I had to touch the `subscribe` cli command file. No updates to the api. It just needed to be updated to look for views in the module def. # Expected complexity level and risk 1 # Testing - [x] Added a regression smoketest
1 parent cb5eb21 commit f7ef92b

File tree

5 files changed

+154
-8
lines changed

5 files changed

+154
-8
lines changed

crates/cli/src/subcommands/subscribe.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,11 @@ fn reformat_update<'a>(
8686
msg.tables
8787
.iter()
8888
.map(|upd| {
89-
let table_schema = schema
90-
.tables
91-
.iter()
92-
.find(|tbl| tbl.name == upd.table_name)
93-
.context("table not found in schema")?;
94-
let table_ty = schema.typespace.resolve(table_schema.product_type_ref);
89+
let table_ty = schema.typespace.resolve(
90+
schema
91+
.type_ref_for_table_like(&upd.table_name)
92+
.context("table not found in schema")?,
93+
);
9594

9695
let reformat_row = |row: &str| -> anyhow::Result<Value> {
9796
// TODO: can the following two calls be merged into a single call to reduce allocations?

crates/lib/src/db/raw_def/v9.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,50 @@ pub struct RawModuleDefV9 {
9797
pub row_level_security: Vec<RawRowLevelSecurityDefV9>,
9898
}
9999

100+
impl RawModuleDefV9 {
101+
/// Find a [`RawTableDefV9`] by name in this raw module def
102+
fn find_table_def(&self, table_name: &str) -> Option<&RawTableDefV9> {
103+
self.tables
104+
.iter()
105+
.find(|table_def| table_def.name.as_ref() == table_name)
106+
}
107+
108+
/// Find a [`RawViewDefV9`] by name in this raw module def
109+
fn find_view_def(&self, view_name: &str) -> Option<&RawViewDefV9> {
110+
self.misc_exports.iter().find_map(|misc_export| match misc_export {
111+
RawMiscModuleExportV9::View(view_def) if view_def.name.as_ref() == view_name => Some(view_def),
112+
_ => None,
113+
})
114+
}
115+
116+
/// Find and return the product type ref for a table in this module def
117+
fn type_ref_for_table(&self, table_name: &str) -> Option<AlgebraicTypeRef> {
118+
self.find_table_def(table_name)
119+
.map(|table_def| table_def.product_type_ref)
120+
}
121+
122+
/// Find and return the product type ref for a view in this module def
123+
fn type_ref_for_view(&self, view_name: &str) -> Option<AlgebraicTypeRef> {
124+
self.find_view_def(view_name)
125+
.map(|view_def| &view_def.return_type)
126+
.and_then(|return_type| {
127+
return_type
128+
.as_option()
129+
.and_then(|inner| inner.clone().into_ref().ok())
130+
.or_else(|| {
131+
return_type
132+
.as_array()
133+
.and_then(|inner| inner.elem_ty.clone().into_ref().ok())
134+
})
135+
})
136+
}
137+
138+
/// Find and return the product type ref for a table or view in this module def
139+
pub fn type_ref_for_table_like(&self, name: &str) -> Option<AlgebraicTypeRef> {
140+
self.type_ref_for_table(name).or_else(|| self.type_ref_for_view(name))
141+
}
142+
}
143+
100144
/// The definition of a database table.
101145
///
102146
/// This struct holds information about the table, including its name, columns, indexes,

crates/physical-plan/src/plan.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,13 @@ impl ProjectPlan {
115115
pub fn returns_view_table(&self) -> bool {
116116
self.return_table().is_some_and(|schema| schema.is_view())
117117
}
118+
119+
/// Does this plan read from an (anonymous) view?
120+
pub fn reads_from_view(&self, anonymous: bool) -> bool {
121+
match self {
122+
Self::None(plan) | Self::Name(plan, ..) => plan.reads_from_view(anonymous),
123+
}
124+
}
118125
}
119126

120127
/// Physical plans always terminate with a projection.
@@ -213,6 +220,15 @@ impl ProjectListPlan {
213220
pub fn returns_view_table(&self) -> bool {
214221
self.return_table().is_some_and(|schema| schema.is_view())
215222
}
223+
224+
/// Does this plan read from an (anonymous) view?
225+
pub fn reads_from_view(&self, anonymous: bool) -> bool {
226+
match self {
227+
Self::Limit(plan, _) => plan.reads_from_view(anonymous),
228+
Self::Name(plans) => plans.iter().any(|plan| plan.reads_from_view(anonymous)),
229+
Self::List(plans, ..) | Self::Agg(plans, ..) => plans.iter().any(|plan| plan.reads_from_view(anonymous)),
230+
}
231+
}
216232
}
217233

218234
/// Query operators return tuples of rows.
@@ -1124,6 +1140,19 @@ impl PhysicalPlan {
11241140
pub fn returns_view_table(&self) -> bool {
11251141
self.return_table().is_some_and(|schema| schema.is_view())
11261142
}
1143+
1144+
/// Does this plan read from an (anonymous) view?
1145+
pub fn reads_from_view(&self, anonymous: bool) -> bool {
1146+
self.any(&|plan| match plan {
1147+
Self::TableScan(scan, _) if anonymous => scan.schema.is_anonymous_view(),
1148+
Self::TableScan(scan, _) => scan.schema.is_view() && !scan.schema.is_anonymous_view(),
1149+
Self::IxScan(scan, _) if anonymous => scan.schema.is_anonymous_view(),
1150+
Self::IxScan(scan, _) => scan.schema.is_view() && !scan.schema.is_anonymous_view(),
1151+
Self::IxJoin(join, _) if anonymous => join.rhs.is_anonymous_view(),
1152+
Self::IxJoin(join, _) => join.rhs.is_view() && !join.rhs.is_anonymous_view(),
1153+
_ => false,
1154+
})
1155+
}
11271156
}
11281157

11291158
/// Scan a table row by row, returning row ids

crates/query/src/lib.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,14 @@ pub fn compile_subscription(
4545
let plan_fragments = resolve_views_for_sub(tx, plan, auth, &mut has_param)?
4646
.into_iter()
4747
.map(compile_select)
48-
.collect();
48+
.collect::<Vec<_>>();
4949

50-
Ok((plan_fragments, return_id, return_name, has_param))
50+
// Does this subscription read from a client-specific view?
51+
// If so, it is as if the view is parameterized by `:sender`.
52+
// We must know this in order to generate the correct query hash.
53+
let reads_view = plan_fragments.iter().any(|plan| plan.reads_from_view(false));
54+
55+
Ok((plan_fragments, return_id, return_name, has_param || reads_view))
5156
}
5257

5358
/// A utility for parsing and type checking a sql statement

smoketests/tests/views.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,3 +416,72 @@ def test_recovery_from_trapped_views_auto_migration(self):
416416
id | level
417417
----+-------
418418
""")
419+
420+
class SubscribeViews(Smoketest):
421+
MODULE_CODE = """
422+
use spacetimedb::{Identity, ReducerContext, Table, ViewContext};
423+
424+
#[spacetimedb::table(name = player_state)]
425+
pub struct PlayerState {
426+
#[primary_key]
427+
identity: Identity,
428+
#[unique]
429+
name: String,
430+
}
431+
432+
#[spacetimedb::view(name = my_player, public)]
433+
pub fn my_player(ctx: &ViewContext) -> Option<PlayerState> {
434+
ctx.db.player_state().identity().find(ctx.sender)
435+
}
436+
437+
#[spacetimedb::reducer]
438+
pub fn insert_player(ctx: &ReducerContext, name: String) {
439+
ctx.db.player_state().insert(PlayerState { name, identity: ctx.sender });
440+
}
441+
"""
442+
443+
def _test_subscribing_with_different_identities(self):
444+
"""Tests different clients subscribing to a client-specific view"""
445+
446+
# Insert an identity for Alice
447+
self.call("insert_player", "Alice")
448+
449+
# Generate and insert a new identity for Bob
450+
self.reset_config()
451+
self.new_identity()
452+
self.call("insert_player", "Bob")
453+
454+
# Subscribe to `my_player` as Bob
455+
sub = self.subscribe("select * from my_player", n=0)
456+
events = sub()
457+
458+
# Project out the identity field.
459+
# TODO: Eventually we should be able to do this directly in the sql.
460+
# But for now we implement it in python.
461+
projection = [
462+
{
463+
'my_player': {
464+
'deletes': [
465+
{'name': row['name']}
466+
for row in event['my_player']['deletes']
467+
],
468+
'inserts': [
469+
{'name': row['name']}
470+
for row in event['my_player']['inserts']
471+
],
472+
}
473+
}
474+
for event in events
475+
]
476+
477+
self.assertEqual(
478+
[
479+
{
480+
'my_player': {
481+
'deletes': [],
482+
'inserts': [{'name': 'Bob'}],
483+
}
484+
},
485+
],
486+
projection,
487+
)

0 commit comments

Comments
 (0)