jobqueue: return dependencies on dequeue

Once a job has been enqueued, there is no way to query its dependencies.

This makes dequeue more symmetric to enqueue by returning the
dependencies that were passed to enqueue, allowing the caller to
query the dependencies and their results.

Signed-off-by: Tom Gundersen <teg@jklm.no>
This commit is contained in:
Tom Gundersen 2020-08-15 21:07:51 +02:00 committed by Lars Karlitski
parent e72b14bdd1
commit e277501ca3
5 changed files with 37 additions and 29 deletions

View file

@ -144,13 +144,13 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
return j.Id, nil
}
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, string, json.RawMessage, error) {
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
q.mu.Lock()
defer q.mu.Unlock()
// Return early if the context is already canceled.
if err := ctx.Err(); err != nil {
return uuid.Nil, "", nil, err
return uuid.Nil, nil, "", nil, err
}
// Filter q.pending by the `jobTypes`. Ignore those job types that this
@ -172,12 +172,12 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
q.mu.Lock()
if err != nil {
return uuid.Nil, "", nil, err
return uuid.Nil, nil, "", nil, err
}
j, err = q.readJob(id)
if err != nil {
return uuid.Nil, "", nil, err
return uuid.Nil, nil, "", nil, err
}
if !j.Canceled {
@ -189,10 +189,10 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
err := q.db.Write(j.Id.String(), j)
if err != nil {
return uuid.Nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
}
return j.Id, j.Type, j.Args, nil
return j.Id, j.Dependencies, j.Type, j.Args, nil
}
func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {

View file

@ -42,10 +42,11 @@ func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interfa
return id
}
func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}) uuid.UUID {
id, typ, args, err := q.Dequeue(context.Background(), []string{jobType})
func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID {
id, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType})
require.NoError(t, err)
require.NotEmpty(t, id)
require.ElementsMatch(t, deps, d)
require.Equal(t, jobType, typ)
require.NotNil(t, args)
@ -93,17 +94,19 @@ func TestArgs(t *testing.T) {
var parsedArgs argument
id, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"})
id, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"})
require.NoError(t, err)
require.Equal(t, two, id)
require.Empty(t, deps)
require.Equal(t, "octopus", typ)
err = json.Unmarshal(args, &parsedArgs)
require.NoError(t, err)
require.Equal(t, twoargs, parsedArgs)
id, typ, args, err = q.Dequeue(context.Background(), []string{"fish"})
id, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"})
require.NoError(t, err)
require.Equal(t, one, id)
require.Empty(t, deps)
require.Equal(t, "fish", typ)
err = json.Unmarshal(args, &parsedArgs)
require.NoError(t, err)
@ -117,14 +120,15 @@ func TestJobTypes(t *testing.T) {
one := pushTestJob(t, q, "octopus", nil, nil)
two := pushTestJob(t, q, "clownfish", nil, nil)
require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}))
require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}))
require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}, nil))
require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil))
ctx, cancel := context.WithCancel(context.Background())
cancel()
id, typ, args, err := q.Dequeue(ctx, []string{"zebra"})
id, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"})
require.Equal(t, err, context.Canceled)
require.Equal(t, uuid.Nil, id)
require.Empty(t, deps)
require.Equal(t, "", typ)
require.Nil(t, args)
}
@ -138,8 +142,8 @@ func TestDependencies(t *testing.T) {
two := pushTestJob(t, q, "test", nil, nil)
r := []uuid.UUID{}
r = append(r, finishNextTestJob(t, q, "test", testResult{}))
r = append(r, finishNextTestJob(t, q, "test", testResult{}))
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
@ -150,7 +154,7 @@ func TestDependencies(t *testing.T) {
require.True(t, finished.IsZero())
require.False(t, canceled)
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
queued, started, finished, canceled, err = q.JobStatus(j, &testResult{})
require.NoError(t, err)
@ -173,11 +177,11 @@ func TestDependencies(t *testing.T) {
require.False(t, canceled)
r := []uuid.UUID{}
r = append(r, finishNextTestJob(t, q, "test", testResult{}))
r = append(r, finishNextTestJob(t, q, "test", testResult{}))
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
queued, started, finished, canceled, err = q.JobStatus(j, &testResult{})
require.NoError(t, err)
@ -199,9 +203,10 @@ func TestMultipleWorkers(t *testing.T) {
defer close(done)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
id, typ, args, err := q.Dequeue(ctx, []string{"octopus"})
id, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"})
require.NoError(t, err)
require.NotEmpty(t, id)
require.Empty(t, deps)
require.Equal(t, "octopus", typ)
require.Equal(t, json.RawMessage("null"), args)
}()
@ -212,9 +217,10 @@ func TestMultipleWorkers(t *testing.T) {
// This call to Dequeue() should not block on the one in the goroutine.
id := pushTestJob(t, q, "clownfish", nil, nil)
r, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
require.NoError(t, err)
require.Equal(t, id, r)
require.Empty(t, deps)
require.Equal(t, "clownfish", typ)
require.Equal(t, json.RawMessage("null"), args)
@ -245,9 +251,10 @@ func TestCancel(t *testing.T) {
// Cancel a running job, which should not dequeue the canceled job from above
id = pushTestJob(t, q, "clownfish", nil, nil)
require.NotEmpty(t, id)
r, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
require.NoError(t, err)
require.Equal(t, id, r)
require.Empty(t, deps)
require.Equal(t, "clownfish", typ)
require.Equal(t, json.RawMessage("null"), args)
err = q.CancelJob(id)
@ -261,9 +268,10 @@ func TestCancel(t *testing.T) {
// Cancel a finished job, which is a no-op
id = pushTestJob(t, q, "clownfish", nil, nil)
require.NotEmpty(t, id)
r, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"})
r, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"})
require.NoError(t, err)
require.Equal(t, id, r)
require.Empty(t, deps)
require.Equal(t, "clownfish", typ)
require.Equal(t, json.RawMessage("null"), args)
err = q.FinishJob(id, &testResult{})

View file

@ -39,9 +39,9 @@ type JobQueue interface {
// Waits until a job with a type of any of `jobTypes` is available, or `ctx` is
// canceled.
//
// Returns the job's id, type, and arguments, or an error. Arguments
// Returns the job's id, dependencies, type, and arguments, or an error. Arguments
// can be unmarshaled to the type given in Enqueue().
Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, string, json.RawMessage, error)
Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
// Mark the job with `id` as finished. `result` must fit the associated
// job type and must be serializable to JSON.

View file

@ -79,7 +79,7 @@ func (q *testJobQueue) Enqueue(jobType string, args interface{}, dependencies []
return j.Id, nil
}
func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, string, json.RawMessage, error) {
func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
for _, t := range jobTypes {
if len(q.pending[t]) == 0 {
continue
@ -91,10 +91,10 @@ func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUI
j := q.jobs[id]
j.StartedAt = time.Now()
return j.Id, j.Type, j.Args, nil
return j.Id, j.Dependencies, j.Type, j.Args, nil
}
return uuid.Nil, "", nil, errors.New("no job available")
return uuid.Nil, nil, "", nil, errors.New("no job available")
}
func (q *testJobQueue) FinishJob(id uuid.UUID, result interface{}) error {

View file

@ -175,7 +175,7 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string)
jts = append(jts, t)
}
jobId, jobType, args, err := s.jobs.Dequeue(ctx, jts)
jobId, _, jobType, args, err := s.jobs.Dequeue(ctx, jts)
if err != nil {
return uuid.Nil, uuid.Nil, "", nil, err
}