From 2765d2d9a8647dbc97684bc1cd4805095bf8c1ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Budai?= Date: Thu, 3 Mar 2022 13:29:29 +0100 Subject: [PATCH] jobqueuetest: add a test for multiple channels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ondřej Budai --- .../jobqueue/jobqueuetest/jobqueuetest.go | 103 ++++++++++++++++++ 1 file changed, 103 insertions(+) diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index 92488f09b..c072323dd 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -41,6 +41,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { t.Run("heartbeats", wrap(testHeartbeats)) t.Run("timeout", wrap(testDequeueTimeout)) t.Run("dequeue-by-id", wrap(testDequeueByID)) + t.Run("multiple-channels", wrap(testMultipleChannels)) } func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID, channel string) uuid.UUID { @@ -482,3 +483,105 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, jobqueue.ErrNotPending, err) }) } + +func testMultipleChannels(t *testing.T, q jobqueue.JobQueue) { + t.Run("two single channel dequeuers", func(t *testing.T) { + var wg sync.WaitGroup + + oneChan := make(chan uuid.UUID, 1) + twoChan := make(chan uuid.UUID, 1) + + // dequeue kingfisher channel + wg.Add(1) + go func() { + defer wg.Done() + + id, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher"}) + require.NoError(t, err) + + expectedID := <-twoChan + require.Equal(t, expectedID, id) + }() + + // dequeue toucan channel + wg.Add(1) + go func() { + defer wg.Done() + + id, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"toucan"}) + require.NoError(t, err) + + expectedID := <-oneChan + require.Equal(t, expectedID, id) + }() + + // enqueue into toucan channel + one := pushTestJob(t, q, "octopus", nil, nil, "toucan") + oneChan <- one + // enqueue into kingfisher channel + two := pushTestJob(t, q, "octopus", nil, nil, "kingfisher") + twoChan <- two + wg.Wait() + }) + + t.Run("one double channel dequeuers", func(t *testing.T) { + var wg sync.WaitGroup + + oneChan := make(chan uuid.UUID, 1) + twoChan := make(chan uuid.UUID, 1) + + // dequeue kingfisher channel + wg.Add(1) + go func() { + defer wg.Done() + + id, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher", "toucan"}) + require.NoError(t, err) + + expectedID := <-oneChan + require.Equal(t, expectedID, id) + + id, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher", "toucan"}) + require.NoError(t, err) + + expectedID = <-twoChan + require.Equal(t, expectedID, id) + }() + + // enqueue into toucan channel + one := pushTestJob(t, q, "octopus", nil, nil, "toucan") + oneChan <- one + // enqueue into kingfisher channel + two := pushTestJob(t, q, "octopus", nil, nil, "kingfisher") + twoChan <- two + wg.Wait() + }) + + t.Run("dequeing no-existing channel", func(t *testing.T) { + // enqueue into toucan channel + pushTestJob(t, q, "octopus", nil, nil, "toucan") + + // dequeue from an empty channel + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond*100)) + defer cancel() + _, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}, []string{""}) + require.ErrorIs(t, err, jobqueue.ErrDequeueTimeout) + + // dequeue from toucan channel + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{"toucan"}) + require.NoError(t, err) + + // enqueue into an empty channel + pushTestJob(t, q, "octopus", nil, nil, "") + + // dequeue from toucan channel + ctx2, cancel2 := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond*100)) + defer cancel2() + _, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}, []string{"toucan"}) + require.ErrorIs(t, err, jobqueue.ErrDequeueTimeout) + + // dequeue from an empty channel + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) + require.NoError(t, err) + }) +}