diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index 9e4e9e51e..fc3dc002a 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -43,9 +43,9 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { t.Run("dequeue-by-id", wrap(testDequeueByID)) } -func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID { +func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID, channel string) uuid.UUID { t.Helper() - id, err := q.Enqueue(jobType, args, dependencies, "") + id, err := q.Enqueue(jobType, args, dependencies, channel) require.NoError(t, err) require.NotEmpty(t, id) return id @@ -78,7 +78,7 @@ func testErrors(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, uuid.Nil, id) // token gets removed - pushTestJob(t, q, "octopus", nil, nil) + pushTestJob(t, q, "octopus", nil, nil, "") id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, tok) @@ -103,10 +103,10 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { } oneargs := argument{7, "🐠"} - one := pushTestJob(t, q, "fish", oneargs, nil) + one := pushTestJob(t, q, "fish", oneargs, nil, "") twoargs := argument{42, "🐙"} - two := pushTestJob(t, q, "octopus", twoargs, nil) + two := pushTestJob(t, q, "octopus", twoargs, nil, "") var parsedArgs argument @@ -148,8 +148,8 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { } func testJobTypes(t *testing.T, q jobqueue.JobQueue) { - one := pushTestJob(t, q, "octopus", nil, nil) - two := pushTestJob(t, q, "clownfish", nil, nil) + one := pushTestJob(t, q, "octopus", nil, nil, "") + two := pushTestJob(t, q, "clownfish", nil, nil, "") require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}, nil)) require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil)) @@ -179,15 +179,15 @@ func testDequeueTimeout(t *testing.T, q jobqueue.JobQueue) { func testDependencies(t *testing.T, q jobqueue.JobQueue) { t.Run("done-before-pushing-dependant", func(t *testing.T) { - one := pushTestJob(t, q, "test", nil, nil) - two := pushTestJob(t, q, "test", nil, nil) + one := pushTestJob(t, q, "test", nil, nil, "") + two := pushTestJob(t, q, "test", nil, nil, "") r := []uuid.UUID{} r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) require.ElementsMatch(t, []uuid.UUID{one, two}, r) - j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}) + j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "") jobType, _, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") @@ -213,10 +213,10 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { }) t.Run("done-after-pushing-dependant", func(t *testing.T) { - one := pushTestJob(t, q, "test", nil, nil) - two := pushTestJob(t, q, "test", nil, nil) + one := pushTestJob(t, q, "test", nil, nil, "") + two := pushTestJob(t, q, "test", nil, nil, "") - j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}) + j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "") jobType, _, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") @@ -269,7 +269,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) { time.Sleep(10 * time.Millisecond) // This call to Dequeue() should not block on the one in the goroutine. - id := pushTestJob(t, q, "clownfish", nil, nil) + id := pushTestJob(t, q, "clownfish", nil, nil, "") r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) @@ -279,7 +279,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, json.RawMessage("null"), args) // Now wake up the Dequeue() in the goroutine and wait for it to finish. - _ = pushTestJob(t, q, "octopus", nil, nil) + _ = pushTestJob(t, q, "octopus", nil, nil, "") <-done } @@ -308,13 +308,13 @@ func testMultipleWorkersSingleJobType(t *testing.T, q jobqueue.JobQueue) { time.Sleep(10 * time.Millisecond) // Satisfy the first listener - _ = pushTestJob(t, q, "clownfish", nil, nil) + _ = pushTestJob(t, q, "clownfish", nil, nil, "") // Wait a bit for the listener to process the job time.Sleep(10 * time.Millisecond) // Satisfy the second listener - _ = pushTestJob(t, q, "clownfish", nil, nil) + _ = pushTestJob(t, q, "clownfish", nil, nil, "") wg.Wait() } @@ -325,7 +325,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Error(t, err) // Cancel a pending job - id := pushTestJob(t, q, "clownfish", nil, nil) + id := pushTestJob(t, q, "clownfish", nil, nil, "") require.NotEmpty(t, id) err = q.CancelJob(id) require.NoError(t, err) @@ -338,7 +338,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Error(t, err) // Cancel a running job, which should not dequeue the canceled job from above - id = pushTestJob(t, q, "clownfish", nil, nil) + id = pushTestJob(t, q, "clownfish", nil, nil, "") require.NotEmpty(t, id) r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) require.NoError(t, err) @@ -358,7 +358,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Error(t, err) // Cancel a finished job, which is a no-op - id = pushTestJob(t, q, "clownfish", nil, nil) + id = pushTestJob(t, q, "clownfish", nil, nil, "") require.NotEmpty(t, id) r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) require.NoError(t, err) @@ -381,7 +381,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { } func testHeartbeats(t *testing.T, q jobqueue.JobQueue) { - id := pushTestJob(t, q, "octopus", nil, nil) + id := pushTestJob(t, q, "octopus", nil, nil, "") // No heartbeats for queued job require.Empty(t, q.Heartbeats(time.Second*0)) @@ -417,8 +417,8 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) { func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { t.Run("basic", func(t *testing.T) { - one := pushTestJob(t, q, "octopus", nil, nil) - two := pushTestJob(t, q, "octopus", nil, nil) + one := pushTestJob(t, q, "octopus", nil, nil, "") + two := pushTestJob(t, q, "octopus", nil, nil, "") tok, d, typ, args, err := q.DequeueByID(context.Background(), one) require.NoError(t, err) @@ -434,8 +434,8 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { }) t.Run("cannot dequeue a job without finished deps", func(t *testing.T) { - one := pushTestJob(t, q, "octopus", nil, nil) - two := pushTestJob(t, q, "octopus", nil, []uuid.UUID{one}) + one := pushTestJob(t, q, "octopus", nil, nil, "") + two := pushTestJob(t, q, "octopus", nil, []uuid.UUID{one}, "") _, _, _, _, err := q.DequeueByID(context.Background(), two) require.Equal(t, jobqueue.ErrNotPending, err) @@ -445,7 +445,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { }) t.Run("cannot dequeue a non-pending job", func(t *testing.T) { - one := pushTestJob(t, q, "octopus", nil, nil) + one := pushTestJob(t, q, "octopus", nil, nil, "") _, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err)