Skip to content

Commit 150e3a0

Browse files
authored
refactor: eventstore with better error handling and stability (#109)
* refactor: eventstore trait * feat: enhance EventStore with better error handling and stability
1 parent 200bb59 commit 150e3a0

File tree

5 files changed

+169
-50
lines changed

5 files changed

+169
-50
lines changed

crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,15 @@ async fn create_sse_stream(
187187
tokio::spawn(async move {
188188
if let Some(last_event_id) = last_event_id {
189189
if let Some(event_store) = state.event_store.as_ref() {
190-
if let Some(events) = event_store.events_after(last_event_id).await {
190+
let events = event_store
191+
.events_after(last_event_id)
192+
.await
193+
.unwrap_or_else(|err| {
194+
tracing::error!("{err}");
195+
None
196+
});
197+
198+
if let Some(events) = events {
191199
for message_payload in events.messages {
192200
// skip storing replay messages
193201
let error = transport.write_str(&message_payload, true).await;

crates/rust-mcp-sdk/tests/test_streamable_http_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1428,6 +1428,7 @@ async fn should_store_and_include_event_ids_in_server_sse_messages() {
14281428
.unwrap()
14291429
.events_after(first_id)
14301430
.await
1431+
.unwrap()
14311432
.unwrap();
14321433
assert_eq!(events.messages.len(), 1);
14331434

Lines changed: 100 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,118 @@
11
mod in_memory_event_store;
2-
use async_trait::async_trait;
3-
pub use in_memory_event_store::*;
42

53
use crate::{EventId, SessionId, StreamId};
4+
use async_trait::async_trait;
5+
pub use in_memory_event_store::*;
6+
use thiserror::Error;
67

78
#[derive(Debug, Clone)]
8-
pub struct EventStoreMessages {
9+
pub struct EventStoreEntry {
910
pub session_id: SessionId,
1011
pub stream_id: StreamId,
1112
pub messages: Vec<String>,
1213
}
1314

15+
#[derive(Debug, Error)]
16+
#[error("{message}")]
17+
pub struct EventStoreError {
18+
pub message: String,
19+
}
20+
21+
impl From<&str> for EventStoreError {
22+
fn from(s: &str) -> Self {
23+
EventStoreError {
24+
message: s.to_string(),
25+
}
26+
}
27+
}
28+
29+
impl From<String> for EventStoreError {
30+
fn from(s: String) -> Self {
31+
EventStoreError { message: s }
32+
}
33+
}
34+
35+
type EventStoreResult<T> = Result<T, EventStoreError>;
36+
37+
/// Trait defining the interface for event storage and retrieval, used by the MCP server
38+
/// to store and replay events for state reconstruction after client reconnection
1439
#[async_trait]
1540
pub trait EventStore: Send + Sync {
41+
/// Stores a new event in the store and returns the generated event ID.
42+
/// For MCP, this stores protocol messages; `timestamp` is the number of microseconds since UNIX_EPOCH.
43+
/// The timestamp helps determine the order in which messages arrived.
44+
///
45+
/// # Parameters
46+
/// - `session_id`: The session identifier for the event.
47+
/// - `stream_id`: The stream identifier within the session.
48+
/// - `timestamp`: The u128 timestamp of the event.
49+
/// - `message`: The event payload as json string.
50+
///
51+
/// # Returns
52+
/// - `Ok(EventId)`: The generated ID (format: session_id:stream_id:timestamp) on success.
53+
/// - `Err(EventStoreError)`: If input is invalid or storage fails.
1654
async fn store_event(
1755
&self,
1856
session_id: SessionId,
1957
stream_id: StreamId,
20-
time_stamp: u128,
58+
timestamp: u128,
2159
message: String,
22-
) -> EventId;
23-
async fn remove_by_session_id(&self, session_id: SessionId);
24-
async fn remove_stream_in_session(&self, session_id: SessionId, stream_id: StreamId);
25-
async fn clear(&self);
26-
async fn events_after(&self, last_event_id: EventId) -> Option<EventStoreMessages>;
60+
) -> EventStoreResult<EventId>;
61+
62+
/// Removes all events associated with a given session ID.
63+
/// Used to clean up all events for a session when it is no longer needed (e.g., session ended).
64+
///
65+
/// # Parameters
66+
/// - `session_id`: The session ID whose events should be removed.
67+
///
68+
async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()>;
69+
/// Removes all events for a specific stream within a session.
70+
/// Useful for cleaning up a specific stream without affecting others.
71+
///
72+
/// # Parameters
73+
/// - `session_id`: The session ID containing the stream.
74+
/// - `stream_id`: The stream ID whose events should be removed.
75+
///
76+
/// # Returns
77+
/// - `Ok(())`: On successful deletion.
78+
/// - `Err(EventStoreError)`: If deletion fails.
79+
async fn remove_stream_in_session(
80+
&self,
81+
session_id: SessionId,
82+
stream_id: StreamId,
83+
) -> EventStoreResult<()>;
84+
/// Clears all events from the store.
85+
/// Used for resetting the store.
86+
///
87+
async fn clear(&self) -> EventStoreResult<()>;
88+
/// Retrieves events after a given event ID for a session and stream.
89+
/// Critical for MCP server to replay events after a client reconnects, starting from the last known event.
90+
/// Events are returned in chronological order (ascending timestamp) to reconstruct state.
91+
///
92+
/// # Parameters
93+
/// - `last_event_id`: The event ID to fetch events after.
94+
///
95+
/// # Returns
96+
/// - `Ok(Some(EventStoreEntry))`: Events after the specified ID, if any.
97+
/// - `Ok(None)`: If no events exist after it; an invalid event ID may instead be reported as `Err(EventStoreError)`.
98+
async fn events_after(
99+
&self,
100+
last_event_id: EventId,
101+
) -> EventStoreResult<Option<EventStoreEntry>>;
102+
/// Prunes excess events to control storage usage.
103+
/// Implementations may apply custom logic, such as limiting
104+
/// the number of events per session or removing events older than a certain timestamp.
105+
/// Default implementation logs a warning if not overridden by the store.
106+
///
107+
/// # Parameters
108+
/// - `session_id`: Optional session ID to prune a specific session; if None, prunes all sessions.
109+
async fn prune_excess_events(&self, _session_id: Option<SessionId>) -> EventStoreResult<()> {
110+
tracing::warn!("prune_excess_events() is not implemented for the event store.");
111+
Ok(())
112+
}
113+
/// Counts the total number of events in the store.
114+
///
115+
/// # Returns
116+
/// - The number of events across all sessions and streams.
117+
async fn count(&self) -> EventStoreResult<usize>;
27118
}

crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1+
use crate::event_store::EventStoreResult;
2+
use crate::{
3+
event_store::{EventStore, EventStoreEntry},
4+
EventId, SessionId, StreamId,
5+
};
16
use async_trait::async_trait;
27
use std::collections::HashMap;
38
use std::collections::VecDeque;
49
use tokio::sync::RwLock;
510

6-
use crate::{
7-
event_store::{EventStore, EventStoreMessages},
8-
EventId, SessionId, StreamId,
9-
};
10-
1111
const MAX_EVENTS_PER_SESSION: usize = 64;
1212
const ID_SEPARATOR: &str = "-.-";
1313

@@ -101,16 +101,19 @@ impl InMemoryEventStore {
101101
/// );
102102
/// assert!(store.parse_event_id("invalid").is_err());
103103
/// ```
104-
pub fn parse_event_id<'a>(&self, event_id: &'a str) -> Option<(&'a str, &'a str, &'a str)> {
104+
pub fn parse_event_id<'a>(
105+
&self,
106+
event_id: &'a str,
107+
) -> EventStoreResult<(&'a str, &'a str, u128)> {
105108
// Check for empty input or invalid characters (e.g., NULL)
106109
if event_id.is_empty() || event_id.contains('\0') {
107-
return None;
110+
return Err("Event ID is empty!".into());
108111
}
109112

110113
// Split into exactly three parts
111114
let parts: Vec<&'a str> = event_id.split(ID_SEPARATOR).collect();
112115
if parts.len() != 3 {
113-
return None;
116+
return Err("Invalid Event ID format.".into());
114117
}
115118

116119
let session_id = parts[0];
@@ -119,10 +122,14 @@ impl InMemoryEventStore {
119122

120123
// Ensure no part is empty
121124
if session_id.is_empty() || stream_id.is_empty() || time_stamp.is_empty() {
122-
return None;
125+
return Err("Invalid Event ID format.".into());
123126
}
124127

125-
Some((session_id, stream_id, time_stamp))
128+
let time_stamp: u128 = time_stamp
129+
.parse()
130+
.map_err(|err| format!("Error parsing timestamp: {err}"))?;
131+
132+
Ok((session_id, stream_id, time_stamp))
126133
}
127134
}
128135

@@ -147,7 +154,7 @@ impl EventStore for InMemoryEventStore {
147154
stream_id: StreamId,
148155
time_stamp: u128,
149156
message: String,
150-
) -> EventId {
157+
) -> EventStoreResult<EventId> {
151158
let event_id = self.generate_event_id(&session_id, &stream_id, time_stamp);
152159

153160
let mut storage_map = self.storage_map.write().await;
@@ -172,7 +179,7 @@ impl EventStore for InMemoryEventStore {
172179

173180
session_map.push_back(entry);
174181

175-
event_id
182+
Ok(event_id)
176183
}
177184

178185
/// Removes all events associated with a given stream ID within a specific session.
@@ -184,7 +191,11 @@ impl EventStore for InMemoryEventStore {
184191
/// # Arguments
185192
/// - `session_id`: The session identifier to target.
186193
/// - `stream_id`: The stream identifier to remove.
187-
async fn remove_stream_in_session(&self, session_id: SessionId, stream_id: StreamId) {
194+
async fn remove_stream_in_session(
195+
&self,
196+
session_id: SessionId,
197+
stream_id: StreamId,
198+
) -> EventStoreResult<()> {
188199
let mut storage_map = self.storage_map.write().await;
189200

190201
// Check if session exists
@@ -194,9 +205,10 @@ impl EventStore for InMemoryEventStore {
194205
// Remove session if empty
195206
if events.is_empty() {
196207
storage_map.remove(&session_id);
197-
}
208+
};
198209
}
199210
// No action if session_id doesn’t exist (idempotent)
211+
Ok(())
200212
}
201213

202214
/// Removes all events associated with a given session ID.
@@ -205,9 +217,10 @@ impl EventStore for InMemoryEventStore {
205217
///
206218
/// # Arguments
207219
/// - `session_id`: The session identifier to remove.
208-
async fn remove_by_session_id(&self, session_id: SessionId) {
220+
async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()> {
209221
let mut storage_map = self.storage_map.write().await;
210222
storage_map.remove(&session_id);
223+
Ok(())
211224
}
212225

213226
/// Retrieves events after a given `event_id` for a specific session and stream.
@@ -221,23 +234,20 @@ impl EventStore for InMemoryEventStore {
221234
/// - `last_event_id`: The event ID (format: `session-.-stream-.-timestamp`) to start after.
222235
///
223236
/// # Returns
224-
/// An `Option` containing `EventStoreMessages` with the session ID, stream ID, and sorted messages,
237+
/// An `Option` containing `EventStoreEntry` with the session ID, stream ID, and sorted messages,
225238
/// or `None` if no events are found; an invalid event ID is returned as an error instead.
226-
async fn events_after(&self, last_event_id: EventId) -> Option<EventStoreMessages> {
227-
let Some((session_id, stream_id, time_stamp)) = self.parse_event_id(&last_event_id) else {
228-
tracing::warn!("error parsing last event id: '{last_event_id}'");
229-
return None;
230-
};
239+
async fn events_after(
240+
&self,
241+
last_event_id: EventId,
242+
) -> EventStoreResult<Option<EventStoreEntry>> {
243+
let (session_id, stream_id, time_stamp) = self.parse_event_id(&last_event_id)?;
231244

232245
let storage_map = self.storage_map.read().await;
246+
247+
// fail silently if the session id does not exist
233248
let Some(events) = storage_map.get(session_id) else {
234249
tracing::warn!("could not find the session_id in the store : '{session_id}'");
235-
return None;
236-
};
237-
238-
let Ok(time_stamp) = time_stamp.parse::<u128>() else {
239-
tracing::warn!("could not parse the timestamp: '{time_stamp}'");
240-
return None;
250+
return Ok(None);
241251
};
242252

243253
let events = match events
@@ -260,15 +270,21 @@ impl EventStore for InMemoryEventStore {
260270

261271
tracing::trace!("{} messages after '{last_event_id}'", events.len());
262272

263-
Some(EventStoreMessages {
273+
Ok(Some(EventStoreEntry {
264274
session_id: session_id.to_string(),
265275
stream_id: stream_id.to_string(),
266276
messages: events,
267-
})
277+
}))
268278
}
269279

270-
async fn clear(&self) {
280+
async fn clear(&self) -> EventStoreResult<()> {
271281
let mut storage_map = self.storage_map.write().await;
272282
storage_map.clear();
283+
Ok(())
284+
}
285+
286+
async fn count(&self) -> EventStoreResult<usize> {
287+
let storage_map = self.storage_map.read().await;
288+
Ok(storage_map.len())
273289
}
274290
}

crates/rust-mcp-transport/src/message_dispatcher.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -412,16 +412,19 @@ impl McpDispatch<ClientMessages, ServerMessages, ClientMessage, ServerMessage>
412412
self.stream_id.as_ref(),
413413
self.event_store.as_ref(),
414414
) {
415-
event_id = Some(
416-
event_store
417-
.store_event(
418-
session_id.clone(),
419-
stream_id.clone(),
420-
current_timestamp(),
421-
payload.to_owned(),
422-
)
423-
.await,
424-
)
415+
event_id = event_store
416+
.store_event(
417+
session_id.clone(),
418+
stream_id.clone(),
419+
current_timestamp(),
420+
payload.to_owned(),
421+
)
422+
.await
423+
.map(Some)
424+
.unwrap_or_else(|err| {
425+
tracing::error!("{err}");
426+
None
427+
});
425428
};
426429
}
427430

0 commit comments

Comments
 (0)