jobqueue: allow canceling jobs

This is not exposed to a worker yet. It will continue the job and get an
error when it tries to update the job's status to finished.
This commit is contained in:
Lars Karlitski 2020-05-10 19:19:32 +02:00 committed by Tom Gundersen
parent b795ca25a2
commit 27e8e4b5d5
5 changed files with 139 additions and 28 deletions

View file

@ -56,6 +56,8 @@ type job struct {
QueuedAt time.Time `json:"queued_at,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
Canceled bool `json:"canceled,omitempty"`
}
// Create a new fsJobQueue object for `dir`. This object must have exclusive
@ -161,31 +163,39 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interf
}
}
// Unlock the mutex while polling channels, so that multiple goroutines
// can wait at the same time.
q.mu.Unlock()
id, err := selectUUIDChannel(ctx, chans)
q.mu.Lock()
// Loop until finding a non-canceled job.
var j *job
for {
// Unlock the mutex while polling channels, so that multiple goroutines
// can wait at the same time.
q.mu.Unlock()
id, err := selectUUIDChannel(ctx, chans)
q.mu.Lock()
if err != nil {
return uuid.Nil, err
if err != nil {
return uuid.Nil, err
}
j, err = q.readJob(id)
if err != nil {
return uuid.Nil, err
}
if !j.Canceled {
break
}
}
j, err := q.readJob(id)
if err != nil {
return uuid.Nil, err
}
err = json.Unmarshal(j.Args, args)
err := json.Unmarshal(j.Args, args)
if err != nil {
return uuid.Nil, fmt.Errorf("error unmarshaling arguments for job '%s': %v", j.Id, err)
}
j.StartedAt = time.Now()
err = q.db.Write(id.String(), j)
err = q.db.Write(j.Id.String(), j)
if err != nil {
return uuid.Nil, fmt.Errorf("error writing job %s: %v", id, err)
return uuid.Nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
}
return j.Id, nil
@ -200,6 +210,10 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return err
}
if j.Canceled {
return jobqueue.ErrCanceled
}
if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
return jobqueue.ErrNotRunning
}
@ -232,13 +246,36 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return nil
}
func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, err error) {
func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
q.mu.Lock()
defer q.mu.Unlock()
j, err := q.readJob(id)
if err != nil {
return err
}
if !j.FinishedAt.IsZero() {
return nil
}
j.Canceled = true
err = q.db.Write(id.String(), j)
if err != nil {
return fmt.Errorf("error writing job %s: %v", id, err)
}
return nil
}
func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error) {
j, err := q.readJob(id)
if err != nil {
return
}
if !j.FinishedAt.IsZero() {
if !j.FinishedAt.IsZero() && !j.Canceled {
err = json.Unmarshal(j.Result, result)
if err != nil {
err = fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
@ -249,6 +286,7 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, starte
queued = j.QueuedAt
started = j.StartedAt
finished = j.FinishedAt
canceled = j.Canceled
return
}

View file

@ -132,19 +132,21 @@ func TestDependencies(t *testing.T) {
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
queued, started, finished, err := q.JobStatus(j, nil)
queued, started, finished, canceled, err := q.JobStatus(j, nil)
require.NoError(t, err)
require.True(t, !queued.IsZero())
require.True(t, started.IsZero())
require.True(t, finished.IsZero())
require.False(t, canceled)
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))
queued, started, finished, err = q.JobStatus(j, &testResult{})
queued, started, finished, canceled, err = q.JobStatus(j, &testResult{})
require.NoError(t, err)
require.True(t, !queued.IsZero())
require.True(t, !started.IsZero())
require.True(t, !finished.IsZero())
require.False(t, canceled)
})
t.Run("done-after-pushing-dependant", func(t *testing.T) {
@ -152,11 +154,12 @@ func TestDependencies(t *testing.T) {
two := pushTestJob(t, q, "test", nil, nil)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
queued, started, finished, err := q.JobStatus(j, nil)
queued, started, finished, canceled, err := q.JobStatus(j, nil)
require.NoError(t, err)
require.True(t, !queued.IsZero())
require.True(t, started.IsZero())
require.True(t, finished.IsZero())
require.False(t, canceled)
r := []uuid.UUID{}
r = append(r, finishNextTestJob(t, q, "test", testResult{}))
@ -165,11 +168,12 @@ func TestDependencies(t *testing.T) {
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))
queued, started, finished, err = q.JobStatus(j, &testResult{})
queued, started, finished, canceled, err = q.JobStatus(j, &testResult{})
require.NoError(t, err)
require.True(t, !queued.IsZero())
require.True(t, !started.IsZero())
require.True(t, !finished.IsZero())
require.False(t, canceled)
})
}
@ -203,3 +207,51 @@ func TestMultipleWorkers(t *testing.T) {
_ = pushTestJob(t, q, "octopus", nil, nil)
<-done
}
func TestCancel(t *testing.T) {
q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"})
defer cleanupTempDir(t, dir)
// Cancel a non-existing job
err := q.CancelJob(uuid.New())
require.Error(t, err)
// Cancel a pending job
id := pushTestJob(t, q, "clownfish", nil, nil)
require.NotEmpty(t, id)
err = q.CancelJob(id)
require.NoError(t, err)
_, _, _, canceled, err := q.JobStatus(id, &testResult{})
require.NoError(t, err)
require.True(t, canceled)
err = q.FinishJob(id, &testResult{})
require.Error(t, err)
// Cancel a running job, which should not dequeue the canceled job from above
id = pushTestJob(t, q, "clownfish", nil, nil)
require.NotEmpty(t, id)
r, err := q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{})
require.NoError(t, err)
require.Equal(t, id, r)
err = q.CancelJob(id)
require.NoError(t, err)
_, _, _, canceled, err = q.JobStatus(id, &testResult{})
require.NoError(t, err)
require.True(t, canceled)
err = q.FinishJob(id, &testResult{})
require.Error(t, err)
// Cancel a finished job, which is a no-op
id = pushTestJob(t, q, "clownfish", nil, nil)
require.NotEmpty(t, id)
r, err = q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{})
require.NoError(t, err)
require.Equal(t, id, r)
err = q.FinishJob(id, &testResult{})
require.NoError(t, err)
err = q.CancelJob(id)
require.NoError(t, err)
_, _, _, canceled, err = q.JobStatus(id, &testResult{})
require.NoError(t, err)
require.False(t, canceled)
}

View file

@ -48,16 +48,20 @@ type JobQueue interface {
// job type and must be serializable to JSON.
FinishJob(id uuid.UUID, result interface{}) error
// Cancel a job. Does nothing if the job has already finished.
CancelJob(id uuid.UUID) error
// Returns the current status of the job, in the form of three times:
// queued, started, and finished. `started` and `finished` might be the
// zero time (check with t.IsZero()), when the job is not running or
// finished, respectively.
//
// If the job is finished, its result will be returned in `result`.
JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, err error)
JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error)
}
var (
ErrNotExist = errors.New("job does not exist")
ErrNotRunning = errors.New("job is not running")
ErrCanceled = errors.New("job ws canceled")
)

View file

@ -33,6 +33,7 @@ type job struct {
QueuedAt time.Time
StartedAt time.Time
FinishedAt time.Time
Canceled bool
}
func New() *testJobQueue {
@ -134,9 +135,18 @@ func (q *testJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return nil
}
func (q *testJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, err error) {
var j *job
func (q *testJobQueue) CancelJob(id uuid.UUID) error {
j, exists := q.jobs[id]
if !exists {
return jobqueue.ErrNotExist
}
j.Canceled = true
return nil
}
func (q *testJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error) {
j, exists := q.jobs[id]
if !exists {
err = jobqueue.ErrNotExist
@ -153,6 +163,7 @@ func (q *testJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, star
queued = j.QueuedAt
started = j.StartedAt
finished = j.FinishedAt
canceled = j.Canceled
return
}

View file

@ -79,14 +79,17 @@ func (s *Server) Enqueue(manifest *osbuild.Manifest, targets []*target.Target) (
}
func (s *Server) JobStatus(id uuid.UUID) (state common.ComposeState, queued, started, finished time.Time, err error) {
var canceled bool
var result OSBuildJobResult
queued, started, finished, err = s.jobs.JobStatus(id, &result)
queued, started, finished, canceled, err = s.jobs.JobStatus(id, &result)
if err != nil {
return
}
if !finished.IsZero() {
if canceled {
state = common.CFailed
} else if !finished.IsZero() {
if result.OSBuildOutput.Success {
state = common.CFinished
} else {
@ -102,15 +105,18 @@ func (s *Server) JobStatus(id uuid.UUID) (state common.ComposeState, queued, sta
}
func (s *Server) JobResult(id uuid.UUID) (common.ComposeState, *common.ComposeResult, error) {
var canceled bool
var result OSBuildJobResult
_, started, finished, err := s.jobs.JobStatus(id, &result)
_, started, finished, canceled, err := s.jobs.JobStatus(id, &result)
if err != nil {
return common.CWaiting, nil, err
}
state := common.CWaiting
if !finished.IsZero() {
if canceled {
state = common.CFailed
} else if !finished.IsZero() {
if result.OSBuildOutput.Success {
state = common.CFinished
} else {