diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 54ac5974e..06c4d7b89 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -21,6 +21,7 @@ import ( "time" "github.com/google/uuid" + "github.com/osbuild/osbuild-composer/internal/jobqueue" "github.com/osbuild/osbuild-composer/internal/jsondb" ) @@ -202,15 +203,6 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, id, err := selectUUIDChannel(ctx, chans) q.mu.Lock() - // Delete empty channels - for _, jt := range jobTypes { - c, exists := q.pending[jt] - if exists && len(c) == 0 { - close(c) - delete(q.pending, jt) - } - } - if err != nil { if errors.As(err, &context.Canceled) || errors.As(err, &context.DeadlineExceeded) { return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index 58f7fab00..4e17b72a3 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -6,6 +6,7 @@ package jobqueuetest import ( "context" "encoding/json" + "sync" "testing" "time" @@ -36,6 +37,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { t.Run("job-types", wrap(testJobTypes)) t.Run("dependencies", wrap(testDependencies)) t.Run("multiple-workers", wrap(testMultipleWorkers)) + t.Run("multiple-workers-single-job-type", wrap(testMultipleWorkersSingleJobType)) t.Run("heartbeats", wrap(testHeartbeats)) t.Run("timeout", wrap(testDequeueTimeout)) } @@ -276,6 +278,42 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) { <-done } +func testMultipleWorkersSingleJobType(t *testing.T, q jobqueue.JobQueue) { + var wg sync.WaitGroup + wg.Add(2) + + // Start two listeners + for i := 0; i < 2; i += 1 { + go func() { + defer wg.Add(-1) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"clownfish"}) + require.NoError(t, err) + require.NotEmpty(t, id) + require.NotEmpty(t, tok) + require.Empty(t, deps) + require.Equal(t, "clownfish", typ) + require.Equal(t, json.RawMessage("null"), args) + }() + } + + // Increase the likelihood that the above goroutines were scheduled and + // is waiting in Dequeue(). + time.Sleep(10 * time.Millisecond) + + // Satisfy the first listener + _ = pushTestJob(t, q, "clownfish", nil, nil) + + // Wait a bit for the listener to process the job + time.Sleep(10 * time.Millisecond) + + // Satisfy the second listener + _ = pushTestJob(t, q, "clownfish", nil, nil) + + wg.Wait() +} + func testCancel(t *testing.T, q jobqueue.JobQueue) { // Cancel a non-existing job err := q.CancelJob(uuid.New())