fsjobqueue: accept jobs of any type

Soon, we want to begin tagging the jobs with the name of its submitter.
The simplest way to add a tag to a job is to put it into its type string.
However, as we don't know (and don't want to know) the submitters' names when
osbuild-composer is initialized, we need to be able to push arbitrary job
types into the jobqueue.

This commit therefore lifts the restriction that a jobqueue accepts only
a predefined set of job types. Now, jobqueue clients can push jobs of
arbitrary names.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2020-11-11 13:48:23 +01:00 committed by Tom Gundersen
parent e007f9964e
commit a6df2877a3
4 changed files with 30 additions and 36 deletions

View file

@ -69,20 +69,7 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string, logger *
c.rpm = rpmmd.NewRPMMD(path.Join(c.cacheDir, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json")
// construct job types of the form osbuild:{arch} and osbuild-koji:{arch} for all arches
jobTypes := []string{"osbuild", "koji-init", "koji-finalize"}
archSet := map[string]bool{}
for _, name := range c.distros.List() {
d := c.distros.GetDistro(name)
for _, arch := range d.ListArches() {
if !archSet[arch] {
archSet[arch] = true
jobTypes = append(jobTypes, "osbuild:"+arch, "osbuild-koji:"+arch)
}
}
}
jobs, err := fsjobqueue.New(queueDir, jobTypes)
jobs, err := fsjobqueue.New(queueDir)
if err != nil {
return nil, fmt.Errorf("cannot create jobqueue: %v", err)
}

View file

@ -66,17 +66,13 @@ const channelSize = 100
// Create a new fsJobQueue object for `dir`. This object must have exclusive
// access to `dir`. If `dir` contains jobs created from previous runs, they are
// loaded and rescheduled to run if necessary.
func New(dir string, acceptedJobTypes []string) (*fsJobQueue, error) {
func New(dir string) (*fsJobQueue, error) {
q := &fsJobQueue{
db: jsondb.New(dir, 0600),
pending: make(map[string]chan uuid.UUID),
dependants: make(map[uuid.UUID][]uuid.UUID),
}
for _, jt := range acceptedJobTypes {
q.pending[jt] = make(chan uuid.UUID, channelSize)
}
// Look for jobs that are still pending and build the dependant map.
ids, err := q.db.List()
if err != nil {
@ -104,10 +100,6 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
q.mu.Lock()
defer q.mu.Unlock()
if _, exists := q.pending[jobType]; !exists {
return uuid.Nil, fmt.Errorf("this queue does not accept job type '%s'", jobType)
}
var j = job{
Id: uuid.New(),
Type: jobType,
@ -161,9 +153,12 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
// queue doesn't accept.
chans := []chan uuid.UUID{}
for _, jt := range jobTypes {
if c, exists := q.pending[jt]; exists {
chans = append(chans, c)
c, exists := q.pending[jt]
if !exists {
c = make(chan uuid.UUID, channelSize)
q.pending[jt] = c
}
chans = append(chans, c)
}
// Loop until finding a non-canceled job.
@ -175,6 +170,15 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
id, err := selectUUIDChannel(ctx, chans)
q.mu.Lock()
// Delete empty channels
for _, jt := range jobTypes {
c, exists := q.pending[jt]
if exists && len(c) == 0 {
close(c)
delete(q.pending, jt)
}
}
if err != nil {
return uuid.Nil, nil, "", nil, err
}
@ -301,6 +305,8 @@ func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
// Enqueue `job` if it is pending and all its dependencies have finished.
// Update `q.dependants` if the job was not queued and updateDependants is true
// (i.e., when this is a new job).
// `q.mu` must be locked when this method is called. The only exception is
// `New()` because no concurrent calls are possible there.
func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
if !j.StartedAt.IsZero() {
return nil
@ -321,7 +327,8 @@ func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
if depsFinished {
c, exists := q.pending[j.Type]
if !exists {
return fmt.Errorf("this queue doesn't accept job type '%s'", j.Type)
c = make(chan uuid.UUID, channelSize)
q.pending[j.Type] = c
}
c <- j.Id
} else if updateDependants {

View file

@ -23,11 +23,11 @@ func cleanupTempDir(t *testing.T, dir string) {
require.NoError(t, err)
}
func newTemporaryQueue(t *testing.T, jobTypes []string) (jobqueue.JobQueue, string) {
func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) {
dir, err := ioutil.TempDir("", "jobqueue-test-")
require.NoError(t, err)
q, err := fsjobqueue.New(dir, jobTypes)
q, err := fsjobqueue.New(dir)
require.NoError(t, err)
require.NotNil(t, q)
@ -57,13 +57,13 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result
}
func TestNonExistant(t *testing.T) {
q, err := fsjobqueue.New("/non-existant-directory", []string{})
q, err := fsjobqueue.New("/non-existant-directory")
require.Error(t, err)
require.Nil(t, q)
}
func TestErrors(t *testing.T) {
q, dir := newTemporaryQueue(t, []string{"test"})
q, dir := newTemporaryQueue(t)
defer cleanupTempDir(t, dir)
// not serializable to JSON
@ -83,7 +83,7 @@ func TestArgs(t *testing.T) {
S string
}
q, dir := newTemporaryQueue(t, []string{"fish", "octopus"})
q, dir := newTemporaryQueue(t)
defer cleanupTempDir(t, dir)
oneargs := argument{7, "🐠"}
@ -114,7 +114,7 @@ func TestArgs(t *testing.T) {
}
func TestJobTypes(t *testing.T) {
q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"})
q, dir := newTemporaryQueue(t)
defer cleanupTempDir(t, dir)
one := pushTestJob(t, q, "octopus", nil, nil)
@ -134,7 +134,7 @@ func TestJobTypes(t *testing.T) {
}
func TestDependencies(t *testing.T) {
q, dir := newTemporaryQueue(t, []string{"test"})
q, dir := newTemporaryQueue(t)
defer cleanupTempDir(t, dir)
t.Run("done-before-pushing-dependant", func(t *testing.T) {
@ -205,7 +205,7 @@ func TestDependencies(t *testing.T) {
// Test that a job queue allows parallel access to multiple workers, mainly to
// verify the quirky unlocking in Dequeue().
func TestMultipleWorkers(t *testing.T) {
q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"})
q, dir := newTemporaryQueue(t)
defer cleanupTempDir(t, dir)
done := make(chan struct{})
@ -240,7 +240,7 @@ func TestMultipleWorkers(t *testing.T) {
}
func TestCancel(t *testing.T) {
q, dir := newTemporaryQueue(t, []string{"octopus", "clownfish"})
q, dir := newTemporaryQueue(t)
defer cleanupTempDir(t, dir)
// Cancel a non-existing job

View file

@ -31,7 +31,7 @@ func newTestKojiServer(t *testing.T, dir string) (*kojiapi.Server, *worker.Serve
require.NoError(t, err)
require.NotNil(t, distros)
queue, err := fsjobqueue.New(dir, []string{"osbuild:x86_64", "koji-init", "osbuild-koji:x86_64", "koji-finalize"})
queue, err := fsjobqueue.New(dir)
require.NoError(t, err)
workerServer := worker.NewServer(nil, queue, "")