jobqueue/JobStatus: return result as json.RawMessage

Similarly to the recent changes to Dequeue(), let the caller unmarshal the
return JSON. This allows us to pass the result on without being able
to unmarshal it.

In follow-up patches, we will pass results of jobs to dependent jobs,
but the worker API does not know about the different job types, nor how
to unmarshal them.
This commit is contained in:
Tom Gundersen 2020-11-06 17:03:37 +00:00 committed by Lars Karlitski
parent e277501ca3
commit 11d0da0b5c
6 changed files with 34 additions and 28 deletions

View file

@ -263,20 +263,13 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
return nil
}
func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error) {
func (q *fsJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, err error) {
j, err := q.readJob(id)
if err != nil {
return
}
if !j.FinishedAt.IsZero() && !j.Canceled {
err = json.Unmarshal(j.Result, result)
if err != nil {
err = fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
return
}
}
result = j.Result
queued = j.QueuedAt
started = j.StartedAt
finished = j.FinishedAt

View file

@ -147,7 +147,7 @@ func TestDependencies(t *testing.T) {
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
queued, started, finished, canceled, err := q.JobStatus(j, nil)
_, queued, started, finished, canceled, err := q.JobStatus(j)
require.NoError(t, err)
require.True(t, !queued.IsZero())
require.True(t, started.IsZero())
@ -156,12 +156,15 @@ func TestDependencies(t *testing.T) {
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
queued, started, finished, canceled, err = q.JobStatus(j, &testResult{})
result, queued, started, finished, canceled, err := q.JobStatus(j)
require.NoError(t, err)
require.True(t, !queued.IsZero())
require.True(t, !started.IsZero())
require.True(t, !finished.IsZero())
require.False(t, canceled)
err = json.Unmarshal(result, &testResult{})
require.NoError(t, err)
})
t.Run("done-after-pushing-dependant", func(t *testing.T) {
@ -169,7 +172,7 @@ func TestDependencies(t *testing.T) {
two := pushTestJob(t, q, "test", nil, nil)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
queued, started, finished, canceled, err := q.JobStatus(j, nil)
_, queued, started, finished, canceled, err := q.JobStatus(j)
require.NoError(t, err)
require.True(t, !queued.IsZero())
require.True(t, started.IsZero())
@ -183,12 +186,15 @@ func TestDependencies(t *testing.T) {
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
queued, started, finished, canceled, err = q.JobStatus(j, &testResult{})
result, queued, started, finished, canceled, err := q.JobStatus(j)
require.NoError(t, err)
require.True(t, !queued.IsZero())
require.True(t, !started.IsZero())
require.True(t, !finished.IsZero())
require.False(t, canceled)
err = json.Unmarshal(result, &testResult{})
require.NoError(t, err)
})
}
@ -242,9 +248,10 @@ func TestCancel(t *testing.T) {
require.NotEmpty(t, id)
err = q.CancelJob(id)
require.NoError(t, err)
_, _, _, canceled, err := q.JobStatus(id, &testResult{})
result, _, _, _, canceled, err := q.JobStatus(id)
require.NoError(t, err)
require.True(t, canceled)
require.Nil(t, result)
err = q.FinishJob(id, &testResult{})
require.Error(t, err)
@ -259,9 +266,10 @@ func TestCancel(t *testing.T) {
require.Equal(t, json.RawMessage("null"), args)
err = q.CancelJob(id)
require.NoError(t, err)
_, _, _, canceled, err = q.JobStatus(id, &testResult{})
result, _, _, _, canceled, err = q.JobStatus(id)
require.NoError(t, err)
require.True(t, canceled)
require.Nil(t, result)
err = q.FinishJob(id, &testResult{})
require.Error(t, err)
@ -278,7 +286,9 @@ func TestCancel(t *testing.T) {
require.NoError(t, err)
err = q.CancelJob(id)
require.NoError(t, err)
_, _, _, canceled, err = q.JobStatus(id, &testResult{})
result, _, _, _, canceled, err = q.JobStatus(id)
require.NoError(t, err)
require.False(t, canceled)
err = json.Unmarshal(result, &testResult{})
require.NoError(t, err)
}

View file

@ -56,7 +56,7 @@ type JobQueue interface {
// finished, respectively.
//
// If the job is finished, its result will be returned in `result`.
JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error)
JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, err error)
}
var (

View file

@ -141,20 +141,14 @@ func (q *testJobQueue) CancelJob(id uuid.UUID) error {
return nil
}
func (q *testJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, started, finished time.Time, canceled bool, err error) {
func (q *testJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, err error) {
j, exists := q.jobs[id]
if !exists {
err = jobqueue.ErrNotExist
return
}
if !j.FinishedAt.IsZero() {
err = json.Unmarshal(j.Result, result)
if err != nil {
return
}
}
result = j.Result
queued = j.QueuedAt
started = j.StartedAt
finished = j.FinishedAt