jobqueuetest: add channel arg to the pushTestJob helper

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2022-03-03 11:29:01 +01:00 committed by Ondřej Budai
parent 7bfcee36f8
commit 4c31b04a65

View file

@ -43,9 +43,9 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) {
t.Run("dequeue-by-id", wrap(testDequeueByID))
}
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID, channel string) uuid.UUID {
t.Helper()
id, err := q.Enqueue(jobType, args, dependencies, "")
id, err := q.Enqueue(jobType, args, dependencies, channel)
require.NoError(t, err)
require.NotEmpty(t, id)
return id
@ -78,7 +78,7 @@ func testErrors(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, uuid.Nil, id)
// token gets removed
pushTestJob(t, q, "octopus", nil, nil)
pushTestJob(t, q, "octopus", nil, nil, "")
id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
require.NotEmpty(t, tok)
@ -103,10 +103,10 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) {
}
oneargs := argument{7, "🐠"}
one := pushTestJob(t, q, "fish", oneargs, nil)
one := pushTestJob(t, q, "fish", oneargs, nil, "")
twoargs := argument{42, "🐙"}
two := pushTestJob(t, q, "octopus", twoargs, nil)
two := pushTestJob(t, q, "octopus", twoargs, nil, "")
var parsedArgs argument
@ -148,8 +148,8 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) {
}
func testJobTypes(t *testing.T, q jobqueue.JobQueue) {
one := pushTestJob(t, q, "octopus", nil, nil)
two := pushTestJob(t, q, "clownfish", nil, nil)
one := pushTestJob(t, q, "octopus", nil, nil, "")
two := pushTestJob(t, q, "clownfish", nil, nil, "")
require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}, nil))
require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil))
@ -179,15 +179,15 @@ func testDequeueTimeout(t *testing.T, q jobqueue.JobQueue) {
func testDependencies(t *testing.T, q jobqueue.JobQueue) {
t.Run("done-before-pushing-dependant", func(t *testing.T) {
one := pushTestJob(t, q, "test", nil, nil)
two := pushTestJob(t, q, "test", nil, nil)
one := pushTestJob(t, q, "test", nil, nil, "")
two := pushTestJob(t, q, "test", nil, nil, "")
r := []uuid.UUID{}
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil))
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "")
jobType, _, queued, started, finished, canceled, deps, err := q.JobStatus(j)
require.NoError(t, err)
require.Equal(t, jobType, "test")
@ -213,10 +213,10 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
})
t.Run("done-after-pushing-dependant", func(t *testing.T) {
one := pushTestJob(t, q, "test", nil, nil)
two := pushTestJob(t, q, "test", nil, nil)
one := pushTestJob(t, q, "test", nil, nil, "")
two := pushTestJob(t, q, "test", nil, nil, "")
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "")
jobType, _, queued, started, finished, canceled, deps, err := q.JobStatus(j)
require.NoError(t, err)
require.Equal(t, jobType, "test")
@ -269,7 +269,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) {
time.Sleep(10 * time.Millisecond)
// This call to Dequeue() should not block on the one in the goroutine.
id := pushTestJob(t, q, "clownfish", nil, nil)
id := pushTestJob(t, q, "clownfish", nil, nil, "")
r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
require.NoError(t, err)
require.Equal(t, id, r)
@ -279,7 +279,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, json.RawMessage("null"), args)
// Now wake up the Dequeue() in the goroutine and wait for it to finish.
_ = pushTestJob(t, q, "octopus", nil, nil)
_ = pushTestJob(t, q, "octopus", nil, nil, "")
<-done
}
@ -308,13 +308,13 @@ func testMultipleWorkersSingleJobType(t *testing.T, q jobqueue.JobQueue) {
time.Sleep(10 * time.Millisecond)
// Satisfy the first listener
_ = pushTestJob(t, q, "clownfish", nil, nil)
_ = pushTestJob(t, q, "clownfish", nil, nil, "")
// Wait a bit for the listener to process the job
time.Sleep(10 * time.Millisecond)
// Satisfy the second listener
_ = pushTestJob(t, q, "clownfish", nil, nil)
_ = pushTestJob(t, q, "clownfish", nil, nil, "")
wg.Wait()
}
@ -325,7 +325,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Error(t, err)
// Cancel a pending job
id := pushTestJob(t, q, "clownfish", nil, nil)
id := pushTestJob(t, q, "clownfish", nil, nil, "")
require.NotEmpty(t, id)
err = q.CancelJob(id)
require.NoError(t, err)
@ -338,7 +338,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Error(t, err)
// Cancel a running job, which should not dequeue the canceled job from above
id = pushTestJob(t, q, "clownfish", nil, nil)
id = pushTestJob(t, q, "clownfish", nil, nil, "")
require.NotEmpty(t, id)
r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
require.NoError(t, err)
@ -358,7 +358,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Error(t, err)
// Cancel a finished job, which is a no-op
id = pushTestJob(t, q, "clownfish", nil, nil)
id = pushTestJob(t, q, "clownfish", nil, nil, "")
require.NotEmpty(t, id)
r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
require.NoError(t, err)
@ -381,7 +381,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
}
func testHeartbeats(t *testing.T, q jobqueue.JobQueue) {
id := pushTestJob(t, q, "octopus", nil, nil)
id := pushTestJob(t, q, "octopus", nil, nil, "")
// No heartbeats for queued job
require.Empty(t, q.Heartbeats(time.Second*0))
@ -417,8 +417,8 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) {
func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
t.Run("basic", func(t *testing.T) {
one := pushTestJob(t, q, "octopus", nil, nil)
two := pushTestJob(t, q, "octopus", nil, nil)
one := pushTestJob(t, q, "octopus", nil, nil, "")
two := pushTestJob(t, q, "octopus", nil, nil, "")
tok, d, typ, args, err := q.DequeueByID(context.Background(), one)
require.NoError(t, err)
@ -434,8 +434,8 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
})
t.Run("cannot dequeue a job without finished deps", func(t *testing.T) {
one := pushTestJob(t, q, "octopus", nil, nil)
two := pushTestJob(t, q, "octopus", nil, []uuid.UUID{one})
one := pushTestJob(t, q, "octopus", nil, nil, "")
two := pushTestJob(t, q, "octopus", nil, []uuid.UUID{one}, "")
_, _, _, _, err := q.DequeueByID(context.Background(), two)
require.Equal(t, jobqueue.ErrNotPending, err)
@ -445,7 +445,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
})
t.Run("cannot dequeue a non-pending job", func(t *testing.T) {
one := pushTestJob(t, q, "octopus", nil, nil)
one := pushTestJob(t, q, "octopus", nil, nil, "")
_, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)