@@ -99,6 +99,53 @@ def upstream(self, node: T) -> t.Set[T]:
9999
100100 return self ._upstream [node ]
101101
102+ def _find_cycle_path (self , nodes_in_cycle : t .Dict [T , t .Set [T ]]) -> t .Optional [t .List [T ]]:
103+ """Find the exact cycle path using DFS when a cycle is detected.
104+
105+ Args:
106+ nodes_in_cycle: Dictionary of nodes that are part of the cycle and their dependencies
107+
108+ Returns:
109+ List of nodes forming the cycle path, or None if no cycle found
110+ """
111+ if not nodes_in_cycle :
112+ return None
113+
114+ # Use DFS to find a cycle path
115+ visited : t .Set [T ] = set ()
116+ path : t .List [T ] = []
117+
118+ def dfs (node : T ) -> t .Optional [t .List [T ]]:
119+ if node in path :
120+ # Found a cycle - extract the cycle path
121+ cycle_start = path .index (node )
122+ return path [cycle_start :] + [node ]
123+
124+ if node in visited :
125+ return None
126+
127+ visited .add (node )
128+ path .append (node )
129+
130+ # Only follow edges to nodes that are still in the unprocessed set
131+ for neighbor in nodes_in_cycle .get (node , set ()):
132+ if neighbor in nodes_in_cycle :
133+ cycle = dfs (neighbor )
134+ if cycle :
135+ return cycle
136+
137+ path .pop ()
138+ return None
139+
140+ # Try starting DFS from each unvisited node
141+ for start_node in nodes_in_cycle :
142+ if start_node not in visited :
143+ cycle = dfs (start_node )
144+ if cycle :
145+ return cycle [:- 1 ] # Remove the duplicate node at the end
146+
147+ return None
148+
102149 @property
103150 def roots (self ) -> t .Set [T ]:
104151 """Returns all nodes in the graph without any upstream dependencies."""
@@ -125,23 +172,31 @@ def sorted(self) -> t.List[T]:
125172 next_nodes = {node for node , deps in unprocessed_nodes .items () if not deps }
126173
127174 if not next_nodes :
128- # Sort cycle candidates to make the order deterministic
129- cycle_candidates_msg = (
130- "\n Possible candidates to check for circular references: "
131- + ", " .join (str (node ) for node in sorted (cycle_candidates ))
132- )
175+ # A cycle was detected - find the exact cycle path
176+ cycle_path = self ._find_cycle_path (unprocessed_nodes )
133177
134- if last_processed_nodes :
135- last_processed_msg = "\n Last nodes added to the DAG: " + ", " .join (
136- str (node ) for node in last_processed_nodes
178+ last_processed_msg = ""
179+ if cycle_path :
180+ node_output = " ->\n " .join (
181+ str (node ) for node in (cycle_path + [cycle_path [0 ]])
137182 )
183+ cycle_msg = f"\n Cycle:\n { node_output } "
138184 else :
139- last_processed_msg = ""
185+ # Fallback message in case a cycle can't be found
186+ cycle_candidates_msg = (
187+ "\n Possible candidates to check for circular references: "
188+ + ", " .join (str (node ) for node in sorted (cycle_candidates ))
189+ )
190+ cycle_msg = cycle_candidates_msg
191+ if last_processed_nodes :
192+ last_processed_msg = "\n Last nodes added to the DAG: " + ", " .join (
193+ str (node ) for node in last_processed_nodes
194+ )
140195
141196 raise SQLMeshError (
142197 "Detected a cycle in the DAG. "
143198 "Please make sure there are no circular references between nodes."
144- f"{ last_processed_msg } { cycle_candidates_msg } "
199+ f"{ last_processed_msg } { cycle_msg } "
145200 )
146201
147202 for node in next_nodes :
0 commit comments