Skip to content

Commit 87791a1

Browse files
authored
fix: add send object serialization support (#122) (#123)
* fix: Add Send object serialization support Fixes the serialization issue with langgraph.types.Send objects reported in issue #94. Previously, Send objects were not being correctly serialized and deserialized, which caused them to be returned as plain dictionaries after a roundtrip. This led to `isinstance(packet, Send)` checks failing in LangGraph's `prepare_single_task` function, resulting in tasks not being added when handling interrupts. Changes: - Added Send object serialization in JsonPlusRedisSerializer._default_handler with a `__send__` type marker (similar to Interrupt handling) - Added Send object preprocessing in _preprocess_interrupts method - Added Send object deserialization in _revive_if_needed method - Added Send object deserialization in base.py's _recursive_deserialize method - Added comprehensive test suite (tests/test_send_serialization.py) with 6 tests covering various Send object scenarios The fix ensures that Send objects maintain their type identity throughout the serialization/deserialization cycle, allowing interrupt handling to work correctly. Resolves: #94 * release: bump version for 0.2.1 release
1 parent ac683ee commit 87791a1

File tree

4 files changed

+256
-3
lines changed

4 files changed

+256
-3
lines changed

langgraph/checkpoint/redis/base.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,25 @@ def _recursive_deserialize(self, obj: Any) -> Any:
394394
# Decode base64-encoded bytes
395395
return self._decode_blob(obj["__bytes__"])
396396

397+
# Check if this is a Send object marker (issue #94)
398+
if (
399+
obj.get("__send__") is True
400+
and "node" in obj
401+
and "arg" in obj
402+
and len(obj) == 3
403+
):
404+
try:
405+
from langgraph.types import Send
406+
407+
return Send(
408+
node=obj["node"],
409+
arg=self._recursive_deserialize(obj["arg"]),
410+
)
411+
except (ImportError, TypeError, ValueError) as e:
412+
logger.debug(
413+
"Failed to deserialize Send object: %s", e, exc_info=True
414+
)
415+
397416
# Check if this is a LangChain serialized object
398417
if obj.get("lc") in (1, 2) and obj.get("type") == "constructor":
399418
try:

langgraph/checkpoint/redis/jsonplus_redis.py

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ def _default_handler(self, obj: Any) -> Any:
4848
"id": obj.id,
4949
}
5050

51+
# Handle Send objects with a type marker (issue #94)
52+
from langgraph.types import Send
53+
54+
if isinstance(obj, Send):
55+
return {
56+
"__send__": True,
57+
"node": obj.node,
58+
"arg": obj.arg,
59+
}
60+
5161
# Try to encode using parent's constructor args encoder
5262
# This creates the {"lc": 2, "type": "constructor", ...} format
5363
try:
@@ -69,14 +79,14 @@ def _default_handler(self, obj: Any) -> Any:
6979
raise TypeError(f"Object of type {type(obj)} is not JSON serializable")
7080

7181
def _preprocess_interrupts(self, obj: Any) -> Any:
72-
"""Recursively add type markers to Interrupt objects before serialization.
82+
"""Recursively add type markers to Interrupt and Send objects before serialization.
7383
7484
This prevents false positives where user data with {value, id} fields
7585
could be incorrectly deserialized as Interrupt objects.
7686
7787
Also handles dataclass instances to preserve type information during serialization.
7888
"""
79-
from langgraph.types import Interrupt
89+
from langgraph.types import Interrupt, Send
8090

8191
if isinstance(obj, Interrupt):
8292
# Add type marker to distinguish from plain dicts
@@ -85,6 +95,13 @@ def _preprocess_interrupts(self, obj: Any) -> Any:
8595
"value": self._preprocess_interrupts(obj.value),
8696
"id": obj.id,
8797
}
98+
elif isinstance(obj, Send):
99+
# Add type marker to distinguish from plain dicts (issue #94)
100+
return {
101+
"__send__": True,
102+
"node": obj.node,
103+
"arg": self._preprocess_interrupts(obj.arg),
104+
}
88105
elif isinstance(obj, set):
89106
# Handle sets by converting to list for JSON serialization
90107
# Will be reconstructed back to set on deserialization
@@ -277,6 +294,28 @@ def _revive_if_needed(self, obj: Any) -> Any:
277294
"Failed to deserialize Interrupt object: %s", e, exc_info=True
278295
)
279296

297+
# Check if this is a serialized Send object with type marker (issue #94)
298+
# Send objects serialize to {"__send__": True, "node": ..., "arg": ...}
299+
if (
300+
obj.get("__send__") is True
301+
and "node" in obj
302+
and "arg" in obj
303+
and len(obj) == 3
304+
):
305+
# Try to reconstruct as a Send object
306+
try:
307+
from langgraph.types import Send
308+
309+
return Send(
310+
node=obj["node"],
311+
arg=self._revive_if_needed(obj["arg"]),
312+
)
313+
except (ImportError, TypeError, ValueError) as e:
314+
# If we can't import or construct Send, log and fall through
315+
logger.debug(
316+
"Failed to deserialize Send object: %s", e, exc_info=True
317+
)
318+
280319
# Recursively process nested dicts
281320
return {k: self._revive_if_needed(v) for k, v in obj.items()}
282321
elif isinstance(obj, list):

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "langgraph-checkpoint-redis"
3-
version = "0.2.0"
3+
version = "0.2.1"
44
description = "Redis implementation of the LangGraph agent checkpoint saver and store."
55
authors = ["Redis Inc. <applied.ai@redis.com>", "Brian Sam-Bodden <bsb@redis.io>"]
66
license = "MIT"

tests/test_send_serialization.py

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
"""Test Send object serialization fix for issue #94.
2+
3+
This test validates that langgraph.types.Send objects are properly serialized
4+
and deserialized by the JsonPlusRedisSerializer.
5+
6+
Before the fix, Send objects were not serialized correctly, which led to issues
7+
with handling Interrupts - namely the user's response would not be treated as
8+
the response to the Interrupt.
9+
10+
The issue occurs in `prepare_single_task` in pregel._algo.py where:
11+
```
12+
if not isinstance(packet, Send):
13+
logger.warning(
14+
f"Ignoring invalid packet type {type(packet)} in pending sends"
15+
)
16+
return. <<<<< task not added
17+
```
18+
19+
The fix adds custom serialization/deserialization for Send objects similar to
20+
how Interrupt objects are handled.
21+
"""
22+
23+
import pytest
24+
from langgraph.types import Send
25+
26+
from langgraph.checkpoint.redis.jsonplus_redis import JsonPlusRedisSerializer
27+
28+
29+
def test_send_object_serialization():
30+
"""Test that Send objects are properly serialized and deserialized.
31+
32+
Before the fix, Send objects would not serialize correctly, causing
33+
isinstance(packet, Send) checks to fail after deserialization.
34+
"""
35+
serializer = JsonPlusRedisSerializer()
36+
37+
# Create a Send object
38+
send_obj = Send(node="my_node", arg={"key": "value"})
39+
40+
# Serialize
41+
type_str, blob = serializer.dumps_typed(send_obj)
42+
assert type_str == "json"
43+
assert isinstance(blob, bytes)
44+
45+
# Deserialize
46+
deserialized = serializer.loads_typed((type_str, blob))
47+
48+
# Critical check: the deserialized object must be an instance of Send
49+
assert isinstance(deserialized, Send), (
50+
f"Expected Send instance, got {type(deserialized)}. "
51+
"This will cause isinstance(packet, Send) checks to fail!"
52+
)
53+
assert deserialized.node == "my_node"
54+
assert deserialized.arg == {"key": "value"}
55+
assert deserialized == send_obj
56+
57+
58+
def test_send_object_in_pending_sends_list():
59+
"""Test that Send objects in pending_sends lists are properly handled.
60+
61+
This simulates the scenario where Send objects are stored in checkpoint
62+
pending_sends and must be correctly deserialized for interrupt handling.
63+
"""
64+
serializer = JsonPlusRedisSerializer()
65+
66+
# Create multiple Send objects as they would appear in pending_sends
67+
pending_sends = [
68+
Send(node="node1", arg={"data": "first"}),
69+
Send(node="node2", arg={"data": "second"}),
70+
Send(node="node3", arg={"data": "third"}),
71+
]
72+
73+
# Serialize the list
74+
type_str, blob = serializer.dumps_typed(pending_sends)
75+
76+
# Deserialize
77+
deserialized = serializer.loads_typed((type_str, blob))
78+
79+
# Verify all items are still Send instances
80+
assert isinstance(deserialized, list)
81+
assert len(deserialized) == 3
82+
83+
for i, send_obj in enumerate(deserialized):
84+
assert isinstance(
85+
send_obj, Send
86+
), f"Item {i} is not a Send instance: {type(send_obj)}"
87+
88+
assert deserialized[0].node == "node1"
89+
assert deserialized[1].node == "node2"
90+
assert deserialized[2].node == "node3"
91+
92+
93+
def test_send_object_with_complex_args():
94+
"""Test Send objects with complex nested arguments."""
95+
serializer = JsonPlusRedisSerializer()
96+
97+
# Create Send with complex nested arg
98+
complex_arg = {
99+
"messages": ["msg1", "msg2"],
100+
"metadata": {
101+
"step": 1,
102+
"config": {
103+
"model": "gpt-4",
104+
"temperature": 0.7,
105+
},
106+
},
107+
"nested_list": [
108+
{"a": 1, "b": 2},
109+
{"c": 3, "d": 4},
110+
],
111+
}
112+
113+
send_obj = Send(node="processor", arg=complex_arg)
114+
115+
# Serialize and deserialize
116+
type_str, blob = serializer.dumps_typed(send_obj)
117+
deserialized = serializer.loads_typed((type_str, blob))
118+
119+
# Verify type and structure
120+
assert isinstance(deserialized, Send)
121+
assert deserialized.node == "processor"
122+
assert deserialized.arg == complex_arg
123+
assert deserialized.arg["metadata"]["config"]["model"] == "gpt-4"
124+
125+
126+
def test_send_object_in_checkpoint_structure():
127+
"""Test Send objects embedded in checkpoint-like structures.
128+
129+
This simulates how Send objects appear in actual checkpoint data.
130+
"""
131+
serializer = JsonPlusRedisSerializer()
132+
133+
# Simulate checkpoint structure with pending_sends
134+
checkpoint_data = {
135+
"v": 1,
136+
"id": "checkpoint_1",
137+
"pending_sends": [
138+
Send(node="task1", arg={"task_data": "A"}),
139+
Send(node="task2", arg={"task_data": "B"}),
140+
],
141+
"channel_values": {"messages": ["msg1", "msg2"]},
142+
}
143+
144+
# Serialize and deserialize
145+
type_str, blob = serializer.dumps_typed(checkpoint_data)
146+
deserialized = serializer.loads_typed((type_str, blob))
147+
148+
# Verify Send objects are preserved correctly
149+
assert "pending_sends" in deserialized
150+
assert len(deserialized["pending_sends"]) == 2
151+
152+
for send_obj in deserialized["pending_sends"]:
153+
assert isinstance(
154+
send_obj, Send
155+
), f"pending_sends contains non-Send object: {type(send_obj)}"
156+
157+
158+
def test_send_object_equality_after_roundtrip():
159+
"""Test that Send objects maintain equality after serialization roundtrip."""
160+
serializer = JsonPlusRedisSerializer()
161+
162+
send1 = Send(node="test_node", arg={"value": 42})
163+
164+
# Serialize and deserialize
165+
type_str, blob = serializer.dumps_typed(send1)
166+
send2 = serializer.loads_typed((type_str, blob))
167+
168+
# Send objects should be equal
169+
assert send1 == send2
170+
171+
# Test hash equality with hashable args
172+
send_hashable1 = Send(node="test", arg="hashable_string")
173+
type_str, blob = serializer.dumps_typed(send_hashable1)
174+
send_hashable2 = serializer.loads_typed((type_str, blob))
175+
assert hash(send_hashable1) == hash(send_hashable2)
176+
177+
178+
def test_send_object_with_none_arg():
179+
"""Test Send object with None as argument."""
180+
serializer = JsonPlusRedisSerializer()
181+
182+
send_obj = Send(node="null_handler", arg=None)
183+
184+
# Serialize and deserialize
185+
type_str, blob = serializer.dumps_typed(send_obj)
186+
deserialized = serializer.loads_typed((type_str, blob))
187+
188+
assert isinstance(deserialized, Send)
189+
assert deserialized.node == "null_handler"
190+
assert deserialized.arg is None
191+
192+
193+
if __name__ == "__main__":
194+
# Run tests
195+
pytest.main([__file__, "-v"])

0 commit comments

Comments
 (0)