diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 2a7b844cc..7a407d95d 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -56,6 +56,8 @@ type job struct { QueuedAt time.Time `json:"queued_at,omitempty"` StartedAt time.Time `json:"started_at,omitempty"` FinishedAt time.Time `json:"finished_at,omitempty"` + + Canceled bool `json:"canceled,omitempty"` } // Create a new fsJobQueue object for `dir`. This object must have exclusive @@ -161,31 +163,39 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interf } } - // Unlock the mutex while polling channels, so that multiple goroutines - // can wait at the same time. - q.mu.Unlock() - id, err := selectUUIDChannel(ctx, chans) - q.mu.Lock() + // Loop until finding a non-canceled job. + var j *job + for { + // Unlock the mutex while polling channels, so that multiple goroutines + // can wait at the same time. + q.mu.Unlock() + id, err := selectUUIDChannel(ctx, chans) + q.mu.Lock() - if err != nil { - return uuid.Nil, err + if err != nil { + return uuid.Nil, err + } + + j, err = q.readJob(id) + if err != nil { + return uuid.Nil, err + } + + if !j.Canceled { + break + } } - j, err := q.readJob(id) - if err != nil { - return uuid.Nil, err - } - - err = json.Unmarshal(j.Args, args) + err := json.Unmarshal(j.Args, args) if err != nil { return uuid.Nil, fmt.Errorf("error unmarshaling arguments for job '%s': %v", j.Id, err) } j.StartedAt = time.Now() - err = q.db.Write(id.String(), j) + err = q.db.Write(j.Id.String(), j) if err != nil { - return uuid.Nil, fmt.Errorf("error writing job %s: %v", id, err) + return uuid.Nil, fmt.Errorf("error writing job %s: %v", j.Id, err) } return j.Id, nil @@ -200,6 +210,10 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error { return err } + if j.Canceled { + return jobqueue.ErrCanceled + } + if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() { return jobqueue.ErrNotRunning } @@ -232,13 +246,36 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error { return nil } -func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, err error) { +func (q *fsJobQueue) CancelJob(id uuid.UUID) error { + q.mu.Lock() + defer q.mu.Unlock() + + j, err := q.readJob(id) + if err != nil { + return err + } + + if !j.FinishedAt.IsZero() { + return nil + } + + j.Canceled = true + + err = q.db.Write(id.String(), j) + if err != nil { + return fmt.Errorf("error writing job %s: %v", id, err) + } + + return nil +} + +func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error) { j, err := q.readJob(id) if err != nil { return } - if !j.FinishedAt.IsZero() { + if !j.FinishedAt.IsZero() && !j.Canceled { err = json.Unmarshal(j.Result, result) if err != nil { err = fmt.Errorf("error unmarshaling result for job '%s': %v", id, err) @@ -249,6 +286,7 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, starte queued = j.QueuedAt started = j.StartedAt finished = j.FinishedAt + canceled = j.Canceled return } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index 4661964e2..108da8677 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -132,19 +132,21 @@ func TestDependencies(t *testing.T) { require.ElementsMatch(t, []uuid.UUID{one, two}, r) j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}) - queued, started, finished, err := q.JobStatus(j, nil) + queued, started, finished, canceled, err := q.JobStatus(j, nil) require.NoError(t, err) require.True(t, !queued.IsZero()) require.True(t, started.IsZero()) require.True(t, finished.IsZero()) + require.False(t, canceled) require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{})) - queued, started, finished, err = q.JobStatus(j, &testResult{}) + queued, started, finished, canceled, err = q.JobStatus(j, &testResult{}) require.NoError(t, err) require.True(t, !queued.IsZero()) require.True(t, !started.IsZero()) require.True(t, !finished.IsZero()) + require.False(t, canceled) }) t.Run("done-after-pushing-dependant", func(t *testing.T) { @@ -152,11 +154,12 @@ func TestDependencies(t *testing.T) { two := pushTestJob(t, q, "test", nil, nil) j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}) - queued, started, finished, err := q.JobStatus(j, nil) + queued, started, finished, canceled, err := q.JobStatus(j, nil) require.NoError(t, err) require.True(t, !queued.IsZero()) require.True(t, started.IsZero()) require.True(t, finished.IsZero()) + require.False(t, canceled) r := []uuid.UUID{} r = append(r, finishNextTestJob(t, q, "test", testResult{})) @@ -165,11 +168,12 @@ func TestDependencies(t *testing.T) { require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{})) - queued, started, finished, err = q.JobStatus(j, &testResult{}) + queued, started, finished, canceled, err = q.JobStatus(j, &testResult{}) require.NoError(t, err) require.True(t, !queued.IsZero()) require.True(t, !started.IsZero()) require.True(t, !finished.IsZero()) + require.False(t, canceled) }) } @@ -203,3 +207,51 @@ func TestMultipleWorkers(t *testing.T) { _ = pushTestJob(t, q, "octopus", nil, nil) <-done } + +func TestCancel(t *testing.T) { + q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"}) + defer cleanupTempDir(t, dir) + + // Cancel a non-existing job + err := q.CancelJob(uuid.New()) + require.Error(t, err) + + // Cancel a pending job + id := pushTestJob(t, q, "clownfish", nil, nil) + require.NotEmpty(t, id) + err = q.CancelJob(id) + require.NoError(t, err) + _, _, _, canceled, err := q.JobStatus(id, &testResult{}) + require.NoError(t, err) + require.True(t, canceled) + err = q.FinishJob(id, &testResult{}) + require.Error(t, err) + + // Cancel a running job, which should not dequeue the canceled job from above + id = pushTestJob(t, q, "clownfish", nil, nil) + require.NotEmpty(t, id) + r, err := q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{}) + require.NoError(t, err) + require.Equal(t, id, r) + err = q.CancelJob(id) + require.NoError(t, err) + _, _, _, canceled, err = q.JobStatus(id, &testResult{}) + require.NoError(t, err) + require.True(t, canceled) + err = q.FinishJob(id, &testResult{}) + require.Error(t, err) + + // Cancel a finished job, which is a no-op + id = pushTestJob(t, q, "clownfish", nil, nil) + require.NotEmpty(t, id) + r, err = q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{}) + require.NoError(t, err) + require.Equal(t, id, r) + err = q.FinishJob(id, &testResult{}) + require.NoError(t, err) + err = q.CancelJob(id) + require.NoError(t, err) + _, _, _, canceled, err = q.JobStatus(id, &testResult{}) + require.NoError(t, err) + require.False(t, canceled) +} diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 1f4f9bd0b..6dbd96b0e 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -48,16 +48,20 @@ type JobQueue interface { // job type and must be serializable to JSON. FinishJob(id uuid.UUID, result interface{}) error + // Cancel a job. Does nothing if the job has already finished. + CancelJob(id uuid.UUID) error + // Returns the current status of the job, in the form of three times: // queued, started, and finished. `started` and `finished` might be the // zero time (check with t.IsZero()), when the job is not running or // finished, respectively. // // If the job is finished, its result will be returned in `result`. - JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, err error) + JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error) } var ( ErrNotExist = errors.New("job does not exist") ErrNotRunning = errors.New("job is not running") + ErrCanceled = errors.New("job ws canceled") ) diff --git a/internal/jobqueue/testjobqueue/testjobqueue.go b/internal/jobqueue/testjobqueue/testjobqueue.go index 97f264d72..2c0d5e3f1 100644 --- a/internal/jobqueue/testjobqueue/testjobqueue.go +++ b/internal/jobqueue/testjobqueue/testjobqueue.go @@ -33,6 +33,7 @@ type job struct { QueuedAt time.Time StartedAt time.Time FinishedAt time.Time + Canceled bool } func New() *testJobQueue { @@ -134,9 +135,18 @@ func (q *testJobQueue) FinishJob(id uuid.UUID, result interface{}) error { return nil } -func (q *testJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, err error) { - var j *job +func (q *testJobQueue) CancelJob(id uuid.UUID) error { + j, exists := q.jobs[id] + if !exists { + return jobqueue.ErrNotExist + } + j.Canceled = true + + return nil +} + +func (q *testJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error) { j, exists := q.jobs[id] if !exists { err = jobqueue.ErrNotExist @@ -153,6 +163,7 @@ func (q *testJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, star queued = j.QueuedAt started = j.StartedAt finished = j.FinishedAt + canceled = j.Canceled return } diff --git a/internal/worker/server.go b/internal/worker/server.go index 98fde6dae..768120c14 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -79,14 +79,17 @@ func (s *Server) Enqueue(manifest *osbuild.Manifest, targets []*target.Target) ( } func (s *Server) JobStatus(id uuid.UUID) (state common.ComposeState, queued, started, finished time.Time, err error) { + var canceled bool var result OSBuildJobResult - queued, started, finished, err = s.jobs.JobStatus(id, &result) + queued, started, finished, canceled, err = s.jobs.JobStatus(id, &result) if err != nil { return } - if !finished.IsZero() { + if canceled { + state = common.CFailed + } else if !finished.IsZero() { if result.OSBuildOutput.Success { state = common.CFinished } else { @@ -102,15 +105,18 @@ func (s *Server) JobStatus(id uuid.UUID) (state common.ComposeState, queued, sta } func (s *Server) JobResult(id uuid.UUID) (common.ComposeState, *common.ComposeResult, error) { + var canceled bool var result OSBuildJobResult - _, started, finished, err := s.jobs.JobStatus(id, &result) + _, started, finished, canceled, err := s.jobs.JobStatus(id, &result) if err != nil { return common.CWaiting, nil, err } state := common.CWaiting - if !finished.IsZero() { + if canceled { + state = common.CFailed + } else if !finished.IsZero() { if result.OSBuildOutput.Success { state = common.CFinished } else {