@@ -183,31 +183,32 @@ def register_job(self, job: jobs.Job) -> MaybeMigrationNode:
183183 object_owner = JobOwnership (self ._admin_locator ).owner_of (JobInfo .from_job (job )),
184184 )
185185 self ._nodes [job_node .key ] = job_node
186- if job .settings :
187- for job_cluster in job .settings .job_clusters or []:
188- maybe_cluster_node = self ._register_job_cluster (job_cluster , job_node )
189- if maybe_cluster_node .node :
190- self ._outgoing_references [job_node .key ].add (maybe_cluster_node .node )
191- for task in job .settings .tasks or []:
192- maybe_task_node = self ._register_workflow_task (task , job_node )
193- problems .extend (maybe_task_node .problems )
194- if maybe_task_node .node :
195- self ._outgoing_references [job_node .key ].add (maybe_task_node .node )
196- # Only after registering all tasks, we can resolve the task dependencies
197- for task in job .settings .tasks or []:
198- task_key = ("TASK" , f"{ job .job_id } /{ task .task_key } " )
199- for task_dependency in task .depends_on or []:
200- task_dependency_key = ("TASK" , f"{ job .job_id } /{ task_dependency .task_key } " )
201- maybe_task_dependency = self ._nodes .get (task_dependency_key )
202- if maybe_task_dependency :
203- self ._outgoing_references [task_key ].add (maybe_task_dependency )
204- else :
205- # Verified that a job with a task having a depends on referring a non-existing task cannot be
206- # created. However, this code is just in case.
207- problem = DependencyProblem (
208- 'task-dependency-not-found' , f"Could not find task: { task_dependency_key [1 ]} "
209- )
210- problems .append (problem )
186+ if not job .settings :
187+ return MaybeMigrationNode (job_node , problems )
188+ for job_cluster in job .settings .job_clusters or []:
189+ maybe_cluster_node = self ._register_job_cluster (job_cluster , job_node )
190+ if maybe_cluster_node .node :
191+ self ._outgoing_references [job_node .key ].add (maybe_cluster_node .node )
192+ for task in job .settings .tasks or []:
193+ maybe_task_node = self ._register_workflow_task (task , job_node )
194+ problems .extend (maybe_task_node .problems )
195+ if maybe_task_node .node :
196+ self ._outgoing_references [job_node .key ].add (maybe_task_node .node )
197+ # Only after registering all tasks, we can resolve the task dependencies
198+ for task in job .settings .tasks or []:
199+ task_key = ("TASK" , f"{ job .job_id } /{ task .task_key } " )
200+ for task_dependency in task .depends_on or []:
201+ task_dependency_key = ("TASK" , f"{ job .job_id } /{ task_dependency .task_key } " )
202+ maybe_task_dependency = self ._nodes .get (task_dependency_key )
203+ if maybe_task_dependency :
204+ self ._outgoing_references [task_key ].add (maybe_task_dependency )
205+ else :
206+ # Verified that a job with a task having a depends on referring a non-existing task cannot be
207+ # created. However, this code is just in case.
208+ problem = DependencyProblem (
209+ 'task-dependency-not-found' , f"Could not find task: { task_dependency_key [1 ]} "
210+ )
211+ problems .append (problem )
211212 return MaybeMigrationNode (job_node , problems )
212213
213214 def _register_workflow_task (self , task : jobs .Task , parent : MigrationNode ) -> MaybeMigrationNode :
0 commit comments