From a6df2877a3c5acf81ae98ff5a004fa5b4adf73a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Budai?= Date: Wed, 11 Nov 2020 13:48:23 +0100 Subject: [PATCH] fsjobqueue: accept jobs of any type MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Soon, we want to begin tagging the jobs with the name of its submitter. The simplest way to add a tag to a job is to put it into its type string. However, as we don't know (and don't want to know) the submitters' names when osbuild-composer is initialized, we need to be able to push arbitrary job types into the jobqueue. This commit therefore lifts the restriction that a jobqueue accepts only a predefined set of job types. Now, jobqueue clients can push jobs of arbitrary names. Signed-off-by: Ondřej Budai --- cmd/osbuild-composer/composer.go | 15 +-------- internal/jobqueue/fsjobqueue/fsjobqueue.go | 31 ++++++++++++------- .../jobqueue/fsjobqueue/fsjobqueue_test.go | 18 +++++------ internal/kojiapi/server_test.go | 2 +- 4 files changed, 30 insertions(+), 36 deletions(-) diff --git a/cmd/osbuild-composer/composer.go b/cmd/osbuild-composer/composer.go index b3bb4b769..f82c13df7 100644 --- a/cmd/osbuild-composer/composer.go +++ b/cmd/osbuild-composer/composer.go @@ -69,20 +69,7 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string, logger * c.rpm = rpmmd.NewRPMMD(path.Join(c.cacheDir, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json") - // construct job types of the form osbuild:{arch} and osbuild-koji:{arch} for all arches - jobTypes := []string{"osbuild", "koji-init", "koji-finalize"} - archSet := map[string]bool{} - for _, name := range c.distros.List() { - d := c.distros.GetDistro(name) - for _, arch := range d.ListArches() { - if !archSet[arch] { - archSet[arch] = true - jobTypes = append(jobTypes, "osbuild:"+arch, "osbuild-koji:"+arch) - } - } - } - - jobs, err := fsjobqueue.New(queueDir, jobTypes) + jobs, err := fsjobqueue.New(queueDir) if err != nil { return nil, fmt.Errorf("cannot create jobqueue: %v", err) } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index e2903a5d3..0ef3b4c35 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -66,17 +66,13 @@ const channelSize = 100 // Create a new fsJobQueue object for `dir`. This object must have exclusive // access to `dir`. If `dir` contains jobs created from previous runs, they are // loaded and rescheduled to run if necessary. -func New(dir string, acceptedJobTypes []string) (*fsJobQueue, error) { +func New(dir string) (*fsJobQueue, error) { q := &fsJobQueue{ db: jsondb.New(dir, 0600), pending: make(map[string]chan uuid.UUID), dependants: make(map[uuid.UUID][]uuid.UUID), } - for _, jt := range acceptedJobTypes { - q.pending[jt] = make(chan uuid.UUID, channelSize) - } - // Look for jobs that are still pending and build the dependant map. ids, err := q.db.List() if err != nil { @@ -104,10 +100,6 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu q.mu.Lock() defer q.mu.Unlock() - if _, exists := q.pending[jobType]; !exists { - return uuid.Nil, fmt.Errorf("this queue does not accept job type '%s'", jobType) - } - var j = job{ Id: uuid.New(), Type: jobType, @@ -161,9 +153,12 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, // queue doesn't accept. chans := []chan uuid.UUID{} for _, jt := range jobTypes { - if c, exists := q.pending[jt]; exists { - chans = append(chans, c) + c, exists := q.pending[jt] + if !exists { + c = make(chan uuid.UUID, channelSize) + q.pending[jt] = c } + chans = append(chans, c) } // Loop until finding a non-canceled job. @@ -175,6 +170,15 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, id, err := selectUUIDChannel(ctx, chans) q.mu.Lock() + // Delete empty channels + for _, jt := range jobTypes { + c, exists := q.pending[jt] + if exists && len(c) == 0 { + close(c) + delete(q.pending, jt) + } + } + if err != nil { return uuid.Nil, nil, "", nil, err } @@ -301,6 +305,8 @@ func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) { // Enqueue `job` if it is pending and all its dependencies have finished. // Update `q.dependants` if the job was not queued and updateDependants is true // (i.e., when this is a new job). +// `q.mu` must be locked when this method is called. The only exception is +// `New()` because no concurrent calls are possible there. func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error { if !j.StartedAt.IsZero() { return nil @@ -321,7 +327,8 @@ func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error { if depsFinished { c, exists := q.pending[j.Type] if !exists { - return fmt.Errorf("this queue doesn't accept job type '%s'", j.Type) + c = make(chan uuid.UUID, channelSize) + q.pending[j.Type] = c } c <- j.Id } else if updateDependants { diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index 1b72fb600..114e9735d 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -23,11 +23,11 @@ func cleanupTempDir(t *testing.T, dir string) { require.NoError(t, err) } -func newTemporaryQueue(t *testing.T, jobTypes []string) (jobqueue.JobQueue, string) { +func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) { dir, err := ioutil.TempDir("", "jobqueue-test-") require.NoError(t, err) - q, err := fsjobqueue.New(dir, jobTypes) + q, err := fsjobqueue.New(dir) require.NoError(t, err) require.NotNil(t, q) @@ -57,13 +57,13 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result } func TestNonExistant(t *testing.T) { - q, err := fsjobqueue.New("/non-existant-directory", []string{}) + q, err := fsjobqueue.New("/non-existant-directory") require.Error(t, err) require.Nil(t, q) } func TestErrors(t *testing.T) { - q, dir := newTemporaryQueue(t, []string{"test"}) + q, dir := newTemporaryQueue(t) defer cleanupTempDir(t, dir) // not serializable to JSON @@ -83,7 +83,7 @@ func TestArgs(t *testing.T) { S string } - q, dir := newTemporaryQueue(t, []string{"fish", "octopus"}) + q, dir := newTemporaryQueue(t) defer cleanupTempDir(t, dir) oneargs := argument{7, "🐠"} @@ -114,7 +114,7 @@ func TestArgs(t *testing.T) { } func TestJobTypes(t *testing.T) { - q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"}) + q, dir := newTemporaryQueue(t) defer cleanupTempDir(t, dir) one := pushTestJob(t, q, "octopus", nil, nil) @@ -134,7 +134,7 @@ func TestJobTypes(t *testing.T) { } func TestDependencies(t *testing.T) { - q, dir := newTemporaryQueue(t, []string{"test"}) + q, dir := newTemporaryQueue(t) defer cleanupTempDir(t, dir) t.Run("done-before-pushing-dependant", func(t *testing.T) { @@ -205,7 +205,7 @@ func TestDependencies(t *testing.T) { // Test that a job queue allows parallel access to multiple workers, mainly to // verify the quirky unlocking in Dequeue(). func TestMultipleWorkers(t *testing.T) { - q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"}) + q, dir := newTemporaryQueue(t) defer cleanupTempDir(t, dir) done := make(chan struct{}) @@ -240,7 +240,7 @@ func TestMultipleWorkers(t *testing.T) { } func TestCancel(t *testing.T) { - q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"}) + q, dir := newTemporaryQueue(t) defer cleanupTempDir(t, dir) // Cancel a non-existing job diff --git a/internal/kojiapi/server_test.go b/internal/kojiapi/server_test.go index 33b5fae9e..f240e9507 100644 --- a/internal/kojiapi/server_test.go +++ b/internal/kojiapi/server_test.go @@ -31,7 +31,7 @@ func newTestKojiServer(t *testing.T, dir string) (*kojiapi.Server, *worker.Serve require.NoError(t, err) require.NotNil(t, distros) - queue, err := fsjobqueue.New(dir, []string{"osbuild:x86_64", "koji-init", "osbuild-koji:x86_64", "koji-finalize"}) + queue, err := fsjobqueue.New(dir) require.NoError(t, err) workerServer := worker.NewServer(nil, queue, "")