From e277501ca36318945ea7192490e714270c2295c9 Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Sat, 15 Aug 2020 21:07:51 +0200 Subject: [PATCH] jobqueue: return dependencies on dequeue Once a job has been enqueued, there is no way to query its dependencies. This makes dequeue more symmetric to enqueue by returning the dependencies that were passed to enqueue, allowing the caller to query the dependencies and their results. Signed-off-by: Tom Gundersen --- internal/jobqueue/fsjobqueue/fsjobqueue.go | 12 +++--- .../jobqueue/fsjobqueue/fsjobqueue_test.go | 42 +++++++++++-------- internal/jobqueue/jobqueue.go | 4 +- .../jobqueue/testjobqueue/testjobqueue.go | 6 +-- internal/worker/server.go | 2 +- 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index aa0c925b8..d384ce84c 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -144,13 +144,13 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return j.Id, nil } -func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, string, json.RawMessage, error) { +func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { q.mu.Lock() defer q.mu.Unlock() // Return early if the context is already canceled. if err := ctx.Err(); err != nil { - return uuid.Nil, "", nil, err + return uuid.Nil, nil, "", nil, err } // Filter q.pending by the `jobTypes`. Ignore those job types that this @@ -172,12 +172,12 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, q.mu.Lock() if err != nil { - return uuid.Nil, "", nil, err + return uuid.Nil, nil, "", nil, err } j, err = q.readJob(id) if err != nil { - return uuid.Nil, "", nil, err + return uuid.Nil, nil, "", nil, err } if !j.Canceled { @@ -189,10 +189,10 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, err := q.db.Write(j.Id.String(), j) if err != nil { - return uuid.Nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err) } - return j.Id, j.Type, j.Args, nil + return j.Id, j.Dependencies, j.Type, j.Args, nil } func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error { diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index 4c3fec619..a49deb182 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -42,10 +42,11 @@ func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interfa return id } -func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}) uuid.UUID { - id, typ, args, err := q.Dequeue(context.Background(), []string{jobType}) +func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID { + id, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType}) require.NoError(t, err) require.NotEmpty(t, id) + require.ElementsMatch(t, deps, d) require.Equal(t, jobType, typ) require.NotNil(t, args) @@ -93,17 +94,19 @@ func TestArgs(t *testing.T) { var parsedArgs argument - id, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}) + id, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}) require.NoError(t, err) require.Equal(t, two, id) + require.Empty(t, deps) require.Equal(t, "octopus", typ) err = json.Unmarshal(args, &parsedArgs) require.NoError(t, err) require.Equal(t, twoargs, parsedArgs) - id, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}) + id, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}) require.NoError(t, err) require.Equal(t, one, id) + require.Empty(t, deps) require.Equal(t, "fish", typ) err = json.Unmarshal(args, &parsedArgs) require.NoError(t, err) @@ -117,14 +120,15 @@ func TestJobTypes(t *testing.T) { one := pushTestJob(t, q, "octopus", nil, nil) two := pushTestJob(t, q, "clownfish", nil, nil) - require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{})) - require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{})) + require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}, nil)) + require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil)) ctx, cancel := context.WithCancel(context.Background()) cancel() - id, typ, args, err := q.Dequeue(ctx, []string{"zebra"}) + id, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}) require.Equal(t, err, context.Canceled) require.Equal(t, uuid.Nil, id) + require.Empty(t, deps) require.Equal(t, "", typ) require.Nil(t, args) } @@ -138,8 +142,8 @@ func TestDependencies(t *testing.T) { two := pushTestJob(t, q, "test", nil, nil) r := []uuid.UUID{} - r = append(r, finishNextTestJob(t, q, "test", testResult{})) - r = append(r, finishNextTestJob(t, q, "test", testResult{})) + r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) + r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) require.ElementsMatch(t, []uuid.UUID{one, two}, r) j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}) @@ -150,7 +154,7 @@ func TestDependencies(t *testing.T) { require.True(t, finished.IsZero()) require.False(t, canceled) - require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{})) + require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) queued, started, finished, canceled, err = q.JobStatus(j, &testResult{}) require.NoError(t, err) @@ -173,11 +177,11 @@ func TestDependencies(t *testing.T) { require.False(t, canceled) r := []uuid.UUID{} - r = append(r, finishNextTestJob(t, q, "test", testResult{})) - r = append(r, finishNextTestJob(t, q, "test", testResult{})) + r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) + r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) require.ElementsMatch(t, []uuid.UUID{one, two}, r) - require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{})) + require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) queued, started, finished, canceled, err = q.JobStatus(j, &testResult{}) require.NoError(t, err) @@ -199,9 +203,10 @@ func TestMultipleWorkers(t *testing.T) { defer close(done) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, typ, args, err := q.Dequeue(ctx, []string{"octopus"}) + id, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"}) require.NoError(t, err) require.NotEmpty(t, id) + require.Empty(t, deps) require.Equal(t, "octopus", typ) require.Equal(t, json.RawMessage("null"), args) }() @@ -212,9 +217,10 @@ func TestMultipleWorkers(t *testing.T) { // This call to Dequeue() should not block on the one in the goroutine. id := pushTestJob(t, q, "clownfish", nil, nil) - r, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) + r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) require.NoError(t, err) require.Equal(t, id, r) + require.Empty(t, deps) require.Equal(t, "clownfish", typ) require.Equal(t, json.RawMessage("null"), args) @@ -245,9 +251,10 @@ func TestCancel(t *testing.T) { // Cancel a running job, which should not dequeue the canceled job from above id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) + r, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) require.NoError(t, err) require.Equal(t, id, r) + require.Empty(t, deps) require.Equal(t, "clownfish", typ) require.Equal(t, json.RawMessage("null"), args) err = q.CancelJob(id) @@ -261,9 +268,10 @@ func TestCancel(t *testing.T) { // Cancel a finished job, which is a no-op id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}) + r, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}) require.NoError(t, err) require.Equal(t, id, r) + require.Empty(t, deps) require.Equal(t, "clownfish", typ) require.Equal(t, json.RawMessage("null"), args) err = q.FinishJob(id, &testResult{}) diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 751952c35..b4e36266a 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -39,9 +39,9 @@ type JobQueue interface { // Waits until a job with a type of any of `jobTypes` is available, or `ctx` is // canceled. // - // Returns the job's id, type, and arguments, or an error. Arguments + // Returns the job's id, dependencies, type, and arguments, or an error. Arguments // can be unmarshaled to the type given in Enqueue(). - Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, string, json.RawMessage, error) + Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) // Mark the job with `id` as finished. `result` must fit the associated // job type and must be serializable to JSON. diff --git a/internal/jobqueue/testjobqueue/testjobqueue.go b/internal/jobqueue/testjobqueue/testjobqueue.go index d14a6d9b1..0ff89fc97 100644 --- a/internal/jobqueue/testjobqueue/testjobqueue.go +++ b/internal/jobqueue/testjobqueue/testjobqueue.go @@ -79,7 +79,7 @@ func (q *testJobQueue) Enqueue(jobType string, args interface{}, dependencies [] return j.Id, nil } -func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, string, json.RawMessage, error) { +func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { for _, t := range jobTypes { if len(q.pending[t]) == 0 { continue @@ -91,10 +91,10 @@ func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUI j := q.jobs[id] j.StartedAt = time.Now() - return j.Id, j.Type, j.Args, nil + return j.Id, j.Dependencies, j.Type, j.Args, nil } - return uuid.Nil, "", nil, errors.New("no job available") + return uuid.Nil, nil, "", nil, errors.New("no job available") } func (q *testJobQueue) FinishJob(id uuid.UUID, result interface{}) error { diff --git a/internal/worker/server.go b/internal/worker/server.go index 7a8fc8432..47ade4d94 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -175,7 +175,7 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) jts = append(jts, t) } - jobId, jobType, args, err := s.jobs.Dequeue(ctx, jts) + jobId, _, jobType, args, err := s.jobs.Dequeue(ctx, jts) if err != nil { return uuid.Nil, uuid.Nil, "", nil, err }