Skip to content

Commit c3258e6

Browse files
authored
chore: fix cache unit test (#613)
Signed-off-by: Huamin Chen <hchen@redhat.com>
1 parent f203719 commit c3258e6

File tree

1 file changed

+59
-19
lines changed

1 file changed

+59
-19
lines changed

src/semantic-router/pkg/cache/milvus_cache.go

Lines changed: 59 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -428,8 +428,10 @@ func (c *MilvusCache) UpdateWithResponse(requestID string, responseBody []byte)
428428

429429
logging.Debugf("MilvusCache.UpdateWithResponse: searching for pending entry with expr: %s", queryExpr)
430430

431+
// Note: We don't explicitly request "id" since Milvus auto-includes the primary key
432+
// We request model, query, request_body and will detect which column is which
431433
results, err := c.client.Query(ctx, c.collectionName, []string{}, queryExpr,
432-
[]string{"id", "model", "query", "request_body"})
434+
[]string{"model", "query", "request_body"})
433435
if err != nil {
434436
logging.Debugf("MilvusCache.UpdateWithResponse: query failed: %v", err)
435437
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
@@ -442,31 +444,69 @@ func (c *MilvusCache) UpdateWithResponse(requestID string, responseBody []byte)
442444
return fmt.Errorf("no pending entry found")
443445
}
444446

445-
// Get the model and request body from the pending entry
446-
idColumn := results[0].(*entity.ColumnVarChar)
447-
modelColumn := results[1].(*entity.ColumnVarChar)
448-
queryColumn := results[2].(*entity.ColumnVarChar)
449-
requestColumn := results[3].(*entity.ColumnVarChar)
447+
// Milvus automatically includes the primary key in results but order is non-deterministic
448+
// We requested ["model", "query", "request_body"], expect 3-4 columns (primary key may be auto-included)
449+
// Strategy: Find the ID column (32-char hex string), then map remaining columns
450+
if len(results) < 3 {
451+
logging.Debugf("MilvusCache.UpdateWithResponse: unexpected result count: %d", len(results))
452+
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
453+
return fmt.Errorf("incomplete query result: expected 3+ columns, got %d", len(results))
454+
}
450455

451-
if idColumn.Len() > 0 {
452-
id := idColumn.Data()[0]
453-
model := modelColumn.Data()[0]
454-
query := queryColumn.Data()[0]
455-
requestBody := requestColumn.Data()[0]
456+
var id, model, query, requestBody string
457+
idColIndex := -1
456458

457-
logging.Debugf("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (id: %s, model: %s)", id, model)
459+
// First pass: find the ID column (32-char hex string = MD5 hash)
460+
for i := 0; i < len(results); i++ {
461+
if col, ok := results[i].(*entity.ColumnVarChar); ok && col.Len() > 0 {
462+
val := col.Data()[0]
463+
if len(val) == 32 && isHexString(val) {
464+
id = val
465+
idColIndex = i
466+
break
467+
}
468+
}
469+
}
458470

459-
// Create the complete entry with response data
460-
err := c.addEntry(id, requestID, model, query, []byte(requestBody), responseBody)
461-
if err != nil {
462-
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
463-
return fmt.Errorf("failed to add complete entry: %w", err)
471+
// Second pass: extract data fields in order, skipping the ID column
472+
dataFieldIndex := 0
473+
for i := 0; i < len(results); i++ {
474+
if i == idColIndex {
475+
continue // Skip the primary key column
476+
}
477+
if col, ok := results[i].(*entity.ColumnVarChar); ok && col.Len() > 0 {
478+
val := col.Data()[0]
479+
switch dataFieldIndex {
480+
case 0:
481+
model = val
482+
case 1:
483+
query = val
484+
case 2:
485+
requestBody = val
486+
}
487+
dataFieldIndex++
464488
}
489+
}
490+
491+
if id == "" || model == "" || query == "" {
492+
logging.Debugf("MilvusCache.UpdateWithResponse: failed to extract all required fields (id: %s, model: %s, query_len: %d)",
493+
id, model, len(query))
494+
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
495+
return fmt.Errorf("failed to extract required fields from query result")
496+
}
465497

466-
logging.Debugf("MilvusCache.UpdateWithResponse: successfully added complete entry with response")
467-
metrics.RecordCacheOperation("milvus", "update_response", "success", time.Since(start).Seconds())
498+
logging.Debugf("MilvusCache.UpdateWithResponse: found pending entry, adding complete entry (id: %s, model: %s)", id, model)
499+
500+
// Create the complete entry with response data
501+
err = c.addEntry(id, requestID, model, query, []byte(requestBody), responseBody)
502+
if err != nil {
503+
metrics.RecordCacheOperation("milvus", "update_response", "error", time.Since(start).Seconds())
504+
return fmt.Errorf("failed to add complete entry: %w", err)
468505
}
469506

507+
logging.Debugf("MilvusCache.UpdateWithResponse: successfully added complete entry with response")
508+
metrics.RecordCacheOperation("milvus", "update_response", "success", time.Since(start).Seconds())
509+
470510
return nil
471511
}
472512

0 commit comments

Comments
 (0)