Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
from typing import TYPE_CHECKING, Any
import time
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any, Optional

import torch
from lmcache.integration.vllm.vllm_v1_adapter import (
LMCacheConnectorV1Impl as LMCacheConnectorLatestImpl,
)

from vllm.config import VllmConfig
from vllm.distributed.kv_events import BlockStored, KVCacheEvent, KVEventBatch
from vllm.distributed.kv_transfer.kv_connector.v1.base import (
KVConnectorBase_V1,
KVConnectorMetadata,
KVConnectorRole,
)
from vllm.logger import init_logger
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.outputs import KVConnectorOutput

if TYPE_CHECKING:
from vllm.attention.backends.abstract import AttentionMetadata
Expand Down Expand Up @@ -54,6 +58,8 @@

self._lmcache_engine = cls(vllm_config, role, self)

self._kv_events: list[KVCacheEvent] = []

# ==============================
# Worker-side methods
# ==============================
Expand Down Expand Up @@ -136,6 +142,30 @@
"""
return self._lmcache_engine.get_finished(finished_req_ids)

def get_kv_connector_kv_cache_events(self) -> Optional["KVEventBatch"]:
    """
    Get the KV connector kv cache events collected during the last interval.

    Returns:
        A ``KVEventBatch`` stamped with the current wall-clock time and
        holding one ``BlockStored`` per event reported by the LMCache
        engine, or ``None`` when there is nothing to report (the engine
        returned no events, or it does not expose ``get_kv_events``).
    """
    # ``get_kv_events`` is not declared on every LMCacheConnectorV1Impl
    # (type checking reports attr-defined for it), so resolve it
    # dynamically and treat a missing method as "no events" instead of
    # raising AttributeError.
    get_kv_events = getattr(self._lmcache_engine, "get_kv_events", None)
    if get_kv_events is None:
        return None

    events = get_kv_events()
    if not events:
        return None

    # Re-wrap every engine event as a BlockStored, copying fields 1:1.
    blocks = [
        BlockStored(
            block_hashes=event.block_hashes,
            parent_block_hash=event.parent_block_hash,
            token_ids=event.token_ids,
            lora_id=event.lora_id,
            block_size=event.block_size,
            medium=event.medium,
        )
        for event in events
    ]
    # A truthy-but-empty iterable yields no blocks; keep reporting None
    # in that case rather than an empty batch.
    if not blocks:
        return None

    return KVEventBatch(ts=time.time(), events=blocks)

# ==============================
# Scheduler-side methods
# ==============================
Expand Down Expand Up @@ -183,6 +213,25 @@
"""
return self._lmcache_engine.build_connector_meta(scheduler_output)

def update_connector_output(self, connector_output: KVConnectorOutput):
    """
    Buffer the KV cache events carried by the worker-side connectors output.

    Args:
        connector_output (KVConnectorOutput): the worker-side
            connectors output.
    """
    # NOTE(review): type checking reports attr-defined for
    # ``kv_cache_events`` on KVConnectorOutput -- confirm the field exists.
    batch = connector_output.kv_cache_events

    # Only a populated KVEventBatch contributes to the pending buffer;
    # anything else (None, another type, an empty batch) is ignored.
    has_events = (
        bool(batch)
        and isinstance(batch, KVEventBatch)
        and bool(batch.events)
    )
    if has_events:
        self._kv_events.extend(batch.events)

def request_finished(
self,
request: "Request",
Expand All @@ -199,3 +248,14 @@
returned by the engine.
"""
return self._lmcache_engine.request_finished(request, block_ids)

def take_events(self) -> Iterable["KVCacheEvent"]:
    """
    Drain the KV cache events buffered on this connector.

    Yields:
        Each buffered KV cache event in insertion order. Once every
        event has been yielded, the buffer is emptied in place.
    """
    # Nothing to hand out when no buffer is present.
    if self._kv_events is None:
        return

    yield from self._kv_events
    # Reached only after the consumer has exhausted the events above.
    self._kv_events.clear()
Loading