jobqueue: Add AllRootJobIDs function to jobqueue
This lists the root job UUIDs (the jobs with no dependants). Currently only implemented by fsjobqueue. The function for dbjobqueue currently returns nil. Related: RHEL-60120
This commit is contained in:
parent
d781b8de6d
commit
87c0462a33
4 changed files with 80 additions and 1 deletions
|
|
@ -702,3 +702,27 @@ func jobMatchesCriteria(j *job, acceptedJobTypes []string, acceptedChannels []st
|
||||||
|
|
||||||
return contains(acceptedJobTypes, j.Type) && contains(acceptedChannels, j.Channel)
|
return contains(acceptedJobTypes, j.Type) && contains(acceptedChannels, j.Channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AllRootJobIDs Return a list of all the top level(root) job uuids
|
||||||
|
// This only includes jobs without any Dependents set
|
||||||
|
func (q *fsJobQueue) AllRootJobIDs() ([]uuid.UUID, error) {
|
||||||
|
ids, err := q.db.List()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
jobIDs := []uuid.UUID{}
|
||||||
|
for _, id := range ids {
|
||||||
|
var j job
|
||||||
|
exists, err := q.db.Read(id, &j)
|
||||||
|
if err != nil {
|
||||||
|
return jobIDs, err
|
||||||
|
}
|
||||||
|
if !exists || len(j.Dependents) > 0 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
jobIDs = append(jobIDs, j.Id)
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobIDs, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,13 +3,15 @@ package fsjobqueue_test
|
||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
|
"github.com/google/uuid"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
|
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
|
||||||
"github.com/osbuild/osbuild-composer/internal/jobqueue/jobqueuetest"
|
"github.com/osbuild/osbuild-composer/internal/jobqueue/jobqueuetest"
|
||||||
|
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestJobQueueInterface(t *testing.T) {
|
func TestJobQueueInterface(t *testing.T) {
|
||||||
|
|
@ -42,3 +44,46 @@ func TestJobQueueBadJSON(t *testing.T) {
|
||||||
require.Nil(t, err)
|
require.Nil(t, err)
|
||||||
require.NotNil(t, q)
|
require.NotNil(t, q)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func sortUUIDs(entries []uuid.UUID) {
|
||||||
|
sort.Slice(entries, func(i, j int) bool {
|
||||||
|
return entries[i].String() < entries[j].String()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAllRootJobIDs(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
q, err := fsjobqueue.New(dir)
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.NotNil(t, q)
|
||||||
|
|
||||||
|
var rootJobs []uuid.UUID
|
||||||
|
|
||||||
|
// root with no dependencies
|
||||||
|
jidRoot1, err := q.Enqueue("oneRoot", nil, nil, "OneRootJob")
|
||||||
|
require.Nil(t, err)
|
||||||
|
rootJobs = append(rootJobs, jidRoot1)
|
||||||
|
|
||||||
|
// root with 2 dependencies
|
||||||
|
jid1, err := q.Enqueue("twoDeps", nil, nil, "TwoDepJobs")
|
||||||
|
require.Nil(t, err)
|
||||||
|
jid2, err := q.Enqueue("twoDeps", nil, nil, "TwoDepJobs")
|
||||||
|
require.Nil(t, err)
|
||||||
|
jidRoot2, err := q.Enqueue("twoDeps", nil, []uuid.UUID{jid1, jid2}, "TwoDepJobs")
|
||||||
|
require.Nil(t, err)
|
||||||
|
rootJobs = append(rootJobs, jidRoot2)
|
||||||
|
|
||||||
|
// root with 2 dependencies, one shared with the previous root
|
||||||
|
jid3, err := q.Enqueue("sharedDeps", nil, nil, "SharedDepJobs")
|
||||||
|
require.Nil(t, err)
|
||||||
|
jidRoot3, err := q.Enqueue("sharedDeps", nil, []uuid.UUID{jid1, jid3}, "SharedDepJobs")
|
||||||
|
require.Nil(t, err)
|
||||||
|
rootJobs = append(rootJobs, jidRoot3)
|
||||||
|
|
||||||
|
sortUUIDs(rootJobs)
|
||||||
|
roots, err := q.AllRootJobIDs()
|
||||||
|
require.Nil(t, err)
|
||||||
|
require.Greater(t, len(roots), 0)
|
||||||
|
sortUUIDs(roots)
|
||||||
|
require.Equal(t, rootJobs, roots)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -903,3 +903,10 @@ func (q *DBJobQueue) jobDependents(ctx context.Context, conn connection, id uuid
|
||||||
|
|
||||||
return dependents, nil
|
return dependents, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AllRootJobIDs returns a list of top level job UUIDs that the worker knows about
|
||||||
|
func (q *DBJobQueue) AllRootJobIDs() ([]uuid.UUID, error) {
|
||||||
|
// TODO write this
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -95,6 +95,9 @@ type JobQueue interface {
|
||||||
|
|
||||||
// Deletes the worker
|
// Deletes the worker
|
||||||
DeleteWorker(workerID uuid.UUID) error
|
DeleteWorker(workerID uuid.UUID) error
|
||||||
|
|
||||||
|
// AllRootJobIDs returns a list of top level job UUIDs that the worker knows about
|
||||||
|
AllRootJobIDs() ([]uuid.UUID, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SimpleLogger provides a structured logging methods for the jobqueue library.
|
// SimpleLogger provides a structured logging methods for the jobqueue library.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue