From 87c0462a3342e5b5a3fdc5e0a2ff16d47a5ee83a Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Wed, 10 Apr 2024 11:47:41 -0700 Subject: [PATCH] jobqueue: Add AllRootJobIDs function to jobqueue This lists the root job UUIDs (the jobs with no dependants). Currently only implemented by fsjobqueue. The function for dbjobqueue currently returns nil. Related: RHEL-60120 --- internal/jobqueue/fsjobqueue/fsjobqueue.go | 24 ++++++++++ .../jobqueue/fsjobqueue/fsjobqueue_test.go | 47 ++++++++++++++++++- pkg/jobqueue/dbjobqueue/dbjobqueue.go | 7 +++ pkg/jobqueue/jobqueue.go | 3 ++ 4 files changed, 80 insertions(+), 1 deletion(-) diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index cc73abe51..ed23ce109 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -702,3 +702,27 @@ func jobMatchesCriteria(j *job, acceptedJobTypes []string, acceptedChannels []st return contains(acceptedJobTypes, j.Type) && contains(acceptedChannels, j.Channel) } + +// AllRootJobIDs Return a list of all the top level(root) job uuids +// This only includes jobs without any Dependents set +func (q *fsJobQueue) AllRootJobIDs() ([]uuid.UUID, error) { + ids, err := q.db.List() + if err != nil { + return nil, err + } + + jobIDs := []uuid.UUID{} + for _, id := range ids { + var j job + exists, err := q.db.Read(id, &j) + if err != nil { + return jobIDs, err + } + if !exists || len(j.Dependents) > 0 { + continue + } + jobIDs = append(jobIDs, j.Id) + } + + return jobIDs, nil +} diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index fe2ce2c3d..bb330a7fb 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -3,13 +3,15 @@ package fsjobqueue_test import ( "os" "path" + "sort" "testing" - "github.com/osbuild/osbuild-composer/pkg/jobqueue" + "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue" "github.com/osbuild/osbuild-composer/internal/jobqueue/jobqueuetest" + "github.com/osbuild/osbuild-composer/pkg/jobqueue" ) func TestJobQueueInterface(t *testing.T) { @@ -42,3 +44,46 @@ func TestJobQueueBadJSON(t *testing.T) { require.Nil(t, err) require.NotNil(t, q) } + +func sortUUIDs(entries []uuid.UUID) { + sort.Slice(entries, func(i, j int) bool { + return entries[i].String() < entries[j].String() + }) +} + +func TestAllRootJobIDs(t *testing.T) { + dir := t.TempDir() + q, err := fsjobqueue.New(dir) + require.Nil(t, err) + require.NotNil(t, q) + + var rootJobs []uuid.UUID + + // root with no dependencies + jidRoot1, err := q.Enqueue("oneRoot", nil, nil, "OneRootJob") + require.Nil(t, err) + rootJobs = append(rootJobs, jidRoot1) + + // root with 2 dependencies + jid1, err := q.Enqueue("twoDeps", nil, nil, "TwoDepJobs") + require.Nil(t, err) + jid2, err := q.Enqueue("twoDeps", nil, nil, "TwoDepJobs") + require.Nil(t, err) + jidRoot2, err := q.Enqueue("twoDeps", nil, []uuid.UUID{jid1, jid2}, "TwoDepJobs") + require.Nil(t, err) + rootJobs = append(rootJobs, jidRoot2) + + // root with 2 dependencies, one shared with the previous root + jid3, err := q.Enqueue("sharedDeps", nil, nil, "SharedDepJobs") + require.Nil(t, err) + jidRoot3, err := q.Enqueue("sharedDeps", nil, []uuid.UUID{jid1, jid3}, "SharedDepJobs") + require.Nil(t, err) + rootJobs = append(rootJobs, jidRoot3) + + sortUUIDs(rootJobs) + roots, err := q.AllRootJobIDs() + require.Nil(t, err) + require.Greater(t, len(roots), 0) + sortUUIDs(roots) + require.Equal(t, rootJobs, roots) +} diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 58c34cc23..34074dee7 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -903,3 +903,10 @@ func (q *DBJobQueue) jobDependents(ctx context.Context, conn connection, id uuid return dependents, nil } + +// AllRootJobIDs returns a list of top level job UUIDs that the worker knows about +func (q *DBJobQueue) AllRootJobIDs() ([]uuid.UUID, error) { + // TODO write this + + return nil, nil +} diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go index a1c679fcf..28f30df8e 100644 --- a/pkg/jobqueue/jobqueue.go +++ b/pkg/jobqueue/jobqueue.go @@ -95,6 +95,9 @@ type JobQueue interface { // Deletes the worker DeleteWorker(workerID uuid.UUID) error + + // AllRootJobIDs returns a list of top level job UUIDs that the worker knows about + AllRootJobIDs() ([]uuid.UUID, error) } // SimpleLogger provides a structured logging methods for the jobqueue library.