jobqueue: introduce the concept of channels

Channels are a concept similar to job types. Callers must specify a channel
name when queueing a new job. A list of channels is also specified when
dequeueing a job. The dequeued job's channel will always be from one of the
specified channel. Of course, the job types are also respected. The dequeued
job will also always be from one of the specified type.

Currently, all calls to jobqueue were changed so all queue operations use
an empty channel name and all dequeue operations use a list containing
an empty channel.

Thus, this is a non-functional change.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2022-03-06 11:59:56 +01:00 committed by Ondřej Budai
parent ba4f49078a
commit 7bfcee36f8
15 changed files with 202 additions and 176 deletions

View file

@ -82,19 +82,19 @@ func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) {
date85 := time.Date(1985, time.January, 1, 0, 0, 0, 0, time.UTC)
date90 := time.Date(1990, time.January, 1, 0, 0, 0, 0, time.UTC)
id80, err := q.Enqueue("octopus", nil, nil)
id80, err := q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id80)
_,_,_,_,_, err = q.Dequeue(context.Background(), []string{"octopus"})
_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
err = q.FinishJob(id80, nil)
require.NoError(t, err)
setFinishedAt(t, q, id80, date80)
id85, err := q.Enqueue("octopus", nil, nil)
id85, err := q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id85)
_,_,_,_,_, err = q.Dequeue(context.Background(), []string{"octopus"})
_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
err = q.FinishJob(id85, nil)
require.NoError(t, err)
@ -111,65 +111,65 @@ func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) {
func testDeleteJobAndDependencies(t *testing.T, q *dbjobqueue.DBJobQueue) {
// id1 -> id2 -> id3
id1, err := q.Enqueue("octopus", nil, nil)
id1, err := q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id1)
id2, err := q.Enqueue("octopus", nil, []uuid.UUID{id1})
id2, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id2)
id3, err := q.Enqueue("octopus", nil, []uuid.UUID{id2})
id3, err := q.Enqueue("octopus", nil, []uuid.UUID{id2}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id3)
c1, err := q.Enqueue("octopus", nil, nil)
c1, err := q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, c1)
c2, err := q.Enqueue("octopus", nil, []uuid.UUID{c1})
c2, err := q.Enqueue("octopus", nil, []uuid.UUID{c1}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, c2)
c3, err := q.Enqueue("octopus", nil, []uuid.UUID{c2})
c3, err := q.Enqueue("octopus", nil, []uuid.UUID{c2}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, c3)
controls := []uuid.UUID{c1, c2, c3}
_,_,_, err = q.Job(c1)
_, _, _, _, err = q.Job(c1)
require.NoError(t, err)
require.NoError(t, q.DeleteJobIncludingDependencies(id3))
for _, id := range []uuid.UUID{id1, id2, id3} {
_,_,_, err = q.Job(id)
_, _, _, _, err = q.Job(id)
require.ErrorIs(t, err, jobqueue.ErrNotExist)
}
// controls should still exist
for _, c := range controls {
_,_,_, err = q.Job(c)
_, _, _, _, err = q.Job(c)
require.NoError(t, err)
}
// id1 -> id2 -> id4 && id3 -> id4
id1, err = q.Enqueue("octopus", nil, nil)
id1, err = q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id1)
id2, err = q.Enqueue("octopus", nil, []uuid.UUID{id1})
id2, err = q.Enqueue("octopus", nil, []uuid.UUID{id1}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id2)
id3, err = q.Enqueue("octopus", nil, nil)
id3, err = q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id3)
id4, err := q.Enqueue("octopus", nil, []uuid.UUID{id2, id3})
id4, err := q.Enqueue("octopus", nil, []uuid.UUID{id2, id3}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id4)
require.NoError(t, q.DeleteJobIncludingDependencies(id4))
for _, id := range []uuid.UUID{id1, id2, id3, id4} {
_,_,_, err = q.Job(id)
_, _, _, _, err = q.Job(id)
require.ErrorIs(t, err, jobqueue.ErrNotExist)
}
// controls should still exist
for _, c := range controls {
_,_,_, err = q.Job(c)
_, _, _, _, err = q.Job(c)
require.NoError(t, err)
}
@ -177,45 +177,44 @@ func testDeleteJobAndDependencies(t *testing.T, q *dbjobqueue.DBJobQueue) {
// situation as it does not occur in the service. This should be changed once we allow
// multiple build job per depsolve job, and the depsolve job should only be removed once all
// the build jobs have been dealt with.
id1, err = q.Enqueue("octopus", nil, nil)
id1, err = q.Enqueue("octopus", nil, nil, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id1)
id2a, err := q.Enqueue("octopus", nil, []uuid.UUID{id1})
id2a, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id2a)
id2b, err := q.Enqueue("octopus", nil, []uuid.UUID{id1})
id2b, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id2b)
id3, err = q.Enqueue("octopus", nil, []uuid.UUID{id2a})
id3, err = q.Enqueue("octopus", nil, []uuid.UUID{id2a}, "")
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, id3)
require.NoError(t, q.DeleteJobIncludingDependencies(id3))
for _, id := range []uuid.UUID{id1, id2a, id3} {
_,_,_, err = q.Job(id)
_, _, _, _, err = q.Job(id)
require.ErrorIs(t, err, jobqueue.ErrNotExist)
}
// id2b still exists
_,_,_, err = q.Job(id2b)
_, _, _, _, err = q.Job(id2b)
require.NoError(t, err)
// id2b can still be deleted with it's dependencies missing
require.NoError(t, q.DeleteJobIncludingDependencies(id2b))
_,_,_, err = q.Job(id2b)
_, _, _, _, err = q.Job(id2b)
require.ErrorIs(t, err, jobqueue.ErrNotExist)
// controls should still exist
for _, c := range controls {
_,_,_, err = q.Job(c)
_, _, _, _, err = q.Job(c)
require.NoError(t, err)
}
require.NoError(t, q.DeleteJobIncludingDependencies(uuid.Nil))
// controls should still exist
for _, c := range controls {
_,_,_, err = q.Job(c)
_, _, _, _, err = q.Job(c)
require.NoError(t, err)
}
}

View file

@ -465,12 +465,12 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
var id uuid.UUID
if request.Koji != nil {
id, err = enqueueKojiCompose(h.server.workers, uint64(request.Koji.TaskId), request.Koji.Server, request.Koji.Name, request.Koji.Version, request.Koji.Release, distribution, bp, manifestSeed, irs)
id, err = enqueueKojiCompose(h.server.workers, uint64(request.Koji.TaskId), request.Koji.Server, request.Koji.Name, request.Koji.Version, request.Koji.Release, distribution, bp, manifestSeed, irs, "")
if err != nil {
return err
}
} else {
id, err = enqueueCompose(h.server.workers, distribution, bp, manifestSeed, irs)
id, err = enqueueCompose(h.server.workers, distribution, bp, manifestSeed, irs, "")
if err != nil {
return err
}
@ -488,7 +488,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
})
}
func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest) (uuid.UUID, error) {
func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
var id uuid.UUID
if len(irs) != 1 {
return id, HTTPError(ErrorInvalidNumberOfImageBuilds)
@ -502,12 +502,12 @@ func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp bluep
Arch: ir.arch.Name(),
Releasever: distribution.Releasever(),
PackageSetsRepos: ir.packageSetsRepositories,
})
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID)
manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
@ -519,7 +519,7 @@ func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp bluep
Build: ir.imageType.BuildPipelines(),
Payload: ir.imageType.PayloadPipelines(),
},
}, manifestJobID)
}, manifestJobID, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
@ -530,7 +530,7 @@ func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp bluep
return id, nil
}
func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest) (uuid.UUID, error) {
func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
var id uuid.UUID
kojiDirectory := "osbuild-composer-koji-" + uuid.New().String()
@ -539,7 +539,7 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver
Name: name,
Version: version,
Release: release,
})
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
@ -554,12 +554,12 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver
Arch: ir.arch.Name(),
Releasever: distribution.Releasever(),
PackageSetsRepos: ir.packageSetsRepositories,
})
}, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID)
manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
@ -581,7 +581,7 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver
KojiServer: server,
KojiDirectory: kojiDirectory,
KojiFilename: kojiFilename,
}, manifestJobID, initID)
}, manifestJobID, initID, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
@ -600,7 +600,7 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver
KojiDirectory: kojiDirectory,
TaskID: taskID,
StartTime: uint64(time.Now().Unix()),
}, initID, buildIDs)
}, initID, buildIDs, channel)
if err != nil {
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}

View file

@ -12,6 +12,8 @@ import (
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
v2 "github.com/osbuild/osbuild-composer/internal/cloudapi/v2"
"github.com/osbuild/osbuild-composer/internal/distro/test_distro"
"github.com/osbuild/osbuild-composer/internal/kojiapi/api"
@ -19,7 +21,6 @@ import (
"github.com/osbuild/osbuild-composer/internal/test"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
"github.com/stretchr/testify/require"
)
type jobResult struct {
@ -368,7 +369,7 @@ func TestKojiCompose(t *testing.T) {
c.composeReplyCode, c.composeReply, "id", "operation_id")
// handle koji-init
_, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-init"})
_, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-init"}, []string{""})
require.NoError(t, err)
require.Equal(t, "koji-init", jobType)
@ -386,7 +387,7 @@ func TestKojiCompose(t *testing.T) {
fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token))
// handle osbuild-koji #1
_, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"})
_, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}, []string{""})
require.NoError(t, err)
require.Equal(t, "osbuild-koji", jobType)
@ -403,7 +404,7 @@ func TestKojiCompose(t *testing.T) {
fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token))
// handle osbuild-koji #2
_, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"})
_, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}, []string{""})
require.NoError(t, err)
require.Equal(t, "osbuild-koji", jobType)
@ -427,7 +428,7 @@ func TestKojiCompose(t *testing.T) {
fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token))
// handle koji-finalize
finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-finalize"})
finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-finalize"}, []string{""})
require.NoError(t, err)
require.Equal(t, "koji-finalize", jobType)
@ -510,7 +511,7 @@ func TestKojiJobTypeValidation(t *testing.T) {
Version: "42",
Release: "1",
}
initID, err := workers.EnqueueKojiInit(&initJob)
initID, err := workers.EnqueueKojiInit(&initJob, "")
require.NoError(t, err)
buildJobs := make([]worker.OSBuildKojiJob, nImages)
@ -524,7 +525,7 @@ func TestKojiJobTypeValidation(t *testing.T) {
KojiDirectory: "koji-server-test-dir",
KojiFilename: fname,
}
buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID)
buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID, "")
require.NoError(t, err)
buildJobs[idx] = buildJob
@ -542,7 +543,7 @@ func TestKojiJobTypeValidation(t *testing.T) {
TaskID: 0,
StartTime: uint64(time.Now().Unix()),
}
finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs)
finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs, "")
require.NoError(t, err)
// ----- Jobs queued - Test API endpoints (status, manifests, logs) ----- //

View file

@ -39,7 +39,7 @@ func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context.
depsolveContext, cancel := context.WithCancel(context.Background())
go func() {
for {
_, token, _, _, _, err := workerServer.RequestJob(context.Background(), test_distro.TestDistroName, []string{"depsolve"})
_, token, _, _, _, err := workerServer.RequestJob(context.Background(), test_distro.TestDistroName, []string{"depsolve"}, []string{""})
if err != nil {
continue
}
@ -575,7 +575,7 @@ func TestComposeStatusSuccess(t *testing.T) {
"kind": "ComposeId"
}`, "id")
jobId, token, jobType, args, dynArgs, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"})
jobId, token, jobType, args, dynArgs, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, "osbuild", jobType)
@ -647,7 +647,7 @@ func TestComposeStatusFailure(t *testing.T) {
"kind": "ComposeId"
}`, "id")
jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"})
jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, "osbuild", jobType)
@ -699,7 +699,7 @@ func TestComposeLegacyError(t *testing.T) {
"kind": "ComposeId"
}`, "id")
jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"})
jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, "osbuild", jobType)
@ -754,7 +754,7 @@ func TestComposeJobError(t *testing.T) {
"kind": "ComposeId"
}`, "id")
jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"})
jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, "osbuild", jobType)

View file

@ -28,7 +28,7 @@ const (
sqlListen = `LISTEN jobs`
sqlUnlisten = `UNLISTEN jobs`
sqlEnqueue = `INSERT INTO jobs(id, type, args, queued_at) VALUES ($1, $2, $3, NOW())`
sqlEnqueue = `INSERT INTO jobs(id, type, args, queued_at, channel) VALUES ($1, $2, $3, NOW(), $4)`
sqlDequeue = `
UPDATE jobs
SET token = $1, started_at = now()
@ -37,7 +37,7 @@ const (
FROM ready_jobs
-- use ANY here, because "type in ()" doesn't work with bound parameters
-- literal syntax for this is '{"a", "b"}': https://www.postgresql.org/docs/13/arrays.html
WHERE type = ANY($2)
WHERE type = ANY($2) AND channel = ANY($3)
LIMIT 1
FOR UPDATE SKIP LOCKED
)
@ -62,7 +62,7 @@ const (
WHERE job_id = $1`
sqlQueryJob = `
SELECT type, args, started_at, finished_at, canceled
SELECT type, args, channel, started_at, finished_at, canceled
FROM jobs
WHERE id = $1`
sqlQueryJobStatus = `
@ -141,7 +141,7 @@ func (q *DBJobQueue) Close() {
q.pool.Close()
}
func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return uuid.Nil, fmt.Errorf("error connecting to database: %v", err)
@ -160,7 +160,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
}()
id := uuid.New()
_, err = conn.Exec(context.Background(), sqlEnqueue, id, jobType, args)
_, err = conn.Exec(context.Background(), sqlEnqueue, id, jobType, args, channel)
if err != nil {
return uuid.Nil, fmt.Errorf("error enqueuing job: %v", err)
}
@ -187,7 +187,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
return id, nil
}
func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
// Return early if the context is already canceled.
if err := ctx.Err(); err != nil {
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
@ -216,7 +216,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
var started, queued *time.Time
token := uuid.New()
for {
err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes).Scan(&id, &token, &jobType, &args, &queued, &started)
err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &token, &jobType, &args, &queued, &started)
if err == nil {
break
}
@ -310,7 +310,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
var started, finished *time.Time
var jobType string
canceled := false
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, &started, &finished, &canceled)
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &canceled)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
}
@ -409,14 +409,14 @@ func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMes
}
// Job returns all the parameters that define a job (everything provided during Enqueue).
func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) {
func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return
}
defer conn.Release()
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, nil, nil, nil)
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, &channel, nil, nil, nil)
if err == pgx.ErrNoRows {
err = jobqueue.ErrNotExist
return

View file

@ -0,0 +1,15 @@
ALTER TABLE jobs
ADD COLUMN channel varchar NOT NULL DEFAULT '';
-- We added a column, thus we have to recreate the view.
CREATE OR REPLACE VIEW ready_jobs AS
SELECT *
FROM jobs
WHERE started_at IS NULL
AND canceled = FALSE
AND id NOT IN (
SELECT job_id
FROM job_dependencies JOIN jobs ON dependency_id = id
WHERE finished_at IS NULL
)
ORDER BY queued_at ASC

View file

@ -67,6 +67,7 @@ type job struct {
Args json.RawMessage `json:"args,omitempty"`
Dependencies []uuid.UUID `json:"dependencies"`
Result json.RawMessage `json:"result,omitempty"`
Channel string `json:"channel"`
QueuedAt time.Time `json:"queued_at,omitempty"`
StartedAt time.Time `json:"started_at,omitempty"`
@ -127,7 +128,7 @@ func New(dir string) (*fsJobQueue, error) {
return q, nil
}
func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
q.mu.Lock()
defer q.mu.Unlock()
@ -137,6 +138,7 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
Type: jobType,
Dependencies: dependencies,
QueuedAt: time.Now(),
Channel: channel,
}
var err error
@ -172,7 +174,7 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
return j.Id, nil
}
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
q.mu.Lock()
defer q.mu.Unlock()
@ -191,7 +193,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
for {
var found bool
var err error
j, found, err = q.dequeueSuitableJob(jobTypes)
j, found, err = q.dequeueSuitableJob(jobTypes, channels)
if err != nil {
return uuid.Nil, uuid.Nil, nil, "", nil, err
}
@ -358,7 +360,7 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMes
return
}
func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) {
func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) {
j, err := q.readJob(id)
if err != nil {
return
@ -367,6 +369,7 @@ func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de
jobType = j.Type
args = j.Args
dependencies = j.Dependencies
channel = j.Channel
return
}
@ -476,10 +479,11 @@ func (q *fsJobQueue) hasAllFinishedDependencies(j *job) (bool, error) {
// - must be pending
// - its dependencies must be finished
// - must be of one of the type from jobTypes
// - must be of one of the channel from channels
//
// If a suitable job is not found, false is returned.
// If an error occurs during the search, it's returned.
func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string) (*job, bool, error) {
func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string, channels []string) (*job, bool, error) {
el := q.pending.Front()
for el != nil {
id := el.Value.(uuid.UUID)
@ -489,7 +493,7 @@ func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string) (*job, bool, error) {
return nil, false, err
}
if !jobMatchesCriteria(j, jobTypes) {
if !jobMatchesCriteria(j, jobTypes, channels) {
el = el.Next()
continue
}
@ -527,12 +531,17 @@ func (q *fsJobQueue) removePendingJob(id uuid.UUID) {
//
// Criteria:
// - the job's type is one of the acceptedJobTypes
func jobMatchesCriteria(j *job, acceptedJobTypes []string) bool {
for _, jt := range acceptedJobTypes {
if jt == j.Type {
return true
// - the job's channel is one of the acceptedChannels
func jobMatchesCriteria(j *job, acceptedJobTypes []string, acceptedChannels []string) bool {
contains := func(slice []string, str string) bool {
for _, item := range slice {
if str == item {
return true
}
}
return false
}
return false
return contains(acceptedJobTypes, j.Type) && contains(acceptedChannels, j.Channel)
}

View file

@ -32,16 +32,16 @@ type JobQueue interface {
// have finished.
//
// Returns the id of the new job, or an error.
Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error)
Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error)
// Dequeues a job, blocking until one is available.
//
// Waits until a job with a type of any of `jobTypes` is available, or `ctx` is
// canceled.
// Waits until a job with a type of any of `jobTypes` and any of `channels`
// is available, or `ctx` is canceled.
//
// Returns the job's id, token, dependencies, type, and arguments, or an error. Arguments
// can be unmarshaled to the type given in Enqueue().
Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
// Dequeues a pending job by its ID in a non-blocking way.
//
@ -67,7 +67,7 @@ type JobQueue interface {
JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error)
// Job returns all the parameters that define a job (everything provided during Enqueue).
Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error)
Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error)
// Find job by token, this will return an error if the job hasn't been dequeued
IdFromToken(token uuid.UUID) (id uuid.UUID, err error)

View file

@ -45,14 +45,14 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) {
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
t.Helper()
id, err := q.Enqueue(jobType, args, dependencies)
id, err := q.Enqueue(jobType, args, dependencies, "")
require.NoError(t, err)
require.NotEmpty(t, id)
return id
}
func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID {
id, tok, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType})
id, tok, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType}, []string{""})
require.NoError(t, err)
require.NotEmpty(t, id)
require.NotEmpty(t, tok)
@ -68,18 +68,18 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result
func testErrors(t *testing.T, q jobqueue.JobQueue) {
// not serializable to JSON
id, err := q.Enqueue("test", make(chan string), nil)
id, err := q.Enqueue("test", make(chan string), nil, "")
require.Error(t, err)
require.Equal(t, uuid.Nil, id)
// invalid dependency
id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()})
id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()}, "")
require.Error(t, err)
require.Equal(t, uuid.Nil, id)
// token gets removed
pushTestJob(t, q, "octopus", nil, nil)
id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"})
id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
require.NotEmpty(t, tok)
@ -110,7 +110,7 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) {
var parsedArgs argument
id, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"})
id, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
require.Equal(t, two, id)
require.NotEmpty(t, tok)
@ -121,13 +121,13 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, twoargs, parsedArgs)
// Read job params after Dequeue
jtype, jargs, jdeps, err := q.Job(id)
jtype, jargs, jdeps, _, err := q.Job(id)
require.NoError(t, err)
require.Equal(t, args, jargs)
require.Equal(t, deps, jdeps)
require.Equal(t, typ, jtype)
id, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"})
id, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}, []string{""})
require.NoError(t, err)
require.Equal(t, one, id)
require.NotEmpty(t, tok)
@ -137,13 +137,13 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) {
require.NoError(t, err)
require.Equal(t, oneargs, parsedArgs)
jtype, jargs, jdeps, err = q.Job(id)
jtype, jargs, jdeps, _, err = q.Job(id)
require.NoError(t, err)
require.Equal(t, args, jargs)
require.Equal(t, deps, jdeps)
require.Equal(t, typ, jtype)
_, _, _, err = q.Job(uuid.New())
_, _, _, _, err = q.Job(uuid.New())
require.Error(t, err)
}
@ -156,7 +156,7 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"})
id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}, []string{""})
require.Equal(t, err, jobqueue.ErrDequeueTimeout)
require.Equal(t, uuid.Nil, id)
require.Equal(t, uuid.Nil, tok)
@ -168,12 +168,12 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) {
func testDequeueTimeout(t *testing.T, q jobqueue.JobQueue) {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20)
defer cancel()
_, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"})
_, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}, []string{""})
require.Equal(t, jobqueue.ErrDequeueTimeout, err)
ctx2, cancel2 := context.WithCancel(context.Background())
cancel2()
_, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"})
_, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}, []string{""})
require.Equal(t, jobqueue.ErrDequeueTimeout, err)
}
@ -255,7 +255,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) {
defer close(done)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"})
id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"}, []string{""})
require.NoError(t, err)
require.NotEmpty(t, id)
require.NotEmpty(t, tok)
@ -270,7 +270,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) {
// This call to Dequeue() should not block on the one in the goroutine.
id := pushTestJob(t, q, "clownfish", nil, nil)
r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
require.NoError(t, err)
require.Equal(t, id, r)
require.NotEmpty(t, tok)
@ -293,7 +293,7 @@ func testMultipleWorkersSingleJobType(t *testing.T, q jobqueue.JobQueue) {
defer wg.Add(-1)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"clownfish"})
id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"clownfish"}, []string{""})
require.NoError(t, err)
require.NotEmpty(t, id)
require.NotEmpty(t, tok)
@ -340,7 +340,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
// Cancel a running job, which should not dequeue the canceled job from above
id = pushTestJob(t, q, "clownfish", nil, nil)
require.NotEmpty(t, id)
r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"})
r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
require.NoError(t, err)
require.Equal(t, id, r)
require.NotEmpty(t, tok)
@ -360,7 +360,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
// Cancel a finished job, which is a no-op
id = pushTestJob(t, q, "clownfish", nil, nil)
require.NotEmpty(t, id)
r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"})
r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
require.NoError(t, err)
require.Equal(t, id, r)
require.NotEmpty(t, tok)
@ -385,7 +385,7 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) {
// No heartbeats for queued job
require.Empty(t, q.Heartbeats(time.Second*0))
r, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"})
r, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
require.Equal(t, id, r)
require.NotEmpty(t, tok)
@ -447,7 +447,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
t.Run("cannot dequeue a non-pending job", func(t *testing.T) {
one := pushTestJob(t, q, "octopus", nil, nil)
_, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"})
_, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""})
require.NoError(t, err)
_, _, _, _, err = q.DequeueByID(context.Background(), one)

View file

@ -161,7 +161,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
Name: request.Name,
Version: request.Version,
Release: request.Release,
})
}, "")
if err != nil {
// This is a programming error.
panic(err)
@ -177,7 +177,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
KojiServer: request.Koji.Server,
KojiDirectory: kojiDirectory,
KojiFilename: kojiFilenames[i],
}, initID)
}, initID, "")
if err != nil {
// This is a programming error.
panic(err)
@ -194,7 +194,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
KojiDirectory: kojiDirectory,
TaskID: uint64(request.Koji.TaskId),
StartTime: uint64(time.Now().Unix()),
}, initID, buildIDs)
}, initID, buildIDs, "")
if err != nil {
// This is a programming error.
panic(err)

View file

@ -14,6 +14,8 @@ import (
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/osbuild/osbuild-composer/internal/distro/test_distro"
"github.com/osbuild/osbuild-composer/internal/kojiapi"
"github.com/osbuild/osbuild-composer/internal/kojiapi/api"
@ -23,7 +25,6 @@ import (
"github.com/osbuild/osbuild-composer/internal/test"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
"github.com/stretchr/testify/require"
)
func newTestKojiServer(t *testing.T, dir string) (*kojiapi.Server, *worker.Server) {
@ -334,7 +335,7 @@ func TestCompose(t *testing.T) {
wg.Add(1)
go func(t *testing.T, result worker.KojiInitJobResult) {
_, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-init"})
_, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-init"}, []string{""})
require.NoError(t, err)
require.Equal(t, "koji-init", jobType)
@ -387,7 +388,7 @@ func TestCompose(t *testing.T) {
c.composeReplyCode, c.composeReply, "id")
wg.Wait()
_, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"})
_, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}, []string{""})
require.NoError(t, err)
require.Equal(t, "osbuild-koji", jobType)
@ -403,7 +404,7 @@ func TestCompose(t *testing.T) {
test.TestRoute(t, workerHandler, false, "PATCH", fmt.Sprintf("/api/worker/v1/jobs/%v", token), string(buildJobResult), http.StatusOK,
fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token))
_, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"})
_, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}, []string{""})
require.NoError(t, err)
require.Equal(t, "osbuild-koji", jobType)
@ -426,7 +427,7 @@ func TestCompose(t *testing.T) {
}`, test_distro.TestArchName, test_distro.TestDistroName), http.StatusOK,
fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token))
finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-finalize"})
finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-finalize"}, []string{""})
require.NoError(t, err)
require.Equal(t, "koji-finalize", jobType)
@ -507,7 +508,7 @@ func TestJobTypeValidation(t *testing.T) {
Version: "42",
Release: "1",
}
initID, err := workers.EnqueueKojiInit(&initJob)
initID, err := workers.EnqueueKojiInit(&initJob, "")
require.NoError(t, err)
buildJobs := make([]worker.OSBuildKojiJob, nImages)
@ -521,7 +522,7 @@ func TestJobTypeValidation(t *testing.T) {
KojiDirectory: "koji-server-test-dir",
KojiFilename: fname,
}
buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID)
buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID, "")
require.NoError(t, err)
buildJobs[idx] = buildJob
@ -539,7 +540,7 @@ func TestJobTypeValidation(t *testing.T) {
TaskID: 0,
StartTime: uint64(time.Now().Unix()),
}
finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs)
finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs, "")
require.NoError(t, err)
// ----- Jobs queued - Test API endpoints (status, manifests, logs) ----- //

View file

@ -2337,7 +2337,7 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request
Build: imageType.BuildPipelines(),
Payload: imageType.PayloadPipelines(),
},
})
}, "")
if err == nil {
err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId, packageSets["packages"])
}

View file

@ -7,12 +7,13 @@ import (
"os"
"testing"
"github.com/stretchr/testify/require"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/distro/test_distro"
rpmmd_mock "github.com/osbuild/osbuild-composer/internal/mocks/rpmmd"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
"github.com/stretchr/testify/require"
)
func TestComposeStatusFromLegacyError(t *testing.T) {
@ -41,10 +42,10 @@ func TestComposeStatusFromLegacyError(t *testing.T) {
t.Fatalf("error creating osbuild manifest: %v", err)
}
jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "")
require.NoError(t, err)
j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, jobId, j)
@ -87,10 +88,10 @@ func TestComposeStatusFromJobError(t *testing.T) {
t.Fatalf("error creating osbuild manifest: %v", err)
}
jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "")
require.NoError(t, err)
j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, jobId, j)

View file

@ -93,41 +93,41 @@ func (s *Server) WatchHeartbeats() {
}
}
func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob) (uuid.UUID, error) {
return s.enqueue("osbuild:"+arch, job, nil)
func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob, channel string) (uuid.UUID, error) {
return s.enqueue("osbuild:"+arch, job, nil, channel)
}
func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, manifestID uuid.UUID) (uuid.UUID, error) {
return s.enqueue("osbuild:"+arch, job, []uuid.UUID{manifestID})
func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, manifestID uuid.UUID, channel string) (uuid.UUID, error) {
return s.enqueue("osbuild:"+arch, job, []uuid.UUID{manifestID}, channel)
}
func (s *Server) EnqueueOSBuildKoji(arch string, job *OSBuildKojiJob, initID uuid.UUID) (uuid.UUID, error) {
return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID})
func (s *Server) EnqueueOSBuildKoji(arch string, job *OSBuildKojiJob, initID uuid.UUID, channel string) (uuid.UUID, error) {
return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID}, channel)
}
func (s *Server) EnqueueOSBuildKojiAsDependency(arch string, job *OSBuildKojiJob, manifestID, initID uuid.UUID) (uuid.UUID, error) {
return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID, manifestID})
func (s *Server) EnqueueOSBuildKojiAsDependency(arch string, job *OSBuildKojiJob, manifestID, initID uuid.UUID, channel string) (uuid.UUID, error) {
return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID, manifestID}, channel)
}
func (s *Server) EnqueueKojiInit(job *KojiInitJob) (uuid.UUID, error) {
return s.enqueue("koji-init", job, nil)
func (s *Server) EnqueueKojiInit(job *KojiInitJob, channel string) (uuid.UUID, error) {
return s.enqueue("koji-init", job, nil, channel)
}
func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID) (uuid.UUID, error) {
return s.enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...))
func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID, channel string) (uuid.UUID, error) {
return s.enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...), channel)
}
func (s *Server) EnqueueDepsolve(job *DepsolveJob) (uuid.UUID, error) {
return s.enqueue("depsolve", job, nil)
func (s *Server) EnqueueDepsolve(job *DepsolveJob, channel string) (uuid.UUID, error) {
return s.enqueue("depsolve", job, nil, channel)
}
func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID) (uuid.UUID, error) {
return s.enqueue("manifest-id-only", job, []uuid.UUID{parent})
func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID, channel string) (uuid.UUID, error) {
return s.enqueue("manifest-id-only", job, []uuid.UUID{parent}, channel)
}
func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
prometheus.EnqueueJobMetrics(jobType)
return s.jobs.Enqueue(jobType, job, dependencies)
return s.jobs.Enqueue(jobType, job, dependencies, channel)
}
func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobStatus, []uuid.UUID, error) {
@ -289,7 +289,7 @@ func (s *Server) jobStatus(id uuid.UUID, result interface{}) (string, *JobStatus
// OSBuildJob returns the parameters of an OSBuildJob
func (s *Server) OSBuildJob(id uuid.UUID, job *OSBuildJob) error {
jobType, rawArgs, _, err := s.jobs.Job(id)
jobType, rawArgs, _, _, err := s.jobs.Job(id)
if err != nil {
return err
}
@ -307,7 +307,7 @@ func (s *Server) OSBuildJob(id uuid.UUID, job *OSBuildJob) error {
// OSBuildKojiJob returns the parameters of an OSBuildKojiJob
func (s *Server) OSBuildKojiJob(id uuid.UUID, job *OSBuildKojiJob) error {
jobType, rawArgs, _, err := s.jobs.Job(id)
jobType, rawArgs, _, _, err := s.jobs.Job(id)
if err != nil {
return err
}
@ -325,7 +325,7 @@ func (s *Server) OSBuildKojiJob(id uuid.UUID, job *OSBuildKojiJob) error {
// JobType returns the type of the job
func (s *Server) JobType(id uuid.UUID) (string, error) {
jobType, _, _, err := s.jobs.Job(id)
jobType, _, _, _, err := s.jobs.Job(id)
// the architecture is internally encdode in the job type, but hide that
// from this API
return strings.Split(jobType, ":")[0], err
@ -389,15 +389,15 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error {
return os.RemoveAll(path.Join(s.artifactsDir, id.String()))
}
func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
return s.requestJob(ctx, arch, jobTypes, uuid.Nil)
func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
return s.requestJob(ctx, arch, jobTypes, uuid.Nil, channels)
}
func (s *Server) RequestJobById(ctx context.Context, arch string, requestedJobId uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
return s.requestJob(ctx, arch, []string{}, requestedJobId)
return s.requestJob(ctx, arch, []string{}, requestedJobId, nil)
}
func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID) (
func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID, channels []string) (
jobId uuid.UUID, token uuid.UUID, jobType string, args json.RawMessage, dynamicArgs []json.RawMessage, err error) {
// treat osbuild jobs specially until we have found a generic way to
// specify dequeuing restrictions. For now, we only have one
@ -425,7 +425,7 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string,
jobId = requestedJobId
token, depIDs, jobType, args, err = s.jobs.DequeueByID(dequeueCtx, requestedJobId)
} else {
jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, jts)
jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, jts, channels)
}
if err != nil {
return
@ -550,7 +550,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error {
return err
}
jobId, token, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types)
jobId, token, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types, []string{""})
if err != nil {
if err == jobqueue.ErrDequeueTimeout {
return ctx.JSON(http.StatusNoContent, api.ObjectReference{

View file

@ -129,7 +129,7 @@ func TestCreate(t *testing.T) {
server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1")
handler := server.Handler()
_, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
_, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "")
require.NoError(t, err)
test.TestRoute(t, handler, false, "POST", "/api/worker/v1/jobs",
@ -158,10 +158,10 @@ func TestCancel(t *testing.T) {
server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1")
handler := server.Handler()
jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "")
require.NoError(t, err)
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, jobId, j)
require.Equal(t, "osbuild", typ)
@ -199,10 +199,10 @@ func TestUpdate(t *testing.T) {
server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1")
handler := server.Handler()
jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "")
require.NoError(t, err)
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, jobId, j)
require.Equal(t, "osbuild", typ)
@ -238,10 +238,10 @@ func TestArgs(t *testing.T) {
Payload: []string{"x", "y", "z"},
},
}
jobId, err := server.EnqueueOSBuild(arch.Name(), &job)
jobId, err := server.EnqueueOSBuild(arch.Name(), &job, "")
require.NoError(t, err)
_, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
_, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.NotNil(t, args)
@ -272,10 +272,10 @@ func TestUpload(t *testing.T) {
server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1")
handler := server.Handler()
jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "")
require.NoError(t, err)
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, jobID, j)
require.Equal(t, "osbuild", typ)
@ -306,10 +306,10 @@ func TestUploadAlteredBasePath(t *testing.T) {
server := newTestServer(t, tempdir, time.Duration(0), "/api/image-builder-worker/v1")
handler := server.Handler()
jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "")
require.NoError(t, err)
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""})
require.NoError(t, err)
require.Equal(t, jobID, j)
require.Equal(t, "osbuild", typ)
@ -379,7 +379,7 @@ func TestOAuth(t *testing.T) {
t.Fatalf("error creating osbuild manifest: %v", err)
}
_, err = workerServer.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
_, err = workerServer.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "")
require.NoError(t, err)
client, err := worker.NewClient(proxySrv.URL, nil, &offlineToken, &oauthSrv.URL, "/api/image-builder-worker/v1")
@ -404,7 +404,7 @@ func TestTimeout(t *testing.T) {
}
server := newTestServer(t, tempdir, time.Millisecond*10, "/api/image-builder-worker/v1")
_, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
_, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""})
require.Equal(t, jobqueue.ErrDequeueTimeout, err)
test.TestRoute(t, server.Handler(), false, "POST", "/api/image-builder-worker/v1/jobs", `{"arch":"arch","types":["types"]}`, http.StatusNoContent,
@ -424,10 +424,10 @@ func TestRequestJobById(t *testing.T) {
server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1")
handler := server.Handler()
depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{})
depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "")
require.NoError(t, err)
jobId, err := server.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobId)
jobId, err := server.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobId, "")
require.NoError(t, err)
test.TestRoute(t, server.Handler(), false, "POST", "/api/worker/v1/jobs", `{"arch":"arch","types":["manifest-id-only"]}`, http.StatusBadRequest,
@ -436,7 +436,7 @@ func TestRequestJobById(t *testing.T) {
_, _, _, _, _, err = server.RequestJobById(context.Background(), arch.Name(), jobId)
require.Error(t, jobqueue.ErrNotPending, err)
_, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"})
_, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}, []string{""})
require.NoError(t, err)
depsolveJR, err := json.Marshal(worker.DepsolveJobResult{})
@ -472,7 +472,7 @@ func TestMixedOSBuildJob(t *testing.T) {
Manifest: emptyManifestV2,
ImageName: "no-pipeline-names",
}
oldJobID, err := server.EnqueueOSBuild("x", &oldJob)
oldJobID, err := server.EnqueueOSBuild("x", &oldJob, "")
require.NoError(err)
newJob := worker.OSBuildJob{
@ -483,7 +483,7 @@ func TestMixedOSBuildJob(t *testing.T) {
Payload: []string{"other", "pipelines"},
},
}
newJobID, err := server.EnqueueOSBuild("x", &newJob)
newJobID, err := server.EnqueueOSBuild("x", &newJob, "")
require.NoError(err)
var oldJobRead worker.OSBuildJob
@ -511,7 +511,7 @@ func TestMixedOSBuildJob(t *testing.T) {
// don't block forever if the jobs weren't added or can't be retrieved
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"})
id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}, []string{""})
require.NoError(err)
return id, token
}
@ -612,9 +612,9 @@ func TestMixedOSBuildKojiJob(t *testing.T) {
enqueueKojiJob := func(job *worker.OSBuildKojiJob) uuid.UUID {
initJob := new(worker.KojiInitJob)
initJobID, err := server.EnqueueKojiInit(initJob)
initJobID, err := server.EnqueueKojiInit(initJob, "")
require.NoError(err)
jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID)
jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID, "")
require.NoError(err)
return jobID
}
@ -659,7 +659,7 @@ func TestMixedOSBuildKojiJob(t *testing.T) {
for idx := uint(0); idx < 2; idx++ {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
_, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"})
_, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}, []string{""})
require.NoError(err)
require.NoError(server.FinishJob(token, nil))
}
@ -668,7 +668,7 @@ func TestMixedOSBuildKojiJob(t *testing.T) {
// don't block forever if the jobs weren't added or can't be retrieved
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"})
id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}, []string{""})
require.NoError(err)
return id, token
}
@ -769,10 +769,10 @@ func TestDepsolveLegacyErrorConversion(t *testing.T) {
}
server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1")
depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{})
depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "")
require.NoError(t, err)
_, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"})
_, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}, []string{""})
require.NoError(t, err)
reason := "Depsolve failed"
@ -812,7 +812,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) {
Manifest: emptyManifestV2,
ImageName: "no-pipeline-names",
}
oldJobID, err := server.EnqueueOSBuild("x", &oldJob)
oldJobID, err := server.EnqueueOSBuild("x", &oldJob, "")
require.NoError(err)
newJob := worker.OSBuildJob{
@ -823,7 +823,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) {
Payload: []string{"other", "pipelines"},
},
}
newJobID, err := server.EnqueueOSBuild("x", &newJob)
newJobID, err := server.EnqueueOSBuild("x", &newJob, "")
require.NoError(err)
oldJobRead := new(worker.OSBuildJob)
@ -844,7 +844,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) {
// don't block forever if the jobs weren't added or can't be retrieved
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"})
id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}, []string{""})
require.NoError(err)
return id, token
}
@ -917,9 +917,9 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) {
enqueueKojiJob := func(job *worker.OSBuildKojiJob) uuid.UUID {
initJob := new(worker.KojiInitJob)
initJobID, err := server.EnqueueKojiInit(initJob)
initJobID, err := server.EnqueueKojiInit(initJob, "")
require.NoError(err)
jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID)
jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID, "")
require.NoError(err)
return jobID
}
@ -957,7 +957,7 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) {
for idx := uint(0); idx < 2; idx++ {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
_, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"})
_, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}, []string{""})
require.NoError(err)
require.NoError(server.FinishJob(token, nil))
}
@ -966,7 +966,7 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) {
// don't block forever if the jobs weren't added or can't be retrieved
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"})
id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}, []string{""})
require.NoError(err)
return id, token
}