diff --git a/gaia.go b/gaia.go index 3ebf1f66..56222e59 100644 --- a/gaia.go +++ b/gaia.go @@ -80,7 +80,10 @@ const ( // RunReschedule status RunReschedule PipelineRunStatus = "reschedule" - + + // RunTimeOut status + RunTimeOut PipelineRunStatus = "timeout" + // JobWaitingExec status JobWaitingExec JobStatus = "waiting for execution" @@ -230,6 +233,7 @@ type Pipeline struct { Tags []string `json:"tags,omitempty"` Docker bool `json:"docker"` CronInst *cron.Cron `json:"-"` + TimeOut int `json:"timeout"` } // GitRepo represents a single git repository @@ -262,6 +266,12 @@ type Argument struct { Value string `json:"value,omitempty"` } +// StartPipelineParam Add the timeout parameter to the pipeline unit:minutes +type StartPipelineParam struct { + TimeOut int `json:"timeout,omitempty"` + Arg []*Argument `json:"arg,omitempty"` +} + // CreatePipeline represents a pipeline which is not yet // compiled. type CreatePipeline struct { @@ -296,6 +306,7 @@ type PipelineRun struct { PipelineTags []string `json:"pipelinetags,omitempty"` Docker bool `json:"docker,omitempty"` DockerWorkerID string `json:"dockerworkerid,omitempty"` + TimeOut int `json:"timeout,omitempty"` } // Worker represents a single registered worker. diff --git a/providers/pipelines/pipeline.go b/providers/pipelines/pipeline.go index 628f5c78..9e6b4d00 100644 --- a/providers/pipelines/pipeline.go +++ b/providers/pipelines/pipeline.go @@ -549,7 +549,7 @@ func (pp *PipelineProvider) PipelineTriggerAuth(c echo.Context) error { // @Produce json // @Security ApiKeyAuth // @Param pipelineid query string true "The ID of the pipeline." -// @Param args body gaia.Argument false "Optional arguments of the pipeline." +// @Param args body gaia.StartPipelineParam false "Optional arguments of the pipeline." // @Success 200 {object} gaia.PipelineRun // @Failure 400 {string} string "Various failures regarding starting the pipeline like: invalid id, invalid docker value and schedule errors" // @Failure 404 {string} string "Pipeline not found" @@ -559,8 +559,8 @@ func (pp *PipelineProvider) PipelineStart(c echo.Context) error { // Look for arguments. // We do not check for errors here cause arguments are optional. - var args []*gaia.Argument - _ = c.Bind(&args) + var param gaia.StartPipelineParam + _ = c.Bind(¶m) // Convert string to int because id is int pipelineID, err := strconv.Atoi(pipelineIDStr) @@ -578,14 +578,19 @@ func (pp *PipelineProvider) PipelineStart(c echo.Context) error { } // Overwrite docker setting - for _, a := range args { + for _, a := range param.Arg { if a.Key == "docker" { foundPipeline.Docker = a.Value == "1" } } - + + // set foundPipeline for the given timeout + if param.TimeOut > 0 { + foundPipeline.TimeOut = param.TimeOut + } + if foundPipeline.Name != "" { - pipelineRun, err := pp.deps.Scheduler.SchedulePipeline(&foundPipeline, gaia.StartReasonManual, args) + pipelineRun, err := pp.deps.Scheduler.SchedulePipeline(&foundPipeline, gaia.StartReasonManual, param.Arg) if err != nil { return c.String(http.StatusBadRequest, err.Error()) } else if pipelineRun != nil { diff --git a/workers/scheduler/gaiascheduler/scheduler.go b/workers/scheduler/gaiascheduler/scheduler.go index 9c6dc643..de798e95 100644 --- a/workers/scheduler/gaiascheduler/scheduler.go +++ b/workers/scheduler/gaiascheduler/scheduler.go @@ -458,6 +458,7 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline, startedReason string, arg PipelineTags: p.Tags, Docker: p.Docker, StartReason: startedReason, + TimeOut: p.TimeOut, } // Put run into store @@ -579,11 +580,13 @@ func (s *Scheduler) executeScheduledJobs(r gaia.PipelineRun, pS plugin.Plugin) { runFail = true } } - - if runFail && r.Status != gaia.RunCancelled { + + if runFail && r.Status != gaia.RunCancelled && r.Status != gaia.RunTimeOut { s.finishPipelineRun(&r, gaia.RunFailed) } else if r.Status == gaia.RunCancelled { s.finishPipelineRun(&r, gaia.RunCancelled) + } else if r.Status == gaia.RunTimeOut { + s.finishPipelineRun(&r, gaia.RunTimeOut) } else { s.finishPipelineRun(&r, gaia.RunSuccess) } @@ -631,6 +634,30 @@ func (s *Scheduler) executeScheduler(r *gaia.PipelineRun, pS plugin.Plugin) { } }() + // This is usually used when a job failed and the whole pipeline + // should be timeout. + timeOutChan := make(chan bool) + if r.TimeOut > 0 { + // Create a new timeOutTicker (scheduled go routine) which periodically + // check pipeline timeout + timeOutTicker := time.NewTicker(time.Duration(r.TimeOut) * time.Minute) + go func() { + defer ticker.Stop() + for { + select { + case <-timeOutTicker.C: + timeOutChan <- true + close(timeOutChan) + return + case _, ok := <-pipelineFinished: + if !ok { + return + } + } + } + }() + } + // Separate channel to save updates about the status of job executions. triggerSave := make(chan gaia.Job) @@ -657,6 +684,22 @@ func (s *Scheduler) executeScheduler(r *gaia.PipelineRun, pS plugin.Plugin) { return } } + case _, ok := <-timeOutChan: + if ok { + for _, job := range r.Jobs { + if job.Status == gaia.JobRunning || job.Status == gaia.JobWaitingExec { + job.Status = gaia.JobFailed + job.FailPipeline = true + } + } + r.Status = gaia.RunTimeOut + _ = s.storeService.PipelinePutRun(r) + close(done) + close(executeScheduler) + finished <- true + finalize = true + return + } case <-finished: close(pipelineFinished) return