@@ -1070,12 +1070,13 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
10701070 ret = False
10711071 for cmd in commands :
10721072 try :
1073- cmd . result = await self .parse_response (
1073+ result = await self .parse_response (
10741074 connection , cmd .args [0 ], ** cmd .kwargs
10751075 )
10761076 except Exception as e :
1077- cmd . result = e
1077+ result = e
10781078 ret = True
1079+ cmd .set_node_result (self .name , result )
10791080
10801081 # Release connection
10811082 self ._free .append (connection )
@@ -1530,12 +1531,11 @@ async def _execute(
15301531 raise RedisClusterException (
15311532 f"No targets were found to execute { cmd .args } command on"
15321533 )
1533- if len (target_nodes ) > 1 :
1534- raise RedisClusterException (f"Too many targets for command { cmd .args } " )
1535- node = target_nodes [0 ]
1536- if node .name not in nodes :
1537- nodes [node .name ] = (node , [])
1538- nodes [node .name ][1 ].append (cmd )
1534+ cmd .target_nodes = target_nodes
1535+ for node in target_nodes :
1536+ if node .name not in nodes :
1537+ nodes [node .name ] = (node , [])
1538+ nodes [node .name ][1 ].append (cmd )
15391539
15401540 errors = await asyncio .gather (
15411541 * (
@@ -1550,20 +1550,27 @@ async def _execute(
15501550 for cmd in todo :
15511551 if isinstance (cmd .result , (TryAgainError , MovedError , AskError )):
15521552 try :
1553- cmd . result = await client .execute_command (
1553+ result = await client .execute_command (
15541554 * cmd .args , ** cmd .kwargs
15551555 )
15561556 except Exception as e :
1557- cmd .result = e
1557+ result = e
1558+
1559+ if isinstance (result , dict ):
1560+ cmd .result = result
1561+ else :
1562+ cmd .set_node_result (cmd .target_nodes [0 ].name , result )
15581563
15591564 if raise_on_error :
15601565 for cmd in todo :
1561- result = cmd .result
1562- if isinstance (result , Exception ):
1566+ name_exc = cmd .get_first_exception ()
1567+ if name_exc :
1568+ name , exc = name_exc
15631569 command = " " .join (map (safe_str , cmd .args ))
15641570 msg = (
15651571 f"Command # { cmd .position + 1 } ({ command } ) of pipeline "
1566- f"caused error: { result .args } "
1572+ f"caused error on node { name } : "
1573+ f"{ result .args } "
15671574 )
15681575 result .args = (msg ,) + result .args [1 :]
15691576 raise result
@@ -1581,7 +1588,7 @@ async def _execute(
15811588 client .replace_default_node ()
15821589 break
15831590
1584- return [cmd .result for cmd in stack ]
1591+ return [cmd .unwrap_result () for cmd in stack ]
15851592
15861593 def _split_command_across_slots (
15871594 self , command : str , * keys : KeyT
@@ -1620,7 +1627,25 @@ def __init__(self, position: int, *args: Any, **kwargs: Any) -> None:
16201627 self .args = args
16211628 self .kwargs = kwargs
16221629 self .position = position
1623- self .result : Union [Any , Exception ] = None
1630+ self .result : Dict [str , Union [Any , Exception ]] = {}
1631+ self .target_nodes = None
1632+
1633+ def set_node_result (self , node_name : str , result : Union [Any , Exception ]):
1634+ self .result [node_name ] = result
1635+
1636+ def unwrap_result (
1637+ self ,
1638+ ) -> Optional [Union [Union [Any , Exception ], Dict [str , Union [Any , Exception ]]]]:
1639+ if len (self .result ) == 0 :
1640+ return None
1641+ if len (self .result ) == 1 :
1642+ return next (iter (self .result .values ()))
1643+ return self .result
1644+
1645+ def get_first_exception (self ) -> Optional [Tuple [str , Exception ]]:
1646+ return next (
1647+ ((n , r ) for n , r in self .result .items () if isinstance (r , Exception )), None
1648+ )
16241649
16251650 def __repr__ (self ) -> str :
16261651 return f"[{ self .position } ] { self .args } ({ self .kwargs } )"
0 commit comments