jobqueuetest: add a test for multiple channels

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2022-03-03 13:29:29 +01:00 committed by Ondřej Budai
parent 32080e6202
commit 2765d2d9a8

View file

@ -41,6 +41,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) {
t.Run("heartbeats", wrap(testHeartbeats))
t.Run("timeout", wrap(testDequeueTimeout))
t.Run("dequeue-by-id", wrap(testDequeueByID))
t.Run("multiple-channels", wrap(testMultipleChannels))
}
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID, channel string) uuid.UUID {
@ -482,3 +483,105 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, jobqueue.ErrNotPending, err)
})
}
func testMultipleChannels(t *testing.T, q jobqueue.JobQueue) {
t.Run("two single channel dequeuers", func(t *testing.T) {
var wg sync.WaitGroup
oneChan := make(chan uuid.UUID, 1)
twoChan := make(chan uuid.UUID, 1)
// dequeue kingfisher channel
wg.Add(1)
go func() {
defer wg.Done()
id, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher"})
require.NoError(t, err)
expectedID := <-twoChan
require.Equal(t, expectedID, id)
}()
// dequeue toucan channel
wg.Add(1)
go func() {
defer wg.Done()
id, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"toucan"})
require.NoError(t, err)
expectedID := <-oneChan
require.Equal(t, expectedID, id)
}()
// enqueue into toucan channel
one := pushTestJob(t, q, "octopus", nil, nil, "toucan")
oneChan <- one
// enqueue into kingfisher channel
two := pushTestJob(t, q, "octopus", nil, nil, "kingfisher")
twoChan <- two
wg.Wait()
})
t.Run("one double channel dequeuers", func(t *testing.T) {
var wg sync.WaitGroup
oneChan := make(chan uuid.UUID, 1)
twoChan := make(chan uuid.UUID, 1)
// dequeue kingfisher channel
wg.Add(1)
go func() {
defer wg.Done()
id, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher", "toucan"})
require.NoError(t, err)
expectedID := <-oneChan
require.Equal(t, expectedID, id)
id, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher", "toucan"})
require.NoError(t, err)
expectedID = <-twoChan
require.Equal(t, expectedID, id)
}()
// enqueue into toucan channel
one := pushTestJob(t, q, "octopus", nil, nil, "toucan")
oneChan <- one
// enqueue into kingfisher channel
two := pushTestJob(t, q, "octopus", nil, nil, "kingfisher")
twoChan <- two
wg.Wait()
})
t.Run("dequeing no-existing channel", func(t *testing.T) {
// enqueue into toucan channel
pushTestJob(t, q, "octopus", nil, nil, "toucan")
// dequeue from an empty channel
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond*100))
defer cancel()
_, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}, []string{""})
require.ErrorIs(t, err, jobqueue.ErrDequeueTimeout)
// dequeue from toucan channel
_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{"toucan"})
require.NoError(t, err)
// enqueue into an empty channel
pushTestJob(t, q, "octopus", nil, nil, "")
// dequeue from toucan channel
ctx2, cancel2 := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond*100))
defer cancel2()
_, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}, []string{"toucan"})
require.ErrorIs(t, err, jobqueue.ErrDequeueTimeout)
// dequeue from an empty channel
_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
})
}