jobqueue: drop JobStatus type

The enum is redundant information that can be deduced from the job's
times: queuedAt, startedAt, and finishedAt. Not having it reduces the
potential for inconsistent state.
This commit is contained in:
Lars Karlitski 2020-05-10 10:16:00 +02:00 committed by Tom Gundersen
parent e1805d5f62
commit 6773c01722
5 changed files with 70 additions and 101 deletions

View file

@ -51,10 +51,9 @@ type job struct {
Dependencies []uuid.UUID `json:"dependencies"`
Result json.RawMessage `json:"result,omitempty"`
Status jobqueue.JobStatus `json:"status"`
QueuedAt time.Time `json:"queued_at,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
QueuedAt time.Time `json:"queued_at,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
FinishedAt time.Time `json:"finished_at,omitempty"`
}
// Create a new fsJobQueue object for `dir`. This object must have exclusive
@ -82,7 +81,7 @@ func New(dir string) (*fsJobQueue, error) {
return nil, err
}
// We only enqueue jobs that were previously pending.
if j.Status != jobqueue.JobPending {
if j.StartedAt.IsZero() {
continue
}
// Enqueue the job again if all dependencies have finished, or
@ -112,7 +111,6 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
Id: uuid.New(),
Type: jobType,
Dependencies: uniqueUUIDList(dependencies),
Status: jobqueue.JobPending,
QueuedAt: time.Now(),
}
@ -192,7 +190,6 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interf
return uuid.Nil, fmt.Errorf("error unmarshaling arguments for job '%s': %v", j.Id, err)
}
j.Status = jobqueue.JobRunning
j.StartedAt = time.Now()
err = q.db.Write(id.String(), j)
@ -209,11 +206,10 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return err
}
if j.Status != jobqueue.JobRunning {
if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
return jobqueue.ErrNotRunning
}
j.Status = jobqueue.JobFinished
j.FinishedAt = time.Now()
j.Result, err = json.Marshal(result)
@ -247,7 +243,7 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return nil
}
func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (status jobqueue.JobStatus, queued, started, finished time.Time, err error) {
func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, err error) {
var j *job
j, err = q.readJob(id)
@ -255,7 +251,7 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (status jobqueu
return
}
if j.Status == jobqueue.JobFinished {
if !j.FinishedAt.IsZero() {
err = json.Unmarshal(j.Result, result)
if err != nil {
err = fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
@ -263,7 +259,6 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (status jobqueu
}
}
status = j.Status
queued = j.QueuedAt
started = j.StartedAt
finished = j.FinishedAt
@ -279,7 +274,7 @@ func (q *fsJobQueue) countFinishedJobs(ids []uuid.UUID) (int, error) {
if err != nil {
return 0, err
}
if j.Status == jobqueue.JobFinished {
if !j.FinishedAt.IsZero() {
n += 1
}
}

View file

@ -130,15 +130,19 @@ func TestDependencies(t *testing.T) {
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
status, _, _, _, err := q.JobStatus(j, nil)
queued, started, finished, err := q.JobStatus(j, nil)
require.NoError(t, err)
require.Equal(t, jobqueue.JobPending, status)
require.True(t, !queued.IsZero())
require.True(t, started.IsZero())
require.True(t, finished.IsZero())
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))
status, _, _, _, err = q.JobStatus(j, &testResult{})
queued, started, finished, err = q.JobStatus(j, &testResult{})
require.NoError(t, err)
require.Equal(t, jobqueue.JobFinished, status)
require.True(t, !queued.IsZero())
require.True(t, !started.IsZero())
require.True(t, !finished.IsZero())
})
t.Run("done-after-pushing-dependant", func(t *testing.T) {
@ -146,9 +150,11 @@ func TestDependencies(t *testing.T) {
two := pushTestJob(t, q, "test", nil, nil)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
status, _, _, _, err := q.JobStatus(j, nil)
queued, started, finished, err := q.JobStatus(j, nil)
require.NoError(t, err)
require.Equal(t, jobqueue.JobPending, status)
require.True(t, !queued.IsZero())
require.True(t, started.IsZero())
require.True(t, finished.IsZero())
r := []uuid.UUID{}
r = append(r, finishNextTestJob(t, q, "test", testResult{}))
@ -157,8 +163,10 @@ func TestDependencies(t *testing.T) {
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))
status, _, _, _, err = q.JobStatus(j, &testResult{})
queued, started, finished, err = q.JobStatus(j, &testResult{})
require.NoError(t, err)
require.Equal(t, jobqueue.JobFinished, status)
require.True(t, !queued.IsZero())
require.True(t, !started.IsZero())
require.True(t, !finished.IsZero())
})
}