Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions engine/clients/client_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
PgVectorUploader,
)
from engine.clients.qdrant import QdrantConfigurator, QdrantSearcher, QdrantUploader
from engine.clients.qdrant_native import (
QdrantNativeConfigurator,
QdrantNativeSearcher,
QdrantNativeUploader,
)
from engine.clients.redis import RedisConfigurator, RedisSearcher, RedisUploader
from engine.clients.weaviate import (
WeaviateConfigurator,
Expand All @@ -33,6 +38,7 @@

ENGINE_CONFIGURATORS = {
"qdrant": QdrantConfigurator,
"qdrant_native": QdrantNativeConfigurator,
"weaviate": WeaviateConfigurator,
"milvus": MilvusConfigurator,
"elasticsearch": ElasticConfigurator,
Expand All @@ -43,6 +49,7 @@

ENGINE_UPLOADERS = {
"qdrant": QdrantUploader,
"qdrant_native": QdrantNativeUploader,
"weaviate": WeaviateUploader,
"milvus": MilvusUploader,
"elasticsearch": ElasticUploader,
Expand All @@ -53,6 +60,7 @@

ENGINE_SEARCHERS = {
"qdrant": QdrantSearcher,
"qdrant_native": QdrantNativeSearcher,
"weaviate": WeaviateSearcher,
"milvus": MilvusSearcher,
"elasticsearch": ElasticSearcher,
Expand Down
9 changes: 9 additions & 0 deletions engine/clients/qdrant_native/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Package entry point: gather the configurator, searcher and uploader classes
# from their submodules so they can be imported from the package directly.
from .configure import QdrantNativeConfigurator
from .search import QdrantNativeSearcher
from .upload import QdrantNativeUploader

# Names exported by a star-import of this package.
__all__ = [
"QdrantNativeConfigurator",
"QdrantNativeUploader",
"QdrantNativeSearcher",
]
4 changes: 4 additions & 0 deletions engine/clients/qdrant_native/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import os

# Collection name, overridable through the environment; falls back to "benchmark".
QDRANT_COLLECTION_NAME = os.environ.get("QDRANT_COLLECTION_NAME", "benchmark")
# Optional API key; stays None when the environment variable is not set.
QDRANT_API_KEY = os.environ.get("QDRANT_API_KEY")
132 changes: 132 additions & 0 deletions engine/clients/qdrant_native/configure.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import httpx

from benchmark.dataset import Dataset
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance
from engine.clients.qdrant_native.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME


class QdrantNativeConfigurator(BaseConfigurator):
    """Create and drop the benchmark collection through plain HTTP calls.

    All requests go through an ``httpx.Client`` against
    ``http://<host>:6333`` and target the collection named by
    ``QDRANT_COLLECTION_NAME``.
    """

    SPARSE_VECTOR_SUPPORT = True
    # Benchmark distance enum -> distance name sent in the request body.
    DISTANCE_MAPPING = {
        Distance.L2: "Euclid",
        Distance.COSINE: "Cosine",
        Distance.DOT: "Dot",
    }
    # Dataset schema type -> payload index type sent in the request body.
    INDEX_TYPE_MAPPING = {
        "int": "integer",
        "keyword": "keyword",
        "text": "text",
        "float": "float",
        "geo": "geo",
    }

    def __init__(self, host, collection_params: dict, connection_params: dict):
        """Store the configuration and open the HTTP client.

        Args:
            host: Host name or address; a trailing ``/`` is stripped and the
                port ``6333`` is appended.
            collection_params: Collection settings, passed to the base class.
            connection_params: Connection settings; only ``timeout``
                (default 30) is read here.
        """
        super().__init__(host, collection_params, connection_params)

        # Overrides whatever the base class stored with a full base URL.
        self.host = f"http://{host.rstrip('/')}:6333"
        self.connection_params = connection_params

        self.headers = {"Content-Type": "application/json"}
        if QDRANT_API_KEY:
            self.headers["api-key"] = QDRANT_API_KEY

        timeout = connection_params.get("timeout", 30)
        self.client = httpx.Client(
            headers=self.headers,
            timeout=httpx.Timeout(timeout=timeout),
        )

    def clean(self):
        """Delete the collection.

        A 404 response is accepted, since the collection may not exist yet.
        Any other non-200 status is passed to ``raise_for_status``.
        """
        url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}"
        response = self.client.delete(url)
        if response.status_code not in (200, 404):
            response.raise_for_status()

    def recreate(self, dataset: Dataset, collection_params):
        """Create the collection, then one payload index per schema field.

        The settings are read from ``self.collection_params``; the
        ``collection_params`` argument is accepted for interface
        compatibility but is not used. The stored settings are never
        modified: every adjustment is made on a copy, so calling this method
        again sees the same configuration.

        Args:
            dataset: Dataset whose ``config`` supplies ``type``,
                ``vector_size``, ``distance`` and ``schema``.
            collection_params: Unused (see above).

        Raises:
            ValueError: If ``payload_index_params`` names a field that is not
                in the dataset schema.
            httpx.HTTPStatusError: If a request returns an error status.
        """
        url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}"

        # Shallow copy: the keys removed or replaced below must not leak back
        # into the shared configuration dict.
        params = dict(self.collection_params)

        if dataset.config.type == "sparse":
            vectors_config = {}
            sparse_vectors_config = {
                "sparse": {
                    "index": {
                        "on_disk": False,
                    }
                }
            }
        else:
            # "vectors_config" is consumed here and must not be forwarded
            # verbatim in the request body.
            vectors_params = params.pop("vectors_config", None) or {}
            vectors_config = {
                "size": dataset.config.vector_size,
                "distance": self.DISTANCE_MAPPING.get(dataset.config.distance),
                "on_disk": vectors_params.get("on_disk", False),
            }
            sparse_vectors_config = None

        payload_index_params = params.pop("payload_index_params", None) or {}
        if not set(payload_index_params).issubset(dataset.config.schema.keys()):
            raise ValueError("payload_index_params are not found in dataset schema")

        # Disable index building during upload by default. The nested dict is
        # copied as well so the default is not written into the shared config.
        optimizers_config = dict(params.get("optimizers_config") or {})
        optimizers_config.setdefault("max_optimization_threads", 0)
        params["optimizers_config"] = optimizers_config

        # Assemble the request body; remaining settings are forwarded as-is.
        payload = {}
        if vectors_config:
            payload["vectors"] = vectors_config
        if sparse_vectors_config:
            payload["sparse_vectors"] = sparse_vectors_config
        payload.update(params)

        response = self.client.put(url, json=payload)
        response.raise_for_status()

        for field_name, field_type in dataset.config.schema.items():
            self._create_payload_index(field_name, field_type, payload_index_params)

    def _create_payload_index(
        self, field_name: str, field_type: str, payload_index_params: dict
    ):
        """Create a payload index for one field.

        Args:
            field_name: Name of the payload field to index.
            field_type: Dataset schema type of the field.
            payload_index_params: Per-field options keyed by field name; only
                ``is_tenant`` and ``on_disk`` are read, and only for
                ``keyword``/``uuid`` fields.

        Raises:
            httpx.HTTPStatusError: If the request returns an error status.
        """
        url = f"{self.host}/collections/{QDRANT_COLLECTION_NAME}/index"

        if field_type in ("keyword", "uuid"):
            # NOTE(review): "uuid" has no entry in INDEX_TYPE_MAPPING, so it is
            # indexed with type "keyword" - confirm this is intended.
            field_schema = {
                "type": self.INDEX_TYPE_MAPPING.get(field_type, "keyword"),
            }

            # Forward the optional parameters only when explicitly set.
            params = payload_index_params.get(field_name, {})
            for option in ("is_tenant", "on_disk"):
                if params.get(option) is not None:
                    field_schema[option] = params[option]
        else:
            # Other types are sent as a bare type string; unmapped types are
            # passed through unchanged.
            field_schema = self.INDEX_TYPE_MAPPING.get(field_type, field_type)

        payload = {
            "field_name": field_name,
            "field_schema": field_schema,
        }

        response = self.client.put(url, json=payload)
        response.raise_for_status()

    def delete_client(self):
        """Close the HTTP client if one was created."""
        if hasattr(self, "client") and self.client is not None:
            self.client.close()
70 changes: 70 additions & 0 deletions engine/clients/qdrant_native/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from typing import Any, List, Optional

from engine.base_client.parser import BaseConditionParser, FieldValue


class QdrantNativeConditionParser(BaseConditionParser):
    """
    Translate the internal filter format into plain dictionaries shaped like
    Qdrant REST API JSON filters (no Pydantic models involved).
    """

    def build_condition(
        self, and_subfilters: Optional[List[Any]], or_subfilters: Optional[List[Any]]
    ) -> Optional[Any]:
        """Combine AND/OR subfilters; return None when both are empty."""
        clauses = (("must", and_subfilters), ("should", or_subfilters))
        combined = {name: subfilters for name, subfilters in clauses if subfilters}
        return combined or None

    def build_exact_match_filter(self, field_name: str, value: FieldValue) -> Any:
        """Return a condition matching ``field_name`` against exactly ``value``."""
        match_clause = {"value": value}
        return {"key": field_name, "match": match_clause}

    def build_range_filter(
        self,
        field_name: str,
        lt: Optional[FieldValue],
        gt: Optional[FieldValue],
        lte: Optional[FieldValue],
        gte: Optional[FieldValue],
    ) -> Any:
        """Return a range condition holding only the bounds that are not None."""
        candidates = (("lt", lt), ("gt", gt), ("lte", lte), ("gte", gte))
        bounds = {
            operator: bound for operator, bound in candidates if bound is not None
        }
        return {"key": field_name, "range": bounds}

    def build_geo_filter(
        self, field_name: str, lat: float, lon: float, radius: float
    ) -> Any:
        """Return a geo-radius condition centred on (``lat``, ``lon``)."""
        center = {"lon": lon, "lat": lat}
        geo_radius = {"center": center, "radius": radius}
        return {"key": field_name, "geo_radius": geo_radius}
93 changes: 93 additions & 0 deletions engine/clients/qdrant_native/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from typing import List, Tuple

import httpx

from dataset_reader.base_reader import Query
from engine.base_client.search import BaseSearcher
from engine.clients.qdrant_native.config import QDRANT_API_KEY, QDRANT_COLLECTION_NAME
from engine.clients.qdrant_native.parser import QdrantNativeConditionParser


class QdrantNativeSearcher(BaseSearcher):
    """Run search queries against the collection with plain HTTP requests.

    All state lives on the class: ``init_client`` fills it in and
    ``delete_client`` tears it down.
    """

    search_params = {}
    client: httpx.Client = None
    parser = QdrantNativeConditionParser()
    host = None
    headers = {}

    @classmethod
    def init_client(cls, host, distance, connection_params: dict, search_params: dict):
        """Store the search settings and open the shared HTTP client.

        ``distance`` is accepted but not used here. The request timeout comes
        from ``connection_params["timeout"]`` and defaults to 30.
        """
        cls.host = f"http://{host.rstrip('/')}:6333"
        cls.search_params = search_params

        # The API key header is only sent when a key is configured.
        request_headers = {"Content-Type": "application/json"}
        if QDRANT_API_KEY:
            request_headers["api-key"] = QDRANT_API_KEY
        cls.headers = request_headers

        timeout_config = httpx.Timeout(timeout=connection_params.get("timeout", 30))
        pool_limits = httpx.Limits(max_connections=None, max_keepalive_connections=0)
        cls.client = httpx.Client(
            headers=cls.headers,
            timeout=timeout_config,
            limits=pool_limits,
        )

    @classmethod
    def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
        """Run one query and return ``(id, score)`` pairs for up to ``top`` points.

        Any exception raised while sending the request or reading the
        response is printed and then re-raised.
        """
        url = f"{cls.host}/collections/{QDRANT_COLLECTION_NAME}/points/query"

        is_sparse = query.sparse_vector is not None
        if is_sparse:
            query_vector = {
                "indices": query.sparse_vector.indices,
                "values": query.sparse_vector.values,
            }
        else:
            query_vector = query.vector

        payload = {"query": query_vector, "limit": top}

        # Sparse queries address the vector named "sparse".
        if is_sparse:
            payload["using"] = "sparse"

        parsed_filter = cls.parser.parse(query.meta_conditions)
        if parsed_filter:
            payload["filter"] = parsed_filter

        config = cls.search_params.get("config", {})
        if config:
            payload["params"] = config

        prefetch_settings = cls.search_params.get("prefetch")
        if prefetch_settings:
            # The prefetch stage reuses the same query vector as the main query.
            payload["prefetch"] = [{**prefetch_settings, "query": query_vector}]

        payload["with_payload"] = cls.search_params.get("with_payload", False)

        try:
            response = cls.client.post(url, json=payload)
            response.raise_for_status()
            body = response.json()

            return [(hit["id"], hit["score"]) for hit in body["result"]["points"]]

        except Exception as ex:
            print(f"Something went wrong during search: {ex}")
            raise ex

    @classmethod
    def delete_client(cls):
        """Close the shared HTTP client, if any, and clear the reference."""
        if cls.client is None:
            return
        cls.client.close()
        cls.client = None
Loading