From afaa3a3ebd9be2610f6f4c8ccb0420b3db43ff7b Mon Sep 17 00:00:00 2001 From: MrAnayDongre Date: Fri, 24 Oct 2025 11:47:59 -0700 Subject: [PATCH 1/3] feat(enum): add Enum type (engine+python) --- docs/docs/core/data_types.mdx | 4 + .../examples/docs_to_knowledge_graph.md | 2 +- docs/docs/sources/index.md | 4 +- docs/docs/targets/index.md | 3 - docs/docs/targets/kuzu.md | 4 +- docs/docs/targets/neo4j.md | 4 +- examples/product_recommendation/README.md | 2 +- python/cocoindex/typing.py | 16 +++ src/base/json_schema.rs | 104 ++++++++++++++++-- src/base/schema.rs | 4 + src/base/value.rs | 2 + src/ops/targets/kuzu.rs | 1 + src/ops/targets/postgres.rs | 1 + src/py/convert.rs | 1 + 14 files changed, 132 insertions(+), 20 deletions(-) diff --git a/docs/docs/core/data_types.mdx b/docs/docs/core/data_types.mdx index d3209b88..daa30f07 100644 --- a/docs/docs/core/data_types.mdx +++ b/docs/docs/core/data_types.mdx @@ -46,6 +46,7 @@ This is the list of all primitive types supported by CocoIndex: | *Bytes* | `bytes` | | | | *Str* | `str` | | | | *Bool* | `bool` | | | +| *Enum* | `str`, `cocoindex.typing.Enum()` | | | | *Int64* | `cocoindex.Int64`, `int`, `numpy.int64` | | | | *Float32* | `cocoindex.Float32`, `numpy.float32` | *Float64* | | | *Float64* | `cocoindex.Float64`, `float`, `numpy.float64` | | | @@ -84,6 +85,9 @@ Notes: In Python, it's represented by `cocoindex.Json`. It's useful to hold data without fixed schema known at flow definition time. +#### Enum Type + +*Enum* represents a string-like enumerated type. In Python, use the helper from `cocoindex.typing`. #### Vector Types diff --git a/docs/docs/examples/examples/docs_to_knowledge_graph.md b/docs/docs/examples/examples/docs_to_knowledge_graph.md index 0c644f41..ad3a9918 100644 --- a/docs/docs/examples/examples/docs_to_knowledge_graph.md +++ b/docs/docs/examples/examples/docs_to_knowledge_graph.md @@ -373,4 +373,4 @@ You can open it at [http://localhost:7474](http://localhost:7474), and run the f MATCH p=()-->() RETURN p ``` -![Neo4j Browser](/img/examples/docs_to_knowledge_graph/neo4j_browser.png) \ No newline at end of file +![Neo4j Browser](/img/examples/docs_to_knowledge_graph/neo4j_browser.png) diff --git a/docs/docs/sources/index.md b/docs/docs/sources/index.md index 09cbe166..0857b146 100644 --- a/docs/docs/sources/index.md +++ b/docs/docs/sources/index.md @@ -17,6 +17,6 @@ In CocoIndex, a source is the data origin you import from (e.g., files, database | [Postgres](/docs/sources/postgres) | Relational database (Postgres) | Related: -- [Life cycle of a indexing flow](/docs/core/basics#life-cycle-of-an-indexing-flow) -- [Live Update Tutorial](/docs/tutorials/live_updates) +- [Life cycle of a indexing flow](/docs/core/basics#life-cycle-of-an-indexing-flow) +- [Live Update Tutorial](/docs/tutorials/live_updates) for change capture mechanisms. diff --git a/docs/docs/targets/index.md b/docs/docs/targets/index.md index c90d7654..f90a5c32 100644 --- a/docs/docs/targets/index.md +++ b/docs/docs/targets/index.md @@ -334,6 +334,3 @@ You can find end-to-end examples fitting into any of supported property graphs i * * - - - diff --git a/docs/docs/targets/kuzu.md b/docs/docs/targets/kuzu.md index 441e9e78..dc741063 100644 --- a/docs/docs/targets/kuzu.md +++ b/docs/docs/targets/kuzu.md @@ -13,7 +13,7 @@ Exports data to a [Kuzu](https://kuzu.com/) graph database. ## Get Started -Read [Property Graph Targets](./index.md#property-graph-targets) for more information to get started on how it works in CocoIndex. +Read [Property Graph Targets](./index.md#property-graph-targets) for more information to get started on how it works in CocoIndex. ## Spec @@ -59,4 +59,4 @@ You can then access the explorer at [http://localhost:8124](http://localhost:812 href="https://github.com/cocoindex-io/cocoindex/tree/main/examples/docs_to_knowledge_graph" text="Docs to Knowledge Graph" margin="16px 0 24px 0" -/> \ No newline at end of file +/> diff --git a/docs/docs/targets/neo4j.md b/docs/docs/targets/neo4j.md index ab9e0d16..5e4fdb22 100644 --- a/docs/docs/targets/neo4j.md +++ b/docs/docs/targets/neo4j.md @@ -11,7 +11,7 @@ import { ExampleButton } from '../../src/components/GitHubButton'; ## Get Started -Read [Property Graph Targets](./index.md#property-graph-targets) for more information to get started on how it works in CocoIndex. +Read [Property Graph Targets](./index.md#property-graph-targets) for more information to get started on how it works in CocoIndex. ## Spec @@ -59,4 +59,4 @@ If you are building multiple CocoIndex flows from different projects to neo4j, w This way, you can clean up the data for each flow independently. -In case you need to clean up the data in the same database, you can do it manually by running `cocoindex drop ` from the project you want to clean up. \ No newline at end of file +In case you need to clean up the data in the same database, you can do it manually by running `cocoindex drop ` from the project you want to clean up. diff --git a/examples/product_recommendation/README.md b/examples/product_recommendation/README.md index f3ce29b0..314464cf 100644 --- a/examples/product_recommendation/README.md +++ b/examples/product_recommendation/README.md @@ -8,7 +8,7 @@ Please drop [CocoIndex on Github](https://github.com/cocoindex-io/cocoindex) a s ## Prerequisite -* [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) +* [Install Postgres](https://cocoindex.io/docs/getting_started/installation#-install-postgres) * Install [Neo4j](https://cocoindex.io/docs/targets/neo4j) * [Configure your OpenAI API key](https://cocoindex.io/docs/ai/llm#openai). diff --git a/python/cocoindex/typing.py b/python/cocoindex/typing.py index c4b0ef60..00244167 100644 --- a/python/cocoindex/typing.py +++ b/python/cocoindex/typing.py @@ -13,6 +13,8 @@ Literal, NamedTuple, Protocol, + Optional, + Sequence, TypeVar, overload, Self, @@ -64,6 +66,19 @@ def __init__(self, key: str, value: Any): LocalDateTime = Annotated[datetime.datetime, TypeKind("LocalDateTime")] OffsetDateTime = Annotated[datetime.datetime, TypeKind("OffsetDateTime")] + +def Enum(*, variants: Optional[Sequence[str]] = None) -> Any: + """ + String-like enumerated type. Use `variants` to hint allowed values. + Example: + color: Enum(variants=["red", "green", "blue"]) + At runtime this is a plain `str`; `variants` are emitted as schema attrs. + """ + if variants is not None: + return Annotated[str, TypeKind("Enum"), TypeAttr("variants", list(variants))] + return Annotated[str, TypeKind("Enum")] + + if TYPE_CHECKING: T_co = TypeVar("T_co", covariant=True) Dim_co = TypeVar("Dim_co", bound=int | None, covariant=True, default=None) @@ -587,6 +602,7 @@ class BasicValueType: "OffsetDateTime", "TimeDelta", "Json", + "Enum", "Vector", "Union", ] diff --git a/src/base/json_schema.rs b/src/base/json_schema.rs index c7a9756c..3c14ea7e 100644 --- a/src/base/json_schema.rs +++ b/src/base/json_schema.rs @@ -1,6 +1,6 @@ use crate::prelude::*; - use crate::utils::immutable::RefList; +use indexmap::IndexMap; use schemars::schema::{ ArrayValidation, InstanceType, ObjectValidation, Schema, SchemaObject, SingleOrVec, SubschemaValidation, @@ -74,6 +74,9 @@ impl JsonSchemaBuilder { schema::BasicValueType::Str => { schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::String))); } + schema::BasicValueType::Enum => { + schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::String))); + } schema::BasicValueType::Bytes => { schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::String))); } @@ -245,15 +248,34 @@ impl JsonSchemaBuilder { field_path.prepend(&f.name), ); if self.options.fields_always_required && f.value_type.nullable { - if let Some(instance_type) = &mut field_schema.instance_type { - let mut types = match instance_type { - SingleOrVec::Single(t) => vec![**t], - SingleOrVec::Vec(t) => std::mem::take(t), + if field_schema.enum_values.is_some() { + // Keep the enum as-is and support null via oneOf + let non_null = Schema::Object(field_schema); + let null_branch = Schema::Object(SchemaObject { + instance_type: Some(SingleOrVec::Single(Box::new( + InstanceType::Null, + ))), + ..Default::default() + }); + field_schema = SchemaObject { + subschemas: Some(Box::new(SubschemaValidation { + one_of: Some(vec![non_null, null_branch]), + ..Default::default() + })), + ..Default::default() }; - types.push(InstanceType::Null); - *instance_type = SingleOrVec::Vec(types); + } else { + if let Some(instance_type) = &mut field_schema.instance_type { + let mut types = match instance_type { + SingleOrVec::Single(t) => vec![**t], + SingleOrVec::Vec(t) => std::mem::take(t), + }; + types.push(InstanceType::Null); + *instance_type = SingleOrVec::Vec(types); + } } } + (f.name.to_string(), field_schema.into()) }) .collect(), @@ -298,9 +320,26 @@ impl JsonSchemaBuilder { enriched_value_type: &schema::EnrichedValueType, field_path: RefList<'_, &'_ spec::FieldName>, ) -> SchemaObject { - self.for_value_type(schema_base, &enriched_value_type.typ, field_path) - } + let mut out = self.for_value_type(schema_base, &enriched_value_type.typ, field_path); + + if let schema::ValueType::Basic(schema::BasicValueType::Enum) = &enriched_value_type.typ { + if let Some(variants) = enriched_value_type.attrs.get("variants") { + if let Some(arr) = variants.as_array() { + let enum_values: Vec = arr + .iter() + .filter_map(|v| { + v.as_str().map(|s| serde_json::Value::String(s.to_string())) + }) + .collect(); + if !enum_values.is_empty() { + out.enum_values = Some(enum_values); + } + } + } + } + out + } fn build_extra_instructions(&self) -> Result> { if self.extra_instructions_per_field.is_empty() { return Ok(None); @@ -458,6 +497,53 @@ mod tests { .assert_eq(&serde_json::to_string_pretty(&json_schema).unwrap()); } + #[test] + fn test_basic_types_enum_without_variants() { + let value_type = EnrichedValueType { + typ: ValueType::Basic(BasicValueType::Enum), + nullable: false, + attrs: Arc::new(BTreeMap::new()), + }; + let options = create_test_options(); + let result = build_json_schema(value_type, options).unwrap(); + let json_schema = schema_to_json(&result.schema); + + expect![[r#" + { + "type": "string" + }"#]] + .assert_eq(&serde_json::to_string_pretty(&json_schema).unwrap()); + } + + #[test] + fn test_basic_types_enum_with_variants() { + let mut attrs = BTreeMap::new(); + attrs.insert( + "variants".to_string(), + serde_json::json!(["red", "green", "blue"]), + ); + + let value_type = EnrichedValueType { + typ: ValueType::Basic(BasicValueType::Enum), + nullable: false, + attrs: Arc::new(attrs), + }; + let options = create_test_options(); + let result = build_json_schema(value_type, options).unwrap(); + let json_schema = schema_to_json(&result.schema); + + expect![[r#" + { + "enum": [ + "red", + "green", + "blue" + ], + "type": "string" + }"#]] + .assert_eq(&serde_json::to_string_pretty(&json_schema).unwrap()); + } + #[test] fn test_basic_types_bool() { let value_type = EnrichedValueType { diff --git a/src/base/schema.rs b/src/base/schema.rs index 37898687..5d49532f 100644 --- a/src/base/schema.rs +++ b/src/base/schema.rs @@ -23,6 +23,9 @@ pub enum BasicValueType { /// String encoded in UTF-8. Str, + /// Enumerated symbolic value. + Enum, + /// A boolean value. Bool, @@ -71,6 +74,7 @@ impl std::fmt::Display for BasicValueType { match self { BasicValueType::Bytes => write!(f, "Bytes"), BasicValueType::Str => write!(f, "Str"), + BasicValueType::Enum => write!(f, "Enum"), BasicValueType::Bool => write!(f, "Bool"), BasicValueType::Int64 => write!(f, "Int64"), BasicValueType::Float32 => write!(f, "Float32"), diff --git a/src/base/value.rs b/src/base/value.rs index 882097b5..9b24262d 100644 --- a/src/base/value.rs +++ b/src/base/value.rs @@ -202,6 +202,7 @@ impl KeyPart { KeyPart::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?)) } BasicValueType::Str => KeyPart::Str(Arc::from(v)), + BasicValueType::Enum => KeyPart::Str(Arc::from(v)), BasicValueType::Bool => KeyPart::Bool(v.parse()?), BasicValueType::Int64 => KeyPart::Int64(v.parse()?), BasicValueType::Range => { @@ -1136,6 +1137,7 @@ impl BasicValue { BasicValue::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?)) } (serde_json::Value::String(v), BasicValueType::Str) => BasicValue::Str(Arc::from(v)), + (serde_json::Value::String(v), BasicValueType::Enum) => BasicValue::Str(Arc::from(v)), (serde_json::Value::Bool(v), BasicValueType::Bool) => BasicValue::Bool(v), (serde_json::Value::Number(v), BasicValueType::Int64) => BasicValue::Int64( v.as_i64() diff --git a/src/ops/targets/kuzu.rs b/src/ops/targets/kuzu.rs index d6b0bbc1..4e247fb6 100644 --- a/src/ops/targets/kuzu.rs +++ b/src/ops/targets/kuzu.rs @@ -101,6 +101,7 @@ fn basic_type_to_kuzu(basic_type: &BasicValueType) -> Result { Ok(match basic_type { BasicValueType::Bytes => "BLOB".to_string(), BasicValueType::Str => "STRING".to_string(), + BasicValueType::Enum => "STRING".to_string(), BasicValueType::Bool => "BOOL".to_string(), BasicValueType::Int64 => "INT64".to_string(), BasicValueType::Float32 => "FLOAT".to_string(), diff --git a/src/ops/targets/postgres.rs b/src/ops/targets/postgres.rs index ae808361..98d30de6 100644 --- a/src/ops/targets/postgres.rs +++ b/src/ops/targets/postgres.rs @@ -474,6 +474,7 @@ fn to_column_type_sql(column_type: &ValueType) -> String { ValueType::Basic(basic_type) => match basic_type { BasicValueType::Bytes => "bytea".into(), BasicValueType::Str => "text".into(), + BasicValueType::Enum => "text".into(), BasicValueType::Bool => "boolean".into(), BasicValueType::Int64 => "bigint".into(), BasicValueType::Float32 => "real".into(), diff --git a/src/py/convert.rs b/src/py/convert.rs index 67e25489..62ceb1ed 100644 --- a/src/py/convert.rs +++ b/src/py/convert.rs @@ -156,6 +156,7 @@ fn basic_value_from_py_object<'py>( value::BasicValue::Bytes(Bytes::from(v.extract::>()?)) } schema::BasicValueType::Str => value::BasicValue::Str(Arc::from(v.extract::()?)), + schema::BasicValueType::Enum => value::BasicValue::Str(Arc::from(v.extract::()?)), schema::BasicValueType::Bool => value::BasicValue::Bool(v.extract::()?), schema::BasicValueType::Int64 => value::BasicValue::Int64(v.extract::()?), schema::BasicValueType::Float32 => value::BasicValue::Float32(v.extract::()?), From 35dce84c6090f65eeda921ead19a4a7a670c2c28 Mon Sep 17 00:00:00 2001 From: MrAnayDongre Date: Thu, 13 Nov 2025 01:07:56 -0800 Subject: [PATCH 2/3] updated feat(enum): engine+python --- python/cocoindex/typing.py | 13 ------------- src/base/json_schema.rs | 39 ++++++++++++++++++++++++++----------- src/base/schema.rs | 8 ++++++-- src/base/value.rs | 6 ++++-- src/ops/targets/kuzu.rs | 2 +- src/ops/targets/postgres.rs | 2 +- src/py/convert.rs | 4 +++- 7 files changed, 43 insertions(+), 31 deletions(-) diff --git a/python/cocoindex/typing.py b/python/cocoindex/typing.py index 00244167..007ac905 100644 --- a/python/cocoindex/typing.py +++ b/python/cocoindex/typing.py @@ -66,19 +66,6 @@ def __init__(self, key: str, value: Any): LocalDateTime = Annotated[datetime.datetime, TypeKind("LocalDateTime")] OffsetDateTime = Annotated[datetime.datetime, TypeKind("OffsetDateTime")] - -def Enum(*, variants: Optional[Sequence[str]] = None) -> Any: - """ - String-like enumerated type. Use `variants` to hint allowed values. - Example: - color: Enum(variants=["red", "green", "blue"]) - At runtime this is a plain `str`; `variants` are emitted as schema attrs. - """ - if variants is not None: - return Annotated[str, TypeKind("Enum"), TypeAttr("variants", list(variants))] - return Annotated[str, TypeKind("Enum")] - - if TYPE_CHECKING: T_co = TypeVar("T_co", covariant=True) Dim_co = TypeVar("Dim_co", bound=int | None, covariant=True, default=None) diff --git a/src/base/json_schema.rs b/src/base/json_schema.rs index 3c14ea7e..3c833c71 100644 --- a/src/base/json_schema.rs +++ b/src/base/json_schema.rs @@ -74,7 +74,7 @@ impl JsonSchemaBuilder { schema::BasicValueType::Str => { schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::String))); } - schema::BasicValueType::Enum => { + schema::BasicValueType::Enum(_) => { schema.instance_type = Some(SingleOrVec::Single(Box::new(InstanceType::String))); } schema::BasicValueType::Bytes => { @@ -322,20 +322,33 @@ impl JsonSchemaBuilder { ) -> SchemaObject { let mut out = self.for_value_type(schema_base, &enriched_value_type.typ, field_path); - if let schema::ValueType::Basic(schema::BasicValueType::Enum) = &enriched_value_type.typ { - if let Some(variants) = enriched_value_type.attrs.get("variants") { - if let Some(arr) = variants.as_array() { - let enum_values: Vec = arr + if let schema::ValueType::Basic(schema::BasicValueType::Enum(enum_t)) = + &enriched_value_type.typ + { + let mut vals: Vec = enum_t + .variants + .iter() + .map(|s| serde_json::Value::String(s.to_string())) + .collect(); + + if vals.is_empty() { + if let Some(a) = enriched_value_type + .attrs + .get("variants") + .and_then(|v| v.as_array()) + { + vals = a .iter() .filter_map(|v| { v.as_str().map(|s| serde_json::Value::String(s.to_string())) }) .collect(); - if !enum_values.is_empty() { - out.enum_values = Some(enum_values); - } } } + + if !vals.is_empty() { + out.enum_values = Some(vals); + } } out @@ -500,10 +513,11 @@ mod tests { #[test] fn test_basic_types_enum_without_variants() { let value_type = EnrichedValueType { - typ: ValueType::Basic(BasicValueType::Enum), + typ: ValueType::Basic(BasicValueType::Enum(EnumTypeSchema::default())), nullable: false, attrs: Arc::new(BTreeMap::new()), }; + let options = create_test_options(); let result = build_json_schema(value_type, options).unwrap(); let json_schema = schema_to_json(&result.schema); @@ -524,10 +538,13 @@ mod tests { ); let value_type = EnrichedValueType { - typ: ValueType::Basic(BasicValueType::Enum), + typ: ValueType::Basic(BasicValueType::Enum(EnumTypeSchema { + variants: vec!["red".into(), "green".into(), "blue".into()], + })), nullable: false, - attrs: Arc::new(attrs), + attrs: Arc::new(BTreeMap::new()), }; + let options = create_test_options(); let result = build_json_schema(value_type, options).unwrap(); let json_schema = schema_to_json(&result.schema); diff --git a/src/base/schema.rs b/src/base/schema.rs index 5d49532f..4bcf5485 100644 --- a/src/base/schema.rs +++ b/src/base/schema.rs @@ -13,6 +13,10 @@ pub struct VectorTypeSchema { pub struct UnionTypeSchema { pub types: Vec, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub struct EnumTypeSchema { + pub variants: Vec>, +} #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] #[serde(tag = "kind")] @@ -24,7 +28,7 @@ pub enum BasicValueType { Str, /// Enumerated symbolic value. - Enum, + Enum(EnumTypeSchema), /// A boolean value. Bool, @@ -74,7 +78,7 @@ impl std::fmt::Display for BasicValueType { match self { BasicValueType::Bytes => write!(f, "Bytes"), BasicValueType::Str => write!(f, "Str"), - BasicValueType::Enum => write!(f, "Enum"), + BasicValueType::Enum(_) => write!(f, "Enum"), BasicValueType::Bool => write!(f, "Bool"), BasicValueType::Int64 => write!(f, "Int64"), BasicValueType::Float32 => write!(f, "Float32"), diff --git a/src/base/value.rs b/src/base/value.rs index 9b24262d..53935e81 100644 --- a/src/base/value.rs +++ b/src/base/value.rs @@ -202,7 +202,7 @@ impl KeyPart { KeyPart::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?)) } BasicValueType::Str => KeyPart::Str(Arc::from(v)), - BasicValueType::Enum => KeyPart::Str(Arc::from(v)), + BasicValueType::Enum(_) => KeyPart::Str(Arc::from(v)), BasicValueType::Bool => KeyPart::Bool(v.parse()?), BasicValueType::Int64 => KeyPart::Int64(v.parse()?), BasicValueType::Range => { @@ -1137,7 +1137,9 @@ impl BasicValue { BasicValue::Bytes(Bytes::from(BASE64_STANDARD.decode(v)?)) } (serde_json::Value::String(v), BasicValueType::Str) => BasicValue::Str(Arc::from(v)), - (serde_json::Value::String(v), BasicValueType::Enum) => BasicValue::Str(Arc::from(v)), + (serde_json::Value::String(v), BasicValueType::Enum(_)) => { + BasicValue::Str(Arc::from(v)) + } (serde_json::Value::Bool(v), BasicValueType::Bool) => BasicValue::Bool(v), (serde_json::Value::Number(v), BasicValueType::Int64) => BasicValue::Int64( v.as_i64() diff --git a/src/ops/targets/kuzu.rs b/src/ops/targets/kuzu.rs index 4e247fb6..0145c3fa 100644 --- a/src/ops/targets/kuzu.rs +++ b/src/ops/targets/kuzu.rs @@ -101,7 +101,7 @@ fn basic_type_to_kuzu(basic_type: &BasicValueType) -> Result { Ok(match basic_type { BasicValueType::Bytes => "BLOB".to_string(), BasicValueType::Str => "STRING".to_string(), - BasicValueType::Enum => "STRING".to_string(), + BasicValueType::Enum(_) => "STRING".to_string(), BasicValueType::Bool => "BOOL".to_string(), BasicValueType::Int64 => "INT64".to_string(), BasicValueType::Float32 => "FLOAT".to_string(), diff --git a/src/ops/targets/postgres.rs b/src/ops/targets/postgres.rs index 98d30de6..0eae2958 100644 --- a/src/ops/targets/postgres.rs +++ b/src/ops/targets/postgres.rs @@ -474,7 +474,7 @@ fn to_column_type_sql(column_type: &ValueType) -> String { ValueType::Basic(basic_type) => match basic_type { BasicValueType::Bytes => "bytea".into(), BasicValueType::Str => "text".into(), - BasicValueType::Enum => "text".into(), + BasicValueType::Enum(_) => "text".into(), BasicValueType::Bool => "boolean".into(), BasicValueType::Int64 => "bigint".into(), BasicValueType::Float32 => "real".into(), diff --git a/src/py/convert.rs b/src/py/convert.rs index 62ceb1ed..0793e0da 100644 --- a/src/py/convert.rs +++ b/src/py/convert.rs @@ -156,7 +156,9 @@ fn basic_value_from_py_object<'py>( value::BasicValue::Bytes(Bytes::from(v.extract::>()?)) } schema::BasicValueType::Str => value::BasicValue::Str(Arc::from(v.extract::()?)), - schema::BasicValueType::Enum => value::BasicValue::Str(Arc::from(v.extract::()?)), + schema::BasicValueType::Enum(_) => { + value::BasicValue::Str(Arc::from(v.extract::()?)) + } schema::BasicValueType::Bool => value::BasicValue::Bool(v.extract::()?), schema::BasicValueType::Int64 => value::BasicValue::Int64(v.extract::()?), schema::BasicValueType::Float32 => value::BasicValue::Float32(v.extract::()?), From 1764594241df881eb999ecb7b1b3d6c86d03882b Mon Sep 17 00:00:00 2001 From: MrAnayDongre Date: Sun, 16 Nov 2025 14:57:56 -0800 Subject: [PATCH 3/3] live_updater: show 1-min processing rate. Addresses #342 --- src/execution/live_updater.rs | 71 +++++++++++++++++++++++++++++++++-- 1 file changed, 68 insertions(+), 3 deletions(-) diff --git a/src/execution/live_updater.rs b/src/execution/live_updater.rs index 760219cf..b3fe4829 100644 --- a/src/execution/live_updater.rs +++ b/src/execution/live_updater.rs @@ -10,6 +10,8 @@ use super::stats; use futures::future::try_join_all; use indicatif::ProgressBar; use sqlx::PgPool; +use std::collections::VecDeque; +use std::time::Instant; use tokio::{sync::watch, task::JoinSet, time::MissedTickBehavior}; pub struct FlowLiveUpdaterUpdates { @@ -95,8 +97,9 @@ struct SourceUpdateTask { status_tx: watch::Sender, num_remaining_tasks_tx: watch::Sender, + start_time: Instant, + rate_history: std::sync::Mutex>, } - impl Drop for SourceUpdateTask { fn drop(&mut self) { self.status_tx.send_modify(|update| { @@ -303,19 +306,56 @@ impl SourceUpdateTask { try_join_all(futs).await?; Ok(()) } + fn rate_bracket(&self) -> String { + let now = std::time::Instant::now(); + let total = self.source_update_stats.num_insertions.get() + + self.source_update_stats.num_deletions.get() + + self.source_update_stats.num_reprocesses.get(); + + let mut hist = self.rate_history.lock().unwrap(); + + // Drop entries older than 60s, but keep at least one + while let Some((t, _)) = hist.front().copied() { + if now.duration_since(t) > std::time::Duration::from_secs(60) && hist.len() > 1 { + hist.pop_front(); + } else { + break; + } + } + + // Push current sample + hist.push_back((now, total)); + + // Baseline is earliest kept sample (or now) + let (base_t, base_c) = hist.front().copied().unwrap_or((now, total)); + let secs = now.duration_since(base_t).as_secs_f64(); + let rate = if secs > 0.0 { + (total - base_c) as f64 / secs + } else { + 0.0 + }; + + format!( + " [elapsed: {:.0}s, {:.3}/s now]", + self.start_time.elapsed().as_secs_f64(), + rate + ) + } fn report_stats(&self, stats: &stats::UpdateStats, update_title: &str) { self.source_update_stats.merge(stats); + let bracket = self.rate_bracket(); + if self.options.print_stats { println!( - "{}.{} ({update_title}): {}", + "{}.{} ({update_title}): {}{bracket}", self.flow.flow_instance.name, self.import_op().name, stats ); } else { trace!( - "{}.{} ({update_title}): {}", + "{}.{} ({update_title}): {}{bracket}", self.flow.flow_instance.name, self.import_op().name, stats @@ -323,6 +363,29 @@ impl SourceUpdateTask { } } + fn report_stats_with_extras( + &self, + stats: &stats::UpdateStats, + update_title: &str, + elapsed_secs: f64, + recent_rate: f64, + ) { + self.source_update_stats.merge(stats); + let line = format!( + "{}.{} ({update_title}): {} [elapsed: {:.3}s, {:.3}/s now]", + self.flow.flow_instance.name, + self.import_op().name, + stats, + elapsed_secs, + recent_rate + ); + if self.options.print_stats { + println!("{line}"); + } else { + trace!("{line}"); + } + } + async fn update_one_pass( &self, source_indexing_context: &Arc, @@ -458,6 +521,8 @@ impl FlowLiveUpdater { options: options.clone(), status_tx: status_tx.clone(), num_remaining_tasks_tx: num_remaining_tasks_tx.clone(), + start_time: Instant::now(), + rate_history: std::sync::Mutex::new(VecDeque::from([(Instant::now(), 0)])), }; join_set.spawn(source_update_task.run()); stats_per_task.push(source_update_stats);