Skip to content

Commit 1537fd1

Browse files
yehudit1987Yehudit Kerido
andauthored
Fix Milvus Vector Database Tests (#602)
Signed-off-by: Yehudit Kerido <ykerido@ykerido-thinkpadp1gen7.raanaii.csb> Co-authored-by: Yehudit Kerido <ykerido@ykerido-thinkpadp1gen7.raanaii.csb>
1 parent af59a95 commit 1537fd1

File tree

6 files changed

+132
-79
lines changed

6 files changed

+132
-79
lines changed

.github/workflows/test-and-build.yml

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,48 @@ jobs:
8383
HF_HUB_DISABLE_TELEMETRY: 1
8484
run: make download-models
8585

86+
- name: Start Milvus service
87+
run: |
88+
echo "Starting Milvus vector database..."
89+
docker run -d \
90+
--name milvus-semantic-cache \
91+
--security-opt seccomp:unconfined \
92+
-e ETCD_USE_EMBED=true \
93+
-e ETCD_DATA_DIR=/var/lib/milvus/etcd \
94+
-e ETCD_CONFIG_PATH=/milvus/configs/advanced/etcd.yaml \
95+
-e COMMON_STORAGETYPE=local \
96+
-e CLUSTER_ENABLED=false \
97+
-p 19530:19530 \
98+
-p 9091:9091 \
99+
milvusdb/milvus:v2.3.3 \
100+
milvus run standalone
101+
102+
echo "Waiting for Milvus to be ready..."
103+
sleep 20
104+
105+
# Verify Milvus is responsive
106+
timeout 30 bash -c 'until docker logs milvus-semantic-cache 2>&1 | grep -q "Proxy successfully started"; do sleep 2; done' || true
107+
108+
echo "Milvus is ready at localhost:19530"
109+
docker ps --filter "name=milvus-semantic-cache"
110+
86111
- name: Run semantic router tests
87112
run: make test
88113
env:
89114
CI: true
90115
CI_MINIMAL_MODELS: ${{ github.event_name == 'pull_request' }}
91116
CGO_ENABLED: 1
92117
LD_LIBRARY_PATH: ${{ github.workspace }}/candle-binding/target/release
118+
MILVUS_URI: localhost:19530
119+
SKIP_MILVUS_TESTS: false
120+
121+
- name: Stop Milvus service
122+
if: always()
123+
run: |
124+
echo "Stopping Milvus container..."
125+
docker stop milvus-semantic-cache || true
126+
docker rm milvus-semantic-cache || true
127+
echo "Milvus container cleaned up"
93128
94129
- name: Upload test artifacts on failure
95130
if: failure()

src/semantic-router/pkg/cache/cache_test.go

Lines changed: 72 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1203,28 +1203,14 @@ func TestHybridCacheDisabled(t *testing.T) {
12031203

12041204
// TestHybridCacheBasicOperations tests basic cache operations
12051205
func TestHybridCacheBasicOperations(t *testing.T) {
1206-
// Skip if Milvus is not configured
1207-
if os.Getenv("MILVUS_URI") == "" {
1208-
t.Skip("Skipping: MILVUS_URI not set")
1209-
}
1206+
t.Log("Starting TestHybridCacheBasicOperations - this may take 30-60 seconds...")
12101207

12111208
// Create a test Milvus config
1212-
milvusConfig := "/tmp/test_milvus_config.yaml"
1213-
err := os.WriteFile(milvusConfig, []byte(`
1214-
milvus:
1215-
address: "localhost:19530"
1216-
collection_name: "test_hybrid_cache"
1217-
dimension: 384
1218-
index_type: "HNSW"
1219-
metric_type: "IP"
1220-
params:
1221-
M: 16
1222-
efConstruction: 200
1223-
`), 0o644)
1209+
milvusConfig, cleanup, err := createTestMilvusConfig("test_hybrid_cache", 200, true)
12241210
if err != nil {
12251211
t.Fatalf("Failed to create test config: %v", err)
12261212
}
1227-
defer os.Remove(milvusConfig)
1213+
defer cleanup()
12281214

12291215
cache, err := NewHybridCache(HybridCacheOptions{
12301216
Enabled: true,
@@ -1260,7 +1246,8 @@ milvus:
12601246
}
12611247

12621248
// Test FindSimilar with exact same query (should hit)
1263-
time.Sleep(100 * time.Millisecond) // Allow indexing to complete
1249+
// Wait for Milvus to index the entry
1250+
time.Sleep(2 * time.Second)
12641251

12651252
response, found, err := cache.FindSimilar("gpt-4", testQuery)
12661253
if err != nil {
@@ -1303,25 +1290,13 @@ milvus:
13031290

13041291
// TestHybridCachePendingRequest tests pending request flow
13051292
func TestHybridCachePendingRequest(t *testing.T) {
1306-
// Skip if Milvus is not configured
1307-
if os.Getenv("MILVUS_URI") == "" {
1308-
t.Skip("Skipping: MILVUS_URI not set")
1309-
}
1293+
t.Log("Starting TestHybridCachePendingRequest - this may take 30-60 seconds...")
13101294

1311-
milvusConfig := "/tmp/test_milvus_pending_config.yaml"
1312-
err := os.WriteFile(milvusConfig, []byte(`
1313-
milvus:
1314-
address: "localhost:19530"
1315-
collection_name: "test_hybrid_pending"
1316-
dimension: 384
1317-
index_type: "HNSW"
1318-
metric_type: "IP"
1319-
`),
1320-
0o644)
1295+
milvusConfig, cleanup, err := createTestMilvusConfig("test_hybrid_pending", 64, true)
13211296
if err != nil {
13221297
t.Fatalf("Failed to create test config: %v", err)
13231298
}
1324-
defer os.Remove(milvusConfig)
1299+
defer cleanup()
13251300

13261301
cache, err := NewHybridCache(HybridCacheOptions{
13271302
Enabled: true,
@@ -1367,25 +1342,13 @@ milvus:
13671342

13681343
// TestHybridCacheEviction tests memory eviction behavior
13691344
func TestHybridCacheEviction(t *testing.T) {
1370-
// Skip if Milvus is not configured
1371-
if os.Getenv("MILVUS_URI") == "" {
1372-
t.Skip("Skipping: MILVUS_URI not set")
1373-
}
1345+
t.Log("Starting TestHybridCacheEviction - this may take 30-60 seconds...")
13741346

1375-
milvusConfig := "/tmp/test_milvus_eviction_config.yaml"
1376-
err := os.WriteFile(milvusConfig, []byte(`
1377-
milvus:
1378-
address: "localhost:19530"
1379-
collection_name: "test_hybrid_eviction"
1380-
dimension: 384
1381-
index_type: "HNSW"
1382-
metric_type: "IP"
1383-
`),
1384-
0o644)
1347+
milvusConfig, cleanup, err := createTestMilvusConfig("test_hybrid_eviction", 64, true)
13851348
if err != nil {
13861349
t.Fatalf("Failed to create test config: %v", err)
13871350
}
1388-
defer os.Remove(milvusConfig)
1351+
defer cleanup()
13891352

13901353
// Create cache with very small memory limit
13911354
cache, err := NewHybridCache(HybridCacheOptions{
@@ -1418,7 +1381,8 @@ milvus:
14181381

14191382
// All entries should still be in Milvus
14201383
// Try to find a recent entry (should be in memory)
1421-
time.Sleep(100 * time.Millisecond)
1384+
// Wait for Milvus to index all entries
1385+
time.Sleep(2 * time.Second)
14221386
_, found, err := cache.FindSimilar("gpt-4", "Query number 9")
14231387
if err != nil {
14241388
t.Fatalf("FindSimilar failed: %v", err)
@@ -1438,25 +1402,13 @@ milvus:
14381402

14391403
// TestHybridCacheLocalCacheHit tests local cache hot path
14401404
func TestHybridCacheLocalCacheHit(t *testing.T) {
1441-
// Skip if Milvus is not configured
1442-
if os.Getenv("MILVUS_URI") == "" {
1443-
t.Skip("Skipping: MILVUS_URI not set")
1444-
}
1405+
t.Log("Starting TestHybridCacheLocalCacheHit - this may take 30-60 seconds...")
14451406

1446-
milvusConfig := "/tmp/test_milvus_local_config.yaml"
1447-
err := os.WriteFile(milvusConfig, []byte(`
1448-
milvus:
1449-
address: "localhost:19530"
1450-
collection_name: "test_hybrid_local"
1451-
dimension: 384
1452-
index_type: "HNSW"
1453-
metric_type: "IP"
1454-
`),
1455-
0o644)
1407+
milvusConfig, cleanup, err := createTestMilvusConfig("test_hybrid_local", 64, true)
14561408
if err != nil {
14571409
t.Fatalf("Failed to create test config: %v", err)
14581410
}
1459-
defer os.Remove(milvusConfig)
1411+
defer cleanup()
14601412

14611413
cache, err := NewHybridCache(HybridCacheOptions{
14621414
Enabled: true,
@@ -1478,16 +1430,18 @@ milvus:
14781430
t.Fatalf("Failed to add entry: %v", err)
14791431
}
14801432

1481-
time.Sleep(100 * time.Millisecond)
1433+
// Wait longer for Milvus to index the entry
1434+
time.Sleep(2 * time.Second)
14821435

14831436
// First search - should populate local cache
1484-
_, found, err := cache.FindSimilar("gpt-4", testQuery)
1437+
response1, found, err := cache.FindSimilar("gpt-4", testQuery)
14851438
if err != nil {
14861439
t.Fatalf("FindSimilar failed: %v", err)
14871440
}
14881441
if !found {
14891442
t.Fatal("Expected to find entry")
14901443
}
1444+
t.Logf("First search returned: %s", string(response1))
14911445

14921446
// Second search - should hit local cache (much faster)
14931447
startTime := time.Now()
@@ -1499,6 +1453,7 @@ milvus:
14991453
if !found {
15001454
t.Fatal("Expected to find entry in local cache")
15011455
}
1456+
t.Logf("Second search returned: %s", string(response))
15021457
if string(response) != string(testResponse) {
15031458
t.Errorf("Response mismatch: got %s, want %s", string(response), string(testResponse))
15041459
}
@@ -1776,6 +1731,47 @@ func (dcc *DatabaseCallCounter) Reset() {
17761731
atomic.StoreInt64(&dcc.calls, 0)
17771732
}
17781733

1734+
// createTestMilvusConfig creates a temporary Milvus config file for testing
1735+
// Returns the path to the config file and cleanup function
1736+
func createTestMilvusConfig(collectionName string, efConstruction int, dropOnStartup bool) (string, func(), error) {
1737+
configPath := fmt.Sprintf("/tmp/test_milvus_%s_config.yaml", collectionName)
1738+
1739+
configYAML := fmt.Sprintf(`connection:
1740+
host: "localhost"
1741+
port: 19530
1742+
timeout: 30
1743+
collection:
1744+
name: "%s"
1745+
vector_field:
1746+
name: "embedding"
1747+
dimension: 384
1748+
metric_type: "IP"
1749+
index:
1750+
type: "HNSW"
1751+
params:
1752+
M: 16
1753+
efConstruction: %d
1754+
search:
1755+
params:
1756+
ef: 64
1757+
topk: 10
1758+
development:
1759+
auto_create_collection: true
1760+
drop_collection_on_startup: %t
1761+
`, collectionName, efConstruction, dropOnStartup)
1762+
1763+
err := os.WriteFile(configPath, []byte(configYAML), 0o644)
1764+
if err != nil {
1765+
return "", nil, fmt.Errorf("failed to create test config: %w", err)
1766+
}
1767+
1768+
cleanup := func() {
1769+
os.Remove(configPath)
1770+
}
1771+
1772+
return configPath, cleanup, nil
1773+
}
1774+
17791775
// getMilvusConfigPath returns the path to milvus.yaml config file
17801776
func getMilvusConfigPath() string {
17811777
// Check for environment variable first
@@ -2475,7 +2471,14 @@ func writeBenchmarkResultToCSV(file *os.File, result BenchmarkResult) {
24752471

24762472
// TestHybridVsMilvusSmoke is a quick smoke test to verify both caches work
24772473
func TestHybridVsMilvusSmoke(t *testing.T) {
2478-
t.Skip("Skipping smoke test in short mode")
2474+
t.Log("Starting TestHybridVsMilvusSmoke - this may take 2-3 minutes...")
2475+
2476+
// Create test Milvus config
2477+
milvusConfig, cleanup, err := createTestMilvusConfig("test_smoke_cache", 64, true)
2478+
if err != nil {
2479+
t.Fatalf("Failed to create test config: %v", err)
2480+
}
2481+
defer cleanup()
24792482

24802483
// Initialize BERT model
24812484
useCPU := os.Getenv("USE_CPU") != "false"
@@ -2490,7 +2493,7 @@ func TestHybridVsMilvusSmoke(t *testing.T) {
24902493
Enabled: true,
24912494
SimilarityThreshold: 0.85,
24922495
TTLSeconds: 3600,
2493-
ConfigPath: getMilvusConfigPath(),
2496+
ConfigPath: milvusConfig,
24942497
})
24952498
if err != nil {
24962499
t.Fatalf("Failed to create Milvus cache: %v", err)
@@ -2531,7 +2534,7 @@ func TestHybridVsMilvusSmoke(t *testing.T) {
25312534
MaxMemoryEntries: 1000,
25322535
HNSWM: 16,
25332536
HNSWEfConstruction: 200,
2534-
MilvusConfigPath: getMilvusConfigPath(),
2537+
MilvusConfigPath: milvusConfig,
25352538
})
25362539
if err != nil {
25372540
t.Fatalf("Failed to create Hybrid cache: %v", err)

src/semantic-router/pkg/cache/hybrid_cache.go

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -761,28 +761,37 @@ func (h *HybridCache) evictOneUnsafe() {
761761
return
762762
}
763763

764-
// Simple FIFO eviction: remove oldest entry
764+
// Simple FIFO eviction: remove oldest entry (index 0)
765765
victimIdx := 0
766766

767-
// Could use LRU/LFU here by tracking access times/counts
768-
// For now, just evict the first entry
769-
770767
// Get milvusID before removing from map (for logging)
771768
milvusID := h.idMap[victimIdx]
772769

773-
// Remove from structures
774-
delete(h.idMap, victimIdx)
770+
// Remove the embedding from the slice
771+
h.embeddings = h.embeddings[1:]
772+
773+
// Rebuild idMap with adjusted indices (all indices shift down by 1)
774+
newIDMap := make(map[int]string, len(h.idMap)-1)
775+
for idx, id := range h.idMap {
776+
if idx > victimIdx {
777+
newIDMap[idx-1] = id // Shift index down
778+
}
779+
// Skip victimIdx (it's being evicted)
780+
}
781+
h.idMap = newIDMap
775782

776-
// Note: We don't remove from Milvus (data persists there)
777-
// We also don't rebuild HNSW (mark as stale)
783+
// Mark HNSW index as stale (needs rebuild with new indices)
778784
h.hnswIndex.markStale()
779785

780786
atomic.AddInt64(&h.evictCount, 1)
781787

788+
logging.Debugf("HybridCache.evictOne: evicted entry at index %d (milvus_id=%s), new size=%d",
789+
victimIdx, milvusID, len(h.embeddings))
782790
logging.LogEvent("hybrid_cache_evicted", map[string]interface{}{
783791
"backend": "hybrid",
784792
"milvus_id": milvusID,
785793
"hnsw_index": victimIdx,
794+
"new_size": len(h.embeddings),
786795
"max_entries": h.maxMemoryEntries,
787796
})
788797
}

src/semantic-router/pkg/cache/milvus_cache.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,7 @@ func (c *MilvusCache) createCollection() error {
374374
}
375375

376376
// Create index with updated API
377-
index, err := entity.NewIndexHNSW(entity.MetricType(c.config.Collection.VectorField.MetricType), c.config.Collection.Index.Params.EfConstruction, c.config.Collection.Index.Params.M)
377+
index, err := entity.NewIndexHNSW(entity.MetricType(c.config.Collection.VectorField.MetricType), c.config.Collection.Index.Params.M, c.config.Collection.Index.Params.EfConstruction)
378378
if err != nil {
379379
return fmt.Errorf("failed to create HNSW index: %w", err)
380380
}
@@ -843,11 +843,12 @@ func (c *MilvusCache) GetByID(ctx context.Context, requestID string) ([]byte, er
843843
logging.Debugf("MilvusCache.GetByID: fetching requestID='%s'", requestID)
844844

845845
// Query Milvus by request_id (primary key)
846+
// Filter for non-empty responses to avoid race condition with pending entries
846847
queryResult, err := c.client.Query(
847848
ctx,
848849
c.collectionName,
849850
[]string{}, // Empty partitions means search all
850-
fmt.Sprintf("request_id == \"%s\"", requestID),
851+
fmt.Sprintf("request_id == \"%s\" && response_body != \"\"", requestID),
851852
[]string{"response_body"}, // Only fetch document, not embedding!
852853
)
853854
if err != nil {

tools/make/build-run-test.mk

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ test-semantic-router: build-router
3737
@$(LOG_TARGET)
3838
@export LD_LIBRARY_PATH=${PWD}/candle-binding/target/release && \
3939
export SKIP_MILVUS_TESTS=$${SKIP_MILVUS_TESTS:-true} && \
40+
export SR_TEST_MODE=true && \
4041
cd src/semantic-router && CGO_ENABLED=1 go test -v ./...
4142

4243
# Test the Rust library and the Go binding

0 commit comments

Comments
 (0)