@@ -17,7 +17,10 @@ package com.google.firebase.firestore
1717import com.google.android.gms.tasks.Task
1818import com.google.android.gms.tasks.TaskCompletionSource
1919import com.google.firebase.Timestamp
20+ import com.google.firebase.firestore.core.Canonicalizable
21+ import com.google.firebase.firestore.model.Document
2022import com.google.firebase.firestore.model.DocumentKey
23+ import com.google.firebase.firestore.model.MutableDocument
2124import com.google.firebase.firestore.model.Values
2225import com.google.firebase.firestore.pipeline.AddFieldsStage
2326import com.google.firebase.firestore.pipeline.AggregateFunction
@@ -29,6 +32,7 @@ import com.google.firebase.firestore.pipeline.CollectionSource
2932import com.google.firebase.firestore.pipeline.DatabaseSource
3033import com.google.firebase.firestore.pipeline.DistinctStage
3134import com.google.firebase.firestore.pipeline.DocumentsSource
35+ import com.google.firebase.firestore.pipeline.EvaluationContext
3236import com.google.firebase.firestore.pipeline.Expr
3337import com.google.firebase.firestore.pipeline.Expr.Companion.field
3438import com.google.firebase.firestore.pipeline.ExprWithAlias
@@ -51,6 +55,7 @@ import com.google.firebase.firestore.pipeline.Stage
5155import com.google.firebase.firestore.pipeline.UnionStage
5256import com.google.firebase.firestore.pipeline.UnnestStage
5357import com.google.firebase.firestore.pipeline.WhereStage
58+ import com.google.firebase.firestore.util.Assert.fail
5459import com.google.firestore.v1.ExecutePipelineRequest
5560import com.google.firestore.v1.StructuredPipeline
5661import com.google.firestore.v1.Value
@@ -759,7 +764,7 @@ internal constructor(
759764 firestore: FirebaseFirestore ,
760765 userDataReader: UserDataReader ,
761766 stages: List <Stage <* >>
762- ) : AbstractPipeline (firestore, userDataReader, stages) {
767+ ) : AbstractPipeline (firestore, userDataReader, stages), Canonicalizable {
763768 internal constructor (
764769 firestore: FirebaseFirestore ,
765770 userDataReader: UserDataReader ,
@@ -786,31 +791,107 @@ internal constructor(
786791
787792 fun where (condition : BooleanExpr ): RealtimePipeline = append(WhereStage (condition))
788793
789- internal fun rewriteStages (): RealtimePipeline {
794+ internal val rewrittenStages : List < Stage < * >> by lazy {
790795 var hasOrder = false
791- return with (
792- buildList {
793- for (stage in stages) when (stage) {
794- // Stages whose semantics depend on ordering
795- is LimitStage ,
796- is OffsetStage -> {
797- if (! hasOrder) {
798- hasOrder = true
799- add(SortStage .BY_DOCUMENT_ID )
800- }
801- add(stage)
802- }
803- is SortStage -> {
796+ buildList {
797+ for (stage in stages) when (stage) {
798+ // Stages whose semantics depend on ordering
799+ is LimitStage ,
800+ is OffsetStage -> {
801+ if (! hasOrder) {
804802 hasOrder = true
805- add(stage.withStableOrdering() )
803+ add(SortStage . BY_DOCUMENT_ID )
806804 }
807- else -> add(stage)
805+ add(stage)
808806 }
809- if (! hasOrder) {
810- add(SortStage .BY_DOCUMENT_ID )
807+ is SortStage -> {
808+ hasOrder = true
809+ add(stage.withStableOrdering())
811810 }
811+ else -> add(stage)
812812 }
813- )
813+ if (! hasOrder) {
814+ add(SortStage .BY_DOCUMENT_ID )
815+ }
816+ }
817+ }
818+
819+ override fun canonicalId (): String {
820+ return rewrittenStages.joinToString(" |" ) { stage -> (stage as Canonicalizable ).canonicalId() }
821+ }
822+
823+ override fun equals (other : Any? ): Boolean {
824+ if (this == = other) return true
825+ if (other !is RealtimePipeline ) return false
826+ return stages == other.stages
827+ }
828+
829+ override fun hashCode (): Int {
830+ return stages.hashCode()
831+ }
832+
833+ internal fun evaluate (inputs : List <MutableDocument >): List <MutableDocument > {
834+ val context = EvaluationContext (this )
835+ return rewrittenStages.fold(inputs) { documents, stage -> stage.evaluate(context, documents) }
836+ }
837+
838+ internal fun matchesAllDocuments (): Boolean {
839+ for (stage in rewrittenStages) {
840+ // Check for LimitStage
841+ if (stage.name == " limit" ) {
842+ return false
843+ }
844+
845+ // Check for Where stage
846+ if (stage is WhereStage ) {
847+ // Check if it's the special 'exists(__name__)' case
848+ val funcExpr = stage.condition as ? FunctionExpr
849+ if (funcExpr?.name == " exists" && funcExpr.params.size == 1 ) {
850+ val fieldExpr = funcExpr.params[0 ] as ? Field
851+ if (fieldExpr?.fieldPath?.isKeyField == true ) {
852+ continue // This specific 'exists(__name__)' filter doesn't count
853+ }
854+ }
855+ return false
856+ }
857+ // TODO(pipeline) : Add checks for other filtering stages like Aggregate,
858+ // Distinct, FindNearest once they are implemented.
859+ }
860+ return true
861+ }
862+
863+ internal fun hasLimit (): Boolean {
864+ for (stage in rewrittenStages) {
865+ if (stage.name == " limit" ) {
866+ return true
867+ }
868+ // TODO(pipeline): need to check for other stages that could have a limit,
869+ // like findNearest
870+ }
871+ return false
872+ }
873+
874+ internal fun matches (doc : Document ): Boolean {
875+ val result = evaluate(listOf (doc as MutableDocument ))
876+ return result.isNotEmpty()
877+ }
878+
879+ private fun evaluateContext (): EvaluationContext {
880+ return EvaluationContext (this )
881+ }
882+
883+ internal fun comparator (): Comparator <Document > =
884+ getLastEffectiveSortStage().comparator(evaluateContext())
885+
886+ private fun getLastEffectiveSortStage (): SortStage {
887+ for (stage in rewrittenStages.asReversed()) {
888+ if (stage is SortStage ) {
889+ return stage
890+ }
891+ // TODO(pipeline): Consider stages that might invalidate ordering later,
892+ // like fineNearest
893+ }
894+ throw fail(" RealtimePipeline must contain at least one Sort stage (ensured by RewriteStages)." )
814895 }
815896}
816897
0 commit comments