@@ -317,42 +317,109 @@ var _ = ginkgo.Describe("MPIJob", func() {
317317 // Set up the scheduler-plugins.
318318 setUpSchedulerPlugins ()
319319 // Set up the mpi-operator so that the scheduler-plugins is used as gang-scheduler.
320- ginkgo .By ("Scale-In the deployment to 0" )
320+ setupMPIOperator (ctx , mpiJob , enableGangSchedulingFlag , unschedulableResources )
321+ })
322+
323+ ginkgo .AfterEach (func () {
321324 operator , err := k8sClient .AppsV1 ().Deployments (mpiOperator ).Get (ctx , mpiOperator , metav1.GetOptions {})
325+ oldOperator := operator .DeepCopy ()
322326 gomega .Expect (err ).Should (gomega .Succeed ())
323- operator .Spec .Replicas = newInt32 (0 )
324- _ , err = k8sClient .AppsV1 ().Deployments (mpiOperator ).Update (ctx , operator , metav1.UpdateOptions {})
325- gomega .Expect (err ).Should (gomega .Succeed ())
326- gomega .Eventually (func () bool {
327- isNotZero , err := ensureDeploymentAvailableReplicas (ctx , mpiOperator , mpiOperator )
327+ for i , arg := range operator .Spec .Template .Spec .Containers [0 ].Args {
328+ if arg == enableGangSchedulingFlag {
329+ operator .Spec .Template .Spec .Containers [0 ].Args = append (
330+ operator .Spec .Template .Spec .Containers [0 ].Args [:i ], operator .Spec .Template .Spec .Containers [0 ].Args [i + 1 :]... )
331+ break
332+ }
333+ }
334+ if diff := cmp .Diff (oldOperator , operator ); len (diff ) != 0 {
335+ _ , err = k8sClient .AppsV1 ().Deployments (mpiOperator ).Update (ctx , operator , metav1.UpdateOptions {})
328336 gomega .Expect (err ).Should (gomega .Succeed ())
329- return isNotZero
330- }, foreverTimeout , waitInterval ).Should (gomega .BeFalse ())
337+ gomega .Eventually (func () bool {
338+ ok , err := ensureDeploymentAvailableReplicas (ctx , mpiOperator , mpiOperator )
339+ gomega .Expect (err ).Should (gomega .Succeed ())
340+ return ok
341+ }, foreverTimeout , waitInterval ).Should (gomega .BeTrue ())
342+ }
343+ // Clean up the scheduler-plugins.
344+ cleanUpSchedulerPlugins ()
345+ })
346+
347+ ginkgo .It ("should create pending pods" , func () {
348+ ginkgo .By ("Creating MPIJob" )
349+ mpiJob := createJob (ctx , mpiJob )
350+ var jobCondition * kubeflow.JobCondition
351+ gomega .Eventually (func () * kubeflow.JobCondition {
352+ updatedMPIJob , err := mpiClient .KubeflowV2beta1 ().MPIJobs (mpiJob .Namespace ).Get (ctx , mpiJob .Name , metav1.GetOptions {})
353+ gomega .Expect (err ).Should (gomega .Succeed ())
354+ jobCondition = getJobCondition (updatedMPIJob , kubeflow .JobCreated )
355+ return jobCondition
356+ }, foreverTimeout , waitInterval ).ShouldNot (gomega .BeNil ())
357+ gomega .Expect (jobCondition .Status ).To (gomega .Equal (corev1 .ConditionTrue ))
358+
359+ ginkgo .By ("Waiting for Pods to created" )
360+ var pods * corev1.PodList
361+ gomega .Eventually (func () error {
362+ var err error
363+ pods , err = k8sClient .CoreV1 ().Pods (mpiJob .Namespace ).List (ctx , metav1.ListOptions {
364+ LabelSelector : labels .FormatLabels (map [string ]string {
365+ schedv1alpha1 .PodGroupLabel : mpiJob .Name ,
366+ }),
367+ })
368+ return err
369+ }, foreverTimeout , waitInterval ).Should (gomega .BeNil ())
370+ for _ , pod := range pods .Items {
371+ gomega .Expect (pod .Status .Phase ).Should (gomega .Equal (corev1 .PodPending ))
372+ }
373+ pg , err := schedClient .SchedulingV1alpha1 ().PodGroups (mpiJob .Namespace ).Get (ctx , mpiJob .Name , metav1.GetOptions {})
374+ gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
375+ gomega .Expect (pg .Spec .MinResources .Cpu ().String ()).Should (gomega .BeComparableTo (unschedulableResources .Cpu ().String ()))
376+ gomega .Expect (pg .Spec .MinResources .Memory ().String ()).Should (gomega .BeComparableTo (unschedulableResources .Memory ().String ()))
331377
332- ginkgo .By ("Update the replicas and args " )
378+ ginkgo .By ("Updating MPIJob with schedulable schedulingPolicies " )
333379 gomega .Eventually (func () error {
334- updatedOperator , err := k8sClient . AppsV1 ().Deployments ( mpiOperator ).Get (ctx , mpiOperator , metav1.GetOptions {})
380+ updatedJob , err := mpiClient . KubeflowV2beta1 ().MPIJobs ( mpiJob . Namespace ).Get (ctx , mpiJob . Name , metav1.GetOptions {})
335381 gomega .Expect (err ).Should (gomega .Succeed ())
336- updatedOperator .Spec .Template .Spec .Containers [0 ].Args = append (updatedOperator .Spec .Template .Spec .Containers [0 ].Args , enableGangSchedulingFlag )
337- updatedOperator .Spec .Replicas = newInt32 (1 )
338- _ , err = k8sClient .AppsV1 ().Deployments (mpiOperator ).Update (ctx , updatedOperator , metav1.UpdateOptions {})
382+ updatedJob .Spec .RunPolicy .SchedulingPolicy .MinResources = nil
383+ _ , err = mpiClient .KubeflowV2beta1 ().MPIJobs (updatedJob .Namespace ).Update (ctx , updatedJob , metav1.UpdateOptions {})
339384 return err
340385 }, foreverTimeout , waitInterval ).Should (gomega .BeNil ())
341386
342- ginkgo .By ("Should be replicas is 1 " )
343- gomega .Eventually (func () bool {
344- isNotZero , err := ensureDeploymentAvailableReplicas ( ctx , mpiOperator , mpiOperator )
387+ ginkgo .By ("Waiting for MPIJob to running " )
388+ gomega .Eventually (func () corev1. ConditionStatus {
389+ updatedJob , err := mpiClient . KubeflowV2beta1 (). MPIJobs ( mpiJob . Namespace ). Get ( ctx , mpiJob . Name , metav1. GetOptions {} )
345390 gomega .Expect (err ).Should (gomega .Succeed ())
346- return isNotZero
347- }, foreverTimeout , waitInterval ).Should (gomega .BeTrue ())
348- createMPIJobWithOpenMPI (mpiJob )
349- mpiJob .Spec .RunPolicy .SchedulingPolicy = & kubeflow.SchedulingPolicy {MinResources : unschedulableResources }
391+ cond := getJobCondition (updatedJob , kubeflow .JobRunning )
392+ if cond == nil {
393+ return corev1 .ConditionFalse
394+ }
395+ return cond .Status
396+ }, foreverTimeout , waitInterval ).Should (gomega .Equal (corev1 .ConditionTrue ))
397+ })
398+ })
399+
400+ // volcano e2e tests
401+ ginkgo .Context ("with volcano-scheduler" , func () {
402+ const enableGangSchedulingFlag = "--gang-scheduling=volcano"
403+ var (
404+ ctx = context .Background ()
405+ unschedulableResources = & corev1.ResourceList {
406+ corev1 .ResourceCPU : resource .MustParse ("100000" ), // unschedulable
407+ corev1 .ResourceMemory : resource .MustParse ("100000Gi" ), // unschedulable
408+ }
409+ )
410+
411+ ginkgo .BeforeEach (func () {
412+ // Set up the volcano-scheduler.
413+ setupVolcanoScheduler ()
414+ // Set up the mpi-operator so that the volcano scheduler is used as gang-scheduler.
415+ setupMPIOperator (ctx , mpiJob , enableGangSchedulingFlag , unschedulableResources )
350416 })
351417
352418 ginkgo .AfterEach (func () {
353419 operator , err := k8sClient .AppsV1 ().Deployments (mpiOperator ).Get (ctx , mpiOperator , metav1.GetOptions {})
354420 oldOperator := operator .DeepCopy ()
355421 gomega .Expect (err ).Should (gomega .Succeed ())
422+ // disable gang-scheduler in operator
356423 for i , arg := range operator .Spec .Template .Spec .Containers [0 ].Args {
357424 if arg == enableGangSchedulingFlag {
358425 operator .Spec .Template .Spec .Containers [0 ].Args = append (
@@ -369,8 +436,8 @@ var _ = ginkgo.Describe("MPIJob", func() {
369436 return ok
370437 }, foreverTimeout , waitInterval ).Should (gomega .BeTrue ())
371438 }
372- // Clean up the scheduler-plugins .
373- cleanUpSchedulerPlugins ()
439+ // Clean up the volcano .
440+ cleanUpVolcanoScheduler ()
374441 })
375442
376443 ginkgo .It ("should create pending pods" , func () {
@@ -391,15 +458,15 @@ var _ = ginkgo.Describe("MPIJob", func() {
391458 var err error
392459 pods , err = k8sClient .CoreV1 ().Pods (mpiJob .Namespace ).List (ctx , metav1.ListOptions {
393460 LabelSelector : labels .FormatLabels (map [string ]string {
394- schedv1alpha1 . PodGroupLabel : mpiJob .Name ,
461+ common . JobNameLabel : mpiJob .Name ,
395462 }),
396463 })
397464 return err
398465 }, foreverTimeout , waitInterval ).Should (gomega .BeNil ())
399466 for _ , pod := range pods .Items {
400467 gomega .Expect (pod .Status .Phase ).Should (gomega .Equal (corev1 .PodPending ))
401468 }
402- pg , err := schedClient . SchedulingV1alpha1 ().PodGroups (mpiJob .Namespace ).Get (ctx , mpiJob .Name , metav1.GetOptions {})
469+ pg , err := volcanoClient . SchedulingV1beta1 ().PodGroups (mpiJob .Namespace ).Get (ctx , mpiJob .Name , metav1.GetOptions {})
403470 gomega .Expect (err ).NotTo (gomega .HaveOccurred ())
404471 gomega .Expect (pg .Spec .MinResources .Cpu ().String ()).Should (gomega .BeComparableTo (unschedulableResources .Cpu ().String ()))
405472 gomega .Expect (pg .Spec .MinResources .Memory ().String ()).Should (gomega .BeComparableTo (unschedulableResources .Memory ().String ()))
@@ -593,3 +660,53 @@ func cleanUpSchedulerPlugins() {
593660 gomega .Expect (err ).Should (gomega .Succeed ())
594661 }
595662}
663+
664+ func setupVolcanoScheduler () {
665+ if ! useExistingVolcanoScheduler {
666+ ginkgo .By ("Installing volcano-scheduler" )
667+ err := installVolcanoScheduler ()
668+ gomega .Expect (err ).Should (gomega .Succeed ())
669+ }
670+ }
671+
672+ func cleanUpVolcanoScheduler () {
673+ if ! useExistingVolcanoScheduler {
674+ ginkgo .By ("Uninstalling volcano-scheduler" )
675+ err := runCommand (kubectlPath , "delete" , "-f" , volcanoSchedulerManifestPath )
676+ gomega .Expect (err ).Should (gomega .Succeed ())
677+ }
678+ }
679+
680+ // setupMPIOperator scales down and scales up the MPIOperator replication so that set up gang-scheduler takes effect
681+ func setupMPIOperator (ctx context.Context , mpiJob * kubeflow.MPIJob , enableGangSchedulingFlag string , unschedulableResources * corev1.ResourceList ) {
682+ ginkgo .By ("Scale-In the deployment to 0" )
683+ operator , err := k8sClient .AppsV1 ().Deployments (mpiOperator ).Get (ctx , mpiOperator , metav1.GetOptions {})
684+ gomega .Expect (err ).Should (gomega .Succeed ())
685+ operator .Spec .Replicas = newInt32 (0 )
686+ _ , err = k8sClient .AppsV1 ().Deployments (mpiOperator ).Update (ctx , operator , metav1.UpdateOptions {})
687+ gomega .Expect (err ).Should (gomega .Succeed ())
688+ gomega .Eventually (func () bool {
689+ isNotZero , err := ensureDeploymentAvailableReplicas (ctx , mpiOperator , mpiOperator )
690+ gomega .Expect (err ).Should (gomega .Succeed ())
691+ return isNotZero
692+ }, foreverTimeout , waitInterval ).Should (gomega .BeFalse ())
693+
694+ ginkgo .By ("Update the replicas and args" )
695+ gomega .Eventually (func () error {
696+ updatedOperator , err := k8sClient .AppsV1 ().Deployments (mpiOperator ).Get (ctx , mpiOperator , metav1.GetOptions {})
697+ gomega .Expect (err ).Should (gomega .Succeed ())
698+ updatedOperator .Spec .Template .Spec .Containers [0 ].Args = append (updatedOperator .Spec .Template .Spec .Containers [0 ].Args , enableGangSchedulingFlag )
699+ updatedOperator .Spec .Replicas = newInt32 (1 )
700+ _ , err = k8sClient .AppsV1 ().Deployments (mpiOperator ).Update (ctx , updatedOperator , metav1.UpdateOptions {})
701+ return err
702+ }, foreverTimeout , waitInterval ).Should (gomega .BeNil ())
703+
704+ ginkgo .By ("Should be replicas is 1" )
705+ gomega .Eventually (func () bool {
706+ isNotZero , err := ensureDeploymentAvailableReplicas (ctx , mpiOperator , mpiOperator )
707+ gomega .Expect (err ).Should (gomega .Succeed ())
708+ return isNotZero
709+ }, foreverTimeout , waitInterval ).Should (gomega .BeTrue ())
710+ createMPIJobWithOpenMPI (mpiJob )
711+ mpiJob .Spec .RunPolicy .SchedulingPolicy = & kubeflow.SchedulingPolicy {MinResources : unschedulableResources }
712+ }
0 commit comments