Skip to content
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 114 additions & 1 deletion vllm/distributed/kv_transfer/kv_connector/v1/lmcache_connector.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from typing import TYPE_CHECKING, Any
from collections.abc import Iterable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Optional

import torch
from lmcache.integration.vllm.vllm_v1_adapter import (
LMCacheConnectorV1Impl as LMCacheConnectorLatestImpl,
)

from vllm.config import VllmConfig
from vllm.distributed.kv_events import BlockStored, KVCacheEvent
from vllm.distributed.kv_transfer.kv_connector.v1.base import (
KVConnectorBase_V1,
KVConnectorMetadata,
KVConnectorRole,
)
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorStats
from vllm.logger import init_logger
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.outputs import KVConnectorOutput

if TYPE_CHECKING:
from vllm.attention.backends.abstract import AttentionMetadata
Expand All @@ -26,6 +31,50 @@
logger = init_logger(__name__)


@dataclass
class LMCacheKVEvents(KVConnectorStats):
    """
    Maintain a list of KV events.

    The events are stored as a list under the ``"kv_events"`` key of
    ``self.data``; the key is absent until the first event is recorded.
    """

    def aggregate(self, other: "KVConnectorStats") -> "LMCacheKVEvents":
        """Append the events held by ``other`` to this instance.

        Args:
            other: Another ``LMCacheKVEvents`` whose events are merged in.

        Returns:
            This instance, after the merge.

        Raises:
            TypeError: If ``other`` is not an ``LMCacheKVEvents``.
        """
        # The isinstance check alone is the guard: it rejects None and
        # foreign stats types, and it narrows ``other`` so the type checker
        # knows ``get_kv_events`` exists on it.
        if not isinstance(other, LMCacheKVEvents):
            raise TypeError("Can only aggregate with another LMCacheKVEvents")

        other_events = other.get_kv_events()
        if not other_events:
            return self

        self.data.setdefault("kv_events", []).extend(other_events)
        return self

    def reset(self):
        """Drop everything stored in ``self.data``, including all events."""
        self.data.clear()

    def reduce(self) -> dict[str, int | float]:
        """Return the numeric summary for this stats object.

        The value is a fixed placeholder: ``"kv_events"`` is always
        reported as ``0``, independent of how many events are stored.
        """
        return {
            "kv_events": 0,
        }

    def add_kv_event(self, event: BlockStored):
        """Record a single event, creating the backing list on first use."""
        self.data.setdefault("kv_events", []).append(event)

    def get_kv_events(self) -> list[BlockStored] | None:
        """Return the stored event list, or ``None`` when there are none."""
        if self.is_empty():
            return None
        return self.data["kv_events"]

    def is_empty(self) -> bool:
        """Return ``True`` when no events are stored.

        A missing ``"kv_events"`` key and an empty list both count as empty.
        """
        return not self.data.get("kv_events")


class LMCacheConnectorV1(KVConnectorBase_V1):
def __init__(
self,
Expand Down Expand Up @@ -54,6 +103,8 @@

self._lmcache_engine = cls(vllm_config, role, self)

self._kv_events: list[KVCacheEvent] = []

# ==============================
# Worker-side methods
# ==============================
Expand Down Expand Up @@ -136,6 +187,32 @@
"""
return self._lmcache_engine.get_finished(finished_req_ids)

def get_kv_connector_stats(self) -> Optional["KVConnectorStats"]:
    """
    Get the KV connector stats collected during the last interval.

    Each event reported by the LMCache engine is re-packed as a
    ``BlockStored`` and collected into an ``LMCacheKVEvents``.

    Returns:
        An ``LMCacheKVEvents`` holding the converted events, or ``None``
        when the engine reports no events or cannot report events at all.
    """
    assert self._lmcache_engine is not None

    # The engine implementation is not guaranteed to define get_kv_events
    # (type checking reports it missing on LMCacheConnectorV1Impl), so look
    # it up defensively instead of failing with AttributeError.
    fetch_events = getattr(self._lmcache_engine, "get_kv_events", None)
    if fetch_events is None:
        return None

    events = fetch_events()
    if not events:
        return None

    lmcache_kv_events = LMCacheKVEvents()
    for event in events:
        lmcache_kv_events.add_kv_event(
            BlockStored(
                block_hashes=event.block_hashes,
                parent_block_hash=event.parent_block_hash,
                token_ids=event.token_ids,
                lora_id=event.lora_id,
                block_size=event.block_size,
                medium=event.medium,
            )
        )

    # A truthy-but-exhausted iterable yields nothing; report None then.
    return None if lmcache_kv_events.is_empty() else lmcache_kv_events

# ==============================
# Scheduler-side methods
# ==============================
Expand Down Expand Up @@ -183,6 +260,25 @@
"""
return self._lmcache_engine.build_connector_meta(scheduler_output)

def update_connector_output(self, connector_output: KVConnectorOutput):
    """
    Update KVConnector state from worker-side connectors output.

    KV events carried in ``connector_output.kv_connector_stats`` are
    appended to the pending buffer that ``take_events`` drains.

    Args:
        connector_output (KVConnectorOutput): the worker-side
            connectors output.
    """
    # Get the KV events
    kv_events = connector_output.kv_connector_stats
    if not isinstance(kv_events, LMCacheKVEvents):
        return

    events = kv_events.get_kv_events()
    if not events:
        return

    # Extend rather than rebind: events that have not been taken yet are
    # preserved, and the buffer never aliases the stats object's own list
    # (which would be emptied when the buffer is cleared).
    self._kv_events.extend(events)

def request_finished(
self,
request: "Request",
Expand All @@ -199,3 +295,20 @@
returned by the engine.
"""
return self._lmcache_engine.request_finished(request, block_ids)

def take_events(self) -> Iterable["KVCacheEvent"]:
    """
    Take the KV cache events from the connector.

    The pending buffer is detached eagerly, at call time, so every event
    is handed out exactly once even if the caller never iterates the
    result or stops part-way through.

    Returns:
        New KV cache events since the last call (empty if there are none).
    """
    pending = self._kv_events or []
    # Install a fresh buffer so later additions do not affect the
    # list that was just handed to the caller.
    self._kv_events = []
    return pending

@classmethod
def build_kv_connector_stats(
    cls, data: dict[str, Any] | None = None
) -> KVConnectorStats | None:
    """Create the stats object used by this connector.

    Args:
        data: Optional payload to wrap; when ``None`` the stats object is
            built with its default (no ``data`` argument is passed).

    Returns:
        A new ``LMCacheKVEvents`` instance.
    """
    if data is None:
        return LMCacheKVEvents()
    return LMCacheKVEvents(data=data)