fsjobqueue: do not delete empty channels

Previously, we deleted empty channels when a job was dequeued. This is
completely wrong because there still might be some clients waiting for
a job. This commit removes the cleanup and adds a regression test.

Note that this has the potential to leak memory if we ever use a lot of
job types. Currently, we have just handful of them, so this is fine.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2021-11-10 18:17:58 +01:00 committed by Sanne Raymaekers
parent a880c9c019
commit 5f4db72777
2 changed files with 39 additions and 9 deletions

View file

@ -21,6 +21,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
"github.com/osbuild/osbuild-composer/internal/jsondb"
)
@ -202,15 +203,6 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
id, err := selectUUIDChannel(ctx, chans)
q.mu.Lock()
// Delete empty channels
for _, jt := range jobTypes {
c, exists := q.pending[jt]
if exists && len(c) == 0 {
close(c)
delete(q.pending, jt)
}
}
if err != nil {
if errors.As(err, &context.Canceled) || errors.As(err, &context.DeadlineExceeded) {
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout

View file

@ -6,6 +6,7 @@ package jobqueuetest
import (
"context"
"encoding/json"
"sync"
"testing"
"time"
@ -36,6 +37,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) {
t.Run("job-types", wrap(testJobTypes))
t.Run("dependencies", wrap(testDependencies))
t.Run("multiple-workers", wrap(testMultipleWorkers))
t.Run("multiple-workers-single-job-type", wrap(testMultipleWorkersSingleJobType))
t.Run("heartbeats", wrap(testHeartbeats))
t.Run("timeout", wrap(testDequeueTimeout))
}
@ -276,6 +278,42 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) {
<-done
}
func testMultipleWorkersSingleJobType(t *testing.T, q jobqueue.JobQueue) {
var wg sync.WaitGroup
wg.Add(2)
// Start two listeners
for i := 0; i < 2; i += 1 {
go func() {
defer wg.Add(-1)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"clownfish"})
require.NoError(t, err)
require.NotEmpty(t, id)
require.NotEmpty(t, tok)
require.Empty(t, deps)
require.Equal(t, "clownfish", typ)
require.Equal(t, json.RawMessage("null"), args)
}()
}
// Increase the likelihood that the above goroutines were scheduled and
// is waiting in Dequeue().
time.Sleep(10 * time.Millisecond)
// Satisfy the first listener
_ = pushTestJob(t, q, "clownfish", nil, nil)
// Wait a bit for the listener to process the job
time.Sleep(10 * time.Millisecond)
// Satisfy the second listener
_ = pushTestJob(t, q, "clownfish", nil, nil)
wg.Wait()
}
func testCancel(t *testing.T, q jobqueue.JobQueue) {
// Cancel a non-existing job
err := q.CancelJob(uuid.New())