Skip to content

Commit 1573b3a

Browse files
committed
refactor: eventstore trait
1 parent 200bb59 commit 1573b3a

File tree

3 files changed

+79
-16
lines changed

3 files changed

+79
-16
lines changed
Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
mod in_memory_event_store;
2-
use async_trait::async_trait;
3-
pub use in_memory_event_store::*;
42

53
use crate::{EventId, SessionId, StreamId};
4+
use async_trait::async_trait;
5+
pub use in_memory_event_store::*;
66

77
#[derive(Debug, Clone)]
88
pub struct EventStoreMessages {
@@ -11,17 +11,77 @@ pub struct EventStoreMessages {
1111
pub messages: Vec<String>,
1212
}
1313

14+
/// Trait defining the interface for event storage and retrieval, used by the MCP server
15+
/// to store and replay events for state reconstruction after client reconnection
1416
#[async_trait]
1517
pub trait EventStore: Send + Sync {
18+
/// Stores a new event in the store and returns the generated event ID.
19+
/// For MCP, this stores protocol messages, timestamp is the number of microseconds since UNIX_EPOCH.
20+
/// The timestamp helps determine the order in which messages arrived.
21+
///
22+
/// # Parameters
23+
/// - `session_id`: The session identifier for the event.
24+
/// - `stream_id`: The stream identifier within the session.
25+
/// - `timestamp`: The u128 timestamp of the event.
26+
/// - `message`: The event payload as json string.
27+
///
28+
/// # Returns
29+
/// - `Ok(EventId)`: The generated ID (format: session_id:stream_id:timestamp) on success.
30+
/// - `Err(Self::Error)`: If input is invalid or storage fails.
1631
async fn store_event(
1732
&self,
1833
session_id: SessionId,
1934
stream_id: StreamId,
20-
time_stamp: u128,
35+
timestamp: u128,
2136
message: String,
22-
) -> EventId;
37+
) -> Option<EventId>;
38+
39+
/// Removes all events associated with a given session ID.
40+
/// Used to clean up all events for a session when it is no longer needed (e.g., session ended).
41+
///
42+
/// # Parameters
43+
/// - `session_id`: The session ID whose events should be removed.
44+
///
2345
async fn remove_by_session_id(&self, session_id: SessionId);
46+
/// Removes all events for a specific stream within a session.
47+
/// Useful for cleaning up a specific stream without affecting others.
48+
///
49+
/// # Parameters
50+
/// - `session_id`: The session ID containing the stream.
51+
/// - `stream_id`: The stream ID whose events should be removed.
52+
///
53+
/// # Returns
54+
/// - `Ok(())`: On successful deletion.
55+
/// - `Err(Self::Error)`: If deletion fails.
2456
async fn remove_stream_in_session(&self, session_id: SessionId, stream_id: StreamId);
57+
/// Clears all events from the store.
58+
/// Used for resetting the store.
59+
///
2560
async fn clear(&self);
61+
/// Retrieves events after a given event ID for a session and stream.
62+
/// Critical for MCP server to replay events after a client reconnects, starting from the last known event.
63+
/// Events are returned in chronological order (ascending timestamp) to reconstruct state.
64+
///
65+
/// # Parameters
66+
/// - `last_event_id`: The event ID to fetch events after.
67+
///
68+
/// # Returns
69+
/// - `Some(Some(EventStoreMessages))`: Events after the specified ID, if any.
70+
/// - `None`: If no events exist after it OR the event ID is invalid.
2671
async fn events_after(&self, last_event_id: EventId) -> Option<EventStoreMessages>;
72+
/// Prunes excess events to control storage usage.
73+
/// Implementations may apply custom logic, such as limiting
74+
/// the number of events per session or removing events older than a certain timestamp.
75+
/// Default implementation logs a warning if not overridden by the store.
76+
///
77+
/// # Parameters
78+
/// - `session_id`: Optional session ID to prune a specific session; if None, prunes all sessions.
79+
async fn prune_excess_events(&self, _session_id: Option<SessionId>) {
80+
tracing::warn!("prune_excess_events() is not implemented for the event store.");
81+
}
82+
/// Counts the total number of events in the store.
83+
///
84+
/// # Returns
85+
/// - The number of events across all sessions and streams.
86+
async fn count(&self) -> usize;
2787
}

crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ impl EventStore for InMemoryEventStore {
147147
stream_id: StreamId,
148148
time_stamp: u128,
149149
message: String,
150-
) -> EventId {
150+
) -> Option<EventId> {
151151
let event_id = self.generate_event_id(&session_id, &stream_id, time_stamp);
152152

153153
let mut storage_map = self.storage_map.write().await;
@@ -172,7 +172,7 @@ impl EventStore for InMemoryEventStore {
172172

173173
session_map.push_back(entry);
174174

175-
event_id
175+
Some(event_id)
176176
}
177177

178178
/// Removes all events associated with a given stream ID within a specific session.
@@ -271,4 +271,9 @@ impl EventStore for InMemoryEventStore {
271271
let mut storage_map = self.storage_map.write().await;
272272
storage_map.clear();
273273
}
274+
275+
async fn count(&self) -> usize {
276+
let storage_map = self.storage_map.read().await;
277+
storage_map.len()
278+
}
274279
}

crates/rust-mcp-transport/src/message_dispatcher.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -412,16 +412,14 @@ impl McpDispatch<ClientMessages, ServerMessages, ClientMessage, ServerMessage>
412412
self.stream_id.as_ref(),
413413
self.event_store.as_ref(),
414414
) {
415-
event_id = Some(
416-
event_store
417-
.store_event(
418-
session_id.clone(),
419-
stream_id.clone(),
420-
current_timestamp(),
421-
payload.to_owned(),
422-
)
423-
.await,
424-
)
415+
event_id = event_store
416+
.store_event(
417+
session_id.clone(),
418+
stream_id.clone(),
419+
current_timestamp(),
420+
payload.to_owned(),
421+
)
422+
.await
425423
};
426424
}
427425

0 commit comments

Comments
 (0)