Skip to content

Commit ba9aa24

Browse files
committed
[5/4] Listen Options and Server Timestamp
1 parent df9419a commit ba9aa24

File tree

10 files changed

+543
-31
lines changed

10 files changed

+543
-31
lines changed

packages/firestore/src/api/pipeline_impl.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,11 @@ import { ensureFirestoreConfigured, Firestore } from './database';
5151
import { Pipeline } from './pipeline'; // Keep this specific Pipeline import if needed alongside LitePipeline
5252
import { RealtimePipeline } from './realtime_pipeline';
5353
import { DocumentReference } from './reference';
54-
import { SnapshotListenOptions, Unsubscribe } from './reference_impl';
54+
import {
55+
PipelineListenOptions,
56+
SnapshotListenOptions,
57+
Unsubscribe
58+
} from './reference_impl';
5559
import { RealtimePipelineSnapshot } from './snapshot';
5660
import { ExpUserDataWriter } from './user_data_writer';
5761

@@ -234,7 +238,7 @@ export function _onRealtimePipelineSnapshot(
234238
*/
235239
export function _onRealtimePipelineSnapshot(
236240
pipeline: RealtimePipeline,
237-
options: SnapshotListenOptions,
241+
options: PipelineListenOptions,
238242
observer: {
239243
next?: (snapshot: RealtimePipelineSnapshot) => void;
240244
error?: (error: FirestoreError) => void;
@@ -257,7 +261,7 @@ export function _onRealtimePipelineSnapshot(
257261
*/
258262
export function _onRealtimePipelineSnapshot(
259263
pipeline: RealtimePipeline,
260-
options: SnapshotListenOptions,
264+
options: PipelineListenOptions,
261265
onNext: (snapshot: RealtimePipelineSnapshot) => void,
262266
onError?: (error: FirestoreError) => void,
263267
onComplete?: () => void
@@ -266,9 +270,10 @@ export function _onRealtimePipelineSnapshot(
266270
pipeline: RealtimePipeline,
267271
...args: unknown[]
268272
): Unsubscribe {
269-
let options: SnapshotListenOptions = {
273+
let options: PipelineListenOptions = {
270274
includeMetadataChanges: false,
271-
source: 'default'
275+
source: 'default',
276+
serverTimestampBehavior: 'none'
272277
};
273278
let currArg = 0;
274279
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
@@ -278,7 +283,8 @@ export function _onRealtimePipelineSnapshot(
278283

279284
const internalOptions = {
280285
includeMetadataChanges: options.includeMetadataChanges,
281-
source: options.source as ListenerDataSource
286+
source: options.source as ListenerDataSource,
287+
serverTimestampBehavior: options.serverTimestampBehavior
282288
};
283289

284290
let userObserver: PartialObserver<RealtimePipelineSnapshot>;
@@ -296,7 +302,9 @@ export function _onRealtimePipelineSnapshot(
296302
const observer = {
297303
next: (snapshot: ViewSnapshot) => {
298304
if (userObserver.next) {
299-
userObserver.next(new RealtimePipelineSnapshot(pipeline, snapshot));
305+
userObserver.next(
306+
new RealtimePipelineSnapshot(pipeline, snapshot, internalOptions)
307+
);
300308
}
301309
},
302310
error: userObserver.error,
@@ -305,7 +313,7 @@ export function _onRealtimePipelineSnapshot(
305313

306314
return firestoreClientListen(
307315
client,
308-
toCorePipeline(pipeline),
316+
toCorePipeline(pipeline, internalOptions),
309317
internalOptions, // Pass parsed options here
310318
observer
311319
);

packages/firestore/src/api/reference_impl.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1070,6 +1070,21 @@ export function onSnapshotResume<
10701070
}
10711071
}
10721072

1073+
export interface PipelineListenOptions {
1074+
/**
1075+
* Include a change even if only the metadata of the query or of a document
1076+
* changed. Default is false.
1077+
*/
1078+
readonly includeMetadataChanges?: boolean;
1079+
1080+
/**
1081+
* Set the source the query listens to. Default to "default", which
1082+
* listens to both cache and server.
1083+
*/
1084+
readonly source?: ListenSource;
1085+
readonly serverTimestampBehavior?: 'estimate' | 'previous' | 'none';
1086+
}
1087+
10731088
export function onPipelineSnapshot(
10741089
query: RealtimePipeline,
10751090
observer: {
@@ -1080,7 +1095,7 @@ export function onPipelineSnapshot(
10801095
): Unsubscribe;
10811096
export function onPipelineSnapshot(
10821097
query: RealtimePipeline,
1083-
options: SnapshotListenOptions,
1098+
options: PipelineListenOptions,
10841099
observer: {
10851100
next?: (snapshot: RealtimePipelineSnapshot) => void;
10861101
error?: (error: FirestoreError) => void;
@@ -1095,7 +1110,7 @@ export function onPipelineSnapshot(
10951110
): Unsubscribe;
10961111
export function onPipelineSnapshot(
10971112
query: RealtimePipeline,
1098-
options: SnapshotListenOptions,
1113+
options: PipelineListenOptions,
10991114
onNext: (snapshot: RealtimePipelineSnapshot) => void,
11001115
onError?: (error: FirestoreError) => void,
11011116
onCompletion?: () => void
@@ -1106,9 +1121,10 @@ export function onPipelineSnapshot(
11061121
): Unsubscribe {
11071122
reference = getModularInstance(reference);
11081123

1109-
let options: SnapshotListenOptions = {
1124+
let options: PipelineListenOptions = {
11101125
includeMetadataChanges: false,
1111-
source: 'default'
1126+
source: 'default',
1127+
serverTimestampBehavior: 'none'
11121128
};
11131129
let currArg = 0;
11141130
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
@@ -1118,7 +1134,8 @@ export function onPipelineSnapshot(
11181134

11191135
const internalOptions = {
11201136
includeMetadataChanges: options.includeMetadataChanges,
1121-
source: options.source as ListenerDataSource
1137+
source: options.source as ListenerDataSource,
1138+
serverTimestampBehavior: options.serverTimestampBehavior
11221139
};
11231140

11241141
if (isPartialObserver(args[currArg])) {
@@ -1136,12 +1153,16 @@ export function onPipelineSnapshot(
11361153

11371154
// RealtimePipeline
11381155
firestore = cast(reference._db, Firestore);
1139-
internalQuery = toCorePipeline(reference);
1156+
internalQuery = toCorePipeline(reference, internalOptions);
11401157
observer = {
11411158
next: snapshot => {
11421159
if (args[currArg]) {
11431160
(args[currArg] as NextFn<RealtimePipelineSnapshot>)(
1144-
new RealtimePipelineSnapshot(reference as RealtimePipeline, snapshot)
1161+
new RealtimePipelineSnapshot(
1162+
reference as RealtimePipeline,
1163+
snapshot,
1164+
internalOptions
1165+
)
11451166
);
11461167
}
11471168
},

packages/firestore/src/api/snapshot.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import { BundleLoader } from '../core/bundle_impl';
1919
import { createBundleReaderSync } from '../core/firestore_client';
20+
import { ListenOptions } from '../core/event_manager';
2021
import { CorePipeline } from '../core/pipeline';
2122
import { isPipeline } from '../core/pipeline-util';
2223
import { newPipelineComparator } from '../core/pipeline_run';
@@ -1175,7 +1176,8 @@ export function resultChangesFromSnapshot(
11751176
new SnapshotMetadata(
11761177
querySnapshot._snapshot.mutatedKeys.has(change.doc.key),
11771178
querySnapshot._snapshot.fromCache
1178-
)
1179+
),
1180+
querySnapshot._listenOptions
11791181
);
11801182
lastDoc = change.doc;
11811183
return {
@@ -1205,7 +1207,8 @@ export function resultChangesFromSnapshot(
12051207
new SnapshotMetadata(
12061208
querySnapshot._snapshot.mutatedKeys.has(change.doc.key),
12071209
querySnapshot._snapshot.fromCache
1208-
)
1210+
),
1211+
querySnapshot._listenOptions
12091212
);
12101213
let oldIndex = -1;
12111214
let newIndex = -1;
@@ -1245,7 +1248,11 @@ export class RealtimePipelineSnapshot {
12451248
private _cachedChangesIncludeMetadataChanges?: boolean;
12461249

12471250
/** @hideconstructor */
1248-
constructor(pipeline: RealtimePipeline, readonly _snapshot: ViewSnapshot) {
1251+
constructor(
1252+
pipeline: RealtimePipeline,
1253+
readonly _snapshot: ViewSnapshot,
1254+
readonly _listenOptions: ListenOptions
1255+
) {
12491256
this.metadata = new SnapshotMetadata(
12501257
_snapshot.hasPendingWrites,
12511258
_snapshot.fromCache
@@ -1257,7 +1264,7 @@ export class RealtimePipelineSnapshot {
12571264
get results(): PipelineResult[] {
12581265
const result: PipelineResult[] = [];
12591266
this._snapshot.docs.forEach(doc =>
1260-
result.push(toPipelineResult(doc, this.pipeline))
1267+
result.push(toPipelineResult(doc, this.pipeline, this._listenOptions))
12611268
);
12621269
return result;
12631270
}

packages/firestore/src/core/event_manager.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,9 @@ export interface ListenOptions {
408408

409409
/** Set the source events raised from. */
410410
readonly source?: ListenerDataSource;
411+
412+
/** Realtime Pipeline Only: Set server timestamp behavior. */
413+
readonly serverTimestampBehavior?: 'estimate' | 'previous' | 'none';
411414
}
412415

413416
/**

packages/firestore/src/core/expressions.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ import { objectSize } from '../util/obj';
5858
import { isNegativeZero } from '../util/types';
5959

6060
import { EvaluationContext, PipelineInputOutput } from './pipeline_run';
61+
import {
62+
getLocalWriteTime,
63+
getPreviousValue,
64+
isServerTimestamp
65+
} from '../model/server_timestamps';
66+
import { SnapshotVersion } from './snapshot_version';
6167

6268
export type EvaluateResultType =
6369
| 'ERROR'
@@ -312,7 +318,34 @@ export class CoreField implements EvaluableExpr {
312318
}
313319
// Return 'UNSET' if the field doesn't exist, otherwise the Value.
314320
const result = input.data.field(this.expr.fieldPath);
321+
322+
function getServerTimestampValue(
323+
context: EvaluationContext,
324+
value: Value
325+
): Value {
326+
if (context.serverTimestampBehavior === 'estimate') {
327+
return {
328+
timestampValue: toVersion(
329+
context.serializer,
330+
SnapshotVersion.fromTimestamp(getLocalWriteTime(value))
331+
)
332+
};
333+
} else if (context.serverTimestampBehavior === 'previous') {
334+
const previousValue = getPreviousValue(value);
335+
if (previousValue) {
336+
return previousValue;
337+
}
338+
}
339+
return { nullValue: 'NULL_VALUE' };
340+
}
341+
315342
if (!!result) {
343+
if (isServerTimestamp(result)) {
344+
return EvaluateResult.newValue(
345+
getServerTimestampValue(context, result)
346+
);
347+
}
348+
316349
return EvaluateResult.newValue(result);
317350
} else {
318351
return EvaluateResult.newUnset();

packages/firestore/src/core/pipeline-util.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ import {
9393
targetIsPipelineTarget
9494
} from './target';
9595
import { VectorValue } from '../api';
96+
import { ListenOptions } from './event_manager';
9697

9798
/* eslint @typescript-eslint/no-explicit-any: 0 */
9899

@@ -559,7 +560,12 @@ function addSystemFields(
559560
}
560561

561562
export function toCorePipeline(
562-
p: ApiPipeline | RealtimePipeline
563+
p: ApiPipeline | RealtimePipeline,
564+
listenOptions?: ListenOptions
563565
): CorePipeline {
564-
return new CorePipeline(p.userDataReader.serializer, rewriteStages(p.stages));
566+
return new CorePipeline(
567+
p.userDataReader.serializer,
568+
rewriteStages(p.stages),
569+
listenOptions
570+
);
565571
}

packages/firestore/src/core/pipeline.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,14 @@ import { JsonProtoSerializer } from '../remote/serializer';
2828
import { debugAssert } from '../util/assert';
2929

3030
import { PipelineFlavor, PipelineSourceType } from './pipeline-util';
31+
import { ListenOptions } from './event_manager';
3132

3233
export class CorePipeline {
3334
isCorePipeline = true;
3435
constructor(
3536
readonly serializer: JsonProtoSerializer,
36-
readonly stages: Stage[]
37+
readonly stages: Stage[],
38+
readonly listenOptions?: ListenOptions
3739
) {}
3840
getPipelineCollection(): string | undefined {
3941
return getPipelineCollection(this);

packages/firestore/src/core/pipeline_run.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ export type PipelineInputOutput = MutableDocument;
4545

4646
export interface EvaluationContext {
4747
serializer: JsonProtoSerializer;
48+
serverTimestampBehavior?: 'estimate' | 'previous' | 'none';
4849
}
4950

5051
export function runPipeline(
@@ -53,7 +54,14 @@ export function runPipeline(
5354
): PipelineInputOutput[] {
5455
let current = input;
5556
for (const stage of pipeline.stages) {
56-
current = evaluate({ serializer: pipeline.serializer }, stage, current);
57+
current = evaluate(
58+
{
59+
serializer: pipeline.serializer,
60+
serverTimestampBehavior: pipeline.listenOptions?.serverTimestampBehavior
61+
},
62+
stage,
63+
current
64+
);
5765
}
5866

5967
return current;

0 commit comments

Comments
 (0)