From 8df143fabeb78ee4571192012a1b65ce03fcc6a4 Mon Sep 17 00:00:00 2001 From: Lars Karlitski Date: Sun, 10 May 2020 18:17:05 +0200 Subject: [PATCH] fsjobqueue: pass accepted job types to New() This makes the queue more type safe and allows to get rid of the `pendingChannel` and `pendingChannels` helpers, which only existed to create not-yet-existing pending channels. --- cmd/osbuild-composer/main.go | 2 +- internal/jobqueue/fsjobqueue/fsjobqueue.go | 57 ++++++++----------- .../jobqueue/fsjobqueue/fsjobqueue_test.go | 17 +++--- 3 files changed, 33 insertions(+), 43 deletions(-) diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index 8a7cbeb32..338049fc5 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -125,7 +125,7 @@ func main() { log.Fatalf("cannot create queue directory: %v", err) } - jobs, err := fsjobqueue.New(queueDir) + jobs, err := fsjobqueue.New(queueDir, []string{"osbuild"}) if err != nil { log.Fatalf("cannot create jobqueue: %v", err) } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 50e773784..ffab2407a 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -35,9 +35,7 @@ type fsJobQueue struct { db *jsondb.JSONDatabase - // Maps job types to channels of job ids for that type. Only access - // through pendingChannel(), which ensures that a map for the given job - // typ exists. + // Maps job types to channels of job ids for that type. pending map[string]chan uuid.UUID // Maps job ids to the jobs that depend on it, if any of those @@ -63,13 +61,17 @@ type job struct { // Create a new fsJobQueue object for `dir`. This object must have exclusive // access to `dir`. If `dir` contains jobs created from previous runs, they are // loaded and rescheduled to run if necessary. -func New(dir string) (*fsJobQueue, error) { +func New(dir string, acceptedJobTypes []string) (*fsJobQueue, error) { q := &fsJobQueue{ db: jsondb.New(dir, 0600), pending: make(map[string]chan uuid.UUID), dependants: make(map[uuid.UUID][]uuid.UUID), } + for _, jt := range acceptedJobTypes { + q.pending[jt] = make(chan uuid.UUID, 100) + } + // Look for jobs that are still pending and build the dependant map. ids, err := q.db.List() if err != nil { @@ -97,6 +99,10 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu q.mu.Lock() defer q.mu.Unlock() + if _, exists := q.pending[jobType]; !exists { + return uuid.Nil, fmt.Errorf("this queue does not accept job type '%s'", jobType) + } + var j = job{ Id: uuid.New(), Type: jobType, @@ -146,7 +152,14 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interf return uuid.Nil, err } - chans := q.pendingChannels(jobTypes) + // Filter q.pending by the `jobTypes`. Ignore those job types that this + // queue doesn't accept. + chans := []chan uuid.UUID{} + for _, jt := range jobTypes { + if c, exists := q.pending[jt]; exists { + chans = append(chans, c) + } + } // Unlock the mutex while polling channels, so that multiple goroutines // can wait at the same time. @@ -278,7 +291,11 @@ func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error { } if depsFinished { - q.pendingChannel(j.Type) <- j.Id + c, exists := q.pending[j.Type] + if !exists { + return fmt.Errorf("this queue doesn't accept job type '%s'", j.Type) + } + c <- j.Id } else if updateDependants { for _, id := range j.Dependencies { q.dependants[id] = append(q.dependants[id], j.Id) @@ -288,34 +305,6 @@ func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error { return nil } -// Safe access to the pending channel for `jobType`. Channels are created on -// demand. -func (q *fsJobQueue) pendingChannel(jobType string) chan uuid.UUID { - c, exists := q.pending[jobType] - if !exists { - c = make(chan uuid.UUID, 100) - q.pending[jobType] = c - } - - return c -} - -// Same as pendingChannel(), but for multiple job types. -func (q *fsJobQueue) pendingChannels(jobTypes []string) []chan uuid.UUID { - chans := make([]chan uuid.UUID, len(jobTypes)) - - for i, jt := range jobTypes { - c, exists := q.pending[jt] - if !exists { - c = make(chan uuid.UUID, 100) - q.pending[jt] = c - } - chans[i] = c - } - - return chans -} - // Sorts and removes duplicates from `ids`. func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID { s := map[uuid.UUID]bool{} diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index 4fb64ebcb..4661964e2 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -23,11 +23,11 @@ func cleanupTempDir(t *testing.T, dir string) { require.NoError(t, err) } -func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) { +func newTemporaryQueue(t *testing.T, jobTypes []string) (jobqueue.JobQueue, string) { dir, err := ioutil.TempDir("", "jobqueue-test-") require.NoError(t, err) - q, err := fsjobqueue.New(dir) + q, err := fsjobqueue.New(dir, jobTypes) require.NoError(t, err) require.NotNil(t, q) @@ -35,6 +35,7 @@ func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) { } func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID { + t.Helper() id, err := q.Enqueue(jobType, args, dependencies) require.NoError(t, err) require.NotEmpty(t, id) @@ -53,13 +54,13 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result } func TestNonExistant(t *testing.T) { - q, err := fsjobqueue.New("/non-existant-directory") + q, err := fsjobqueue.New("/non-existant-directory", []string{}) require.Error(t, err) require.Nil(t, q) } func TestErrors(t *testing.T) { - q, dir := newTemporaryQueue(t) + q, dir := newTemporaryQueue(t, []string{"test"}) defer cleanupTempDir(t, dir) // not serializable to JSON @@ -79,7 +80,7 @@ func TestArgs(t *testing.T) { S string } - q, dir := newTemporaryQueue(t) + q, dir := newTemporaryQueue(t, []string{"fish", "octopus"}) defer cleanupTempDir(t, dir) oneargs := argument{7, "🐠"} @@ -101,7 +102,7 @@ func TestArgs(t *testing.T) { } func TestJobTypes(t *testing.T) { - q, dir := newTemporaryQueue(t) + q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"}) defer cleanupTempDir(t, dir) one := pushTestJob(t, q, "octopus", nil, nil) @@ -118,7 +119,7 @@ func TestJobTypes(t *testing.T) { } func TestDependencies(t *testing.T) { - q, dir := newTemporaryQueue(t) + q, dir := newTemporaryQueue(t, []string{"test"}) defer cleanupTempDir(t, dir) t.Run("done-before-pushing-dependant", func(t *testing.T) { @@ -175,7 +176,7 @@ func TestDependencies(t *testing.T) { // Test that a job queue allows parallel access to multiple workers, mainly to // verify the quirky unlocking in Dequeue(). func TestMultipleWorkers(t *testing.T) { - q, dir := newTemporaryQueue(t) + q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"}) defer cleanupTempDir(t, dir) done := make(chan struct{})