-
|
Thank you so far for this great work.

```python
@cocoindex.flow_def(name="CodeEmbedding")
def code_embedding_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    """Ingest from multiple folders and create embeddings."""
    code_embeddings = data_scope.add_collector()

    # First folder
    data_scope["files1"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(
            path="/some/valid/path1",
            included_patterns=_INCLUDED_PATTERN,
            excluded_patterns=_EXCLUDED_PATTERN,
        )
    )
    with data_scope["files1"].row() as file:
        file["extension"] = file["filename"].transform(extract_extension)
        file["chunks"] = file["content"].transform(
            cocoindex.functions.SplitRecursively(),
            language=file["extension"],
            chunk_size=1000,
            min_chunk_size=300,
            chunk_overlap=300,
        )
        with file["chunks"].row() as chunk:
            chunk["embedding"] = chunk["text"].call(code_to_embedding)
            code_embeddings.collect(
                filename=file["filename"],
                location=chunk["location"],
                code=chunk["text"],
                embedding=chunk["embedding"],
                start=chunk["start"],
                end=chunk["end"],
            )

    # Second folder -> adding this crashes the app
    data_scope["files2"] = flow_builder.add_source(
        cocoindex.sources.LocalFile(
            path="/some/valid/path2",
            included_patterns=_INCLUDED_PATTERN,
            excluded_patterns=_EXCLUDED_PATTERN,
        )
    )
    with data_scope["files2"].row() as file:
        file["extension"] = file["filename"].transform(extract_extension)
        file["chunks"] = file["content"].transform(
            cocoindex.functions.SplitRecursively(),
            language=file["extension"],
            chunk_size=1000,
            min_chunk_size=300,
            chunk_overlap=300,
        )
        with file["chunks"].row() as chunk:
            chunk["embedding"] = chunk["text"].call(code_to_embedding)
            code_embeddings.collect(
                filename=file["filename"],
                location=chunk["location"],
                code=chunk["text"],
                embedding=chunk["embedding"],
                start=chunk["start"],
                end=chunk["end"],
            )

    code_embeddings.export(
        "coco_rag",
        cocoindex.targets.Postgres(),
        primary_key_fields=["filename", "location"],
        vector_indexes=[
            cocoindex.VectorIndexDef(
                field_name="embedding",
                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
            )
        ],
    )
```

With one source it works without any problem. As soon as I add one more (in the final solution the sources are loaded from a configuration), I run into the following exception when performing an update of the index (the DB was already updated with `cocoindex setup ...`):

```
thread 'tokio-runtime-worker' panicked at src/execution/evaluator.rs:233:14:
called `Option::unwrap()` on a `None` value
[2025-08-02T13:21:51Z ERROR cocoindex_engine::execution::source_indexer] JoinError::Panic(Id(50), "called `Option::unwrap()` on a `None` value", ...)
thread 'tokio-runtime-worker' panicked at src/execution/evaluator.rs:233:14:
called `Option::unwrap()` on a `None` value
[2025-08-02T13:21:51Z ERROR cocoindex_engine::execution::source_indexer] JoinError::Panic(Id(42), "called `Option::unwrap()` on a `None` value", ...)
thread 'tokio-runtime-worker' panicked at src/execution/evaluator.rs:233:14:
called `Option::unwrap()` on a `None` value
[2025-08-02T13:21:51Z ERROR cocoindex_engine::execution::source_indexer] JoinError::Panic(Id(63), "called `Option::unwrap()` on a `None` value", ...)
...
```

Any idea how to solve that? |
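(Aside, for readers unfamiliar with the chunking parameters used above: `chunk_size`, `chunk_overlap`, and `min_chunk_size` describe a sliding-window split. The following is a naive character-based sketch of what size/overlap mean, not cocoindex's actual `SplitRecursively`, which is language-aware and splits on syntactic boundaries.)

```python
def split_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Naive fixed-window splitter: each chunk holds at most `chunk_size`
    characters and shares `chunk_overlap` characters with its predecessor.
    Illustrative only; it is NOT cocoindex's SplitRecursively algorithm."""
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the window has reached the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks
```

With `chunk_size=4` and `chunk_overlap=2`, `"abcdefghij"` splits into `["abcd", "cdef", "efgh", "ghij"]`: adjacent chunks share two characters, which is what the overlap buys you for retrieval.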
-
|
It's a bug, we'll fix it soon. Note that cocoindex is designed to support multiple sources, but the support is not officially ready yet. We'll finalize this part soon and add a related example. |
-
|
Hello, after updating to the newest cocoindex version (from 0.2 to 0.3.7) my flow, which was previously working perfectly, unfortunately breaks. The issue is that if I try to import from multiple sources (this is done via configuration), I now run into an exception; with one source it works as expected and as before. Here is the flow:

```python
"""CocoRAG flow definitions for embedding and indexing.

This module defines the core data processing pipeline using the CocoIndex framework:
- Sentence transformer embedding integration
- File extension extraction utilities
- Dataflow transformation definitions
- PostgreSQL table management for vector storage
- Integration with CocoIndex's incremental processing system
"""
import logging
import os

import cocoindex
import numpy as np
from numpy.typing import NDArray

from .config import get_config, get_table_name

logger = logging.getLogger(__name__)


@cocoindex.op.function()
def extract_extension(filename: str) -> str:
    """Extract the extension of a filename."""
    return os.path.splitext(filename)[1]


@cocoindex.transform_flow()
def code_to_embedding(
    text: cocoindex.DataSlice[str],
) -> cocoindex.DataSlice[NDArray[np.float32]]:
    """Embed the text using a sentence transformer model."""
    # Use a simple sentence transformer model without any special args
    return text.transform(
        cocoindex.functions.SentenceTransformerEmbed(
            model="sentence-transformers/all-MiniLM-L6-v2"
        )
    )


@cocoindex.flow_def(name="CodeEmbedding")
def code_embedding_flow(
    flow_builder: cocoindex.FlowBuilder, data_scope: cocoindex.DataScope
) -> None:
    """Define a flow that embeds files from multiple sources into a vector database.

    Args:
        flow_builder: CocoIndex flow builder
        data_scope: CocoIndex data scope
    """
    # Get the global configuration instance (auto-initialized if needed)
    config = get_config()

    # Create a single collector for all sources but export it only once.
    # This ensures we have one export operation for the entire flow.
    code_embeddings = data_scope.add_collector()

    # Process each source individually.
    # !! Works with multiple sources in 0.2 without any problems;
    # !! with 0.3 it runs into: Exception: Import op count does not match export op count
    # !! with only one source it works
    for source_config in config.sources:
        source_name = source_config.get("name", "unnamed_source")
        topic = source_config.get("topic", None)

        # Create a CocoIndex LocalFile source from the configuration.
        # This source contains a file system connector with path, patterns, and exclusion rules.
        source = config.create_source_from_config(source_config)

        # Add the source to the flow with a unique name to avoid conflicts
        data_scope[f"files_{source_name}"] = flow_builder.add_source(source)

        # Process files from this source
        with data_scope[f"files_{source_name}"].row() as file:
            file["extension"] = file["filename"].transform(extract_extension)
            file["chunks"] = file["content"].transform(
                cocoindex.functions.SplitRecursively(),
                language=file["extension"],
                chunk_size=config.chunk_size,
                min_chunk_size=config.min_chunk_size,
                chunk_overlap=config.chunk_overlap,
            )
            with file["chunks"].row() as chunk:
                # Use the code_to_embedding transform flow directly
                chunk["embedding"] = chunk["text"].call(code_to_embedding)
                code_embeddings.collect(
                    source_name=source_name,
                    filename=file["filename"],
                    location=chunk["location"],
                    topic=topic,
                    code=chunk["text"],
                    embedding=chunk["embedding"],
                    start=chunk["start"],
                    end=chunk["end"],
                )

    # Export once for all sources combined
    code_embeddings.export(
        get_table_name(),
        cocoindex.targets.Postgres(),
        primary_key_fields=["source_name", "filename", "location"],
        vector_indexes=[
            cocoindex.VectorIndexDef(
                field_name="embedding",
                metric=cocoindex.VectorSimilarityMetric.COSINE_SIMILARITY,
            )
        ],
    )
```

Any idea how to make that runnable again? |
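(Aside on the `COSINE_SIMILARITY` metric configured in the export above: it corresponds to the standard cosine similarity between embedding vectors. A plain-Python sketch of the formula, independent of cocoindex or Postgres:)

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity = dot(a, b) / (|a| * |b|).
    Returns 1.0 for vectors pointing the same way, 0.0 for orthogonal ones."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)
```

Because the measure only depends on direction, `[1.0, 1.0]` and `[2.0, 2.0]` score 1.0 against each other, which is why cosine is the usual choice for comparing sentence-transformer embeddings of different-length texts.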
-
|
It works again now. Thank you for this super fast fix! |