From 7bfcee36f81eec7d790996d4302c28c4365d9f4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Budai?= Date: Sun, 6 Mar 2022 11:59:56 +0100 Subject: [PATCH] jobqueue: introduce the concept of channels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Channels are a concept similar to job types. Callers must specify a channel name when queueing a new job. A list of channels is also specified when dequeueing a job. The dequeued job's channel will always be from one of the specified channel. Of course, the job types are also respected. The dequeued job will also always be from one of the specified type. Currently, all calls to jobqueue were changed so all queue operations use an empty channel name and all dequeue operations use a list containing an empty channel. Thus, this is a non-functional change. Signed-off-by: Ondřej Budai --- .../main_test.go | 57 ++++++++--------- internal/cloudapi/v2/v2.go | 24 +++---- internal/cloudapi/v2/v2_koji_test.go | 17 ++--- internal/cloudapi/v2/v2_test.go | 10 +-- internal/jobqueue/dbjobqueue/dbjobqueue.go | 20 +++--- .../dbjobqueue/schemas/002_channels.sql | 15 +++++ internal/jobqueue/fsjobqueue/fsjobqueue.go | 31 +++++---- internal/jobqueue/jobqueue.go | 10 +-- .../jobqueue/jobqueuetest/jobqueuetest.go | 40 ++++++------ internal/kojiapi/server.go | 6 +- internal/kojiapi/server_test.go | 17 ++--- internal/weldr/api.go | 2 +- internal/weldr/compose_test.go | 11 ++-- internal/worker/server.go | 54 ++++++++-------- internal/worker/server_test.go | 64 +++++++++---------- 15 files changed, 202 insertions(+), 176 deletions(-) create mode 100644 internal/jobqueue/dbjobqueue/schemas/002_channels.sql diff --git a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go index 1134fc3c8..be3f20ed2 100644 --- a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go +++ b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go @@ -82,19 +82,19 @@ func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) { date85 := time.Date(1985, time.January, 1, 0, 0, 0, 0, time.UTC) date90 := time.Date(1990, time.January, 1, 0, 0, 0, 0, time.UTC) - id80, err := q.Enqueue("octopus", nil, nil) + id80, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id80) - _,_,_,_,_, err = q.Dequeue(context.Background(), []string{"octopus"}) + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) err = q.FinishJob(id80, nil) require.NoError(t, err) setFinishedAt(t, q, id80, date80) - id85, err := q.Enqueue("octopus", nil, nil) + id85, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id85) - _,_,_,_,_, err = q.Dequeue(context.Background(), []string{"octopus"}) + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) err = q.FinishJob(id85, nil) require.NoError(t, err) @@ -111,65 +111,65 @@ func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) { func testDeleteJobAndDependencies(t *testing.T, q *dbjobqueue.DBJobQueue) { // id1 -> id2 -> id3 - id1, err := q.Enqueue("octopus", nil, nil) + id1, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id1) - id2, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}) + id2, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id2) - id3, err := q.Enqueue("octopus", nil, []uuid.UUID{id2}) + id3, err := q.Enqueue("octopus", nil, []uuid.UUID{id2}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id3) - c1, err := q.Enqueue("octopus", nil, nil) + c1, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, c1) - c2, err := q.Enqueue("octopus", nil, []uuid.UUID{c1}) + c2, err := q.Enqueue("octopus", nil, []uuid.UUID{c1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, c2) - c3, err := q.Enqueue("octopus", nil, []uuid.UUID{c2}) + c3, err := q.Enqueue("octopus", nil, []uuid.UUID{c2}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, c3) controls := []uuid.UUID{c1, c2, c3} - _,_,_, err = q.Job(c1) + _, _, _, _, err = q.Job(c1) require.NoError(t, err) require.NoError(t, q.DeleteJobIncludingDependencies(id3)) for _, id := range []uuid.UUID{id1, id2, id3} { - _,_,_, err = q.Job(id) + _, _, _, _, err = q.Job(id) require.ErrorIs(t, err, jobqueue.ErrNotExist) } // controls should still exist for _, c := range controls { - _,_,_, err = q.Job(c) + _, _, _, _, err = q.Job(c) require.NoError(t, err) } // id1 -> id2 -> id4 && id3 -> id4 - id1, err = q.Enqueue("octopus", nil, nil) + id1, err = q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id1) - id2, err = q.Enqueue("octopus", nil, []uuid.UUID{id1}) + id2, err = q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id2) - id3, err = q.Enqueue("octopus", nil, nil) + id3, err = q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id3) - id4, err := q.Enqueue("octopus", nil, []uuid.UUID{id2, id3}) + id4, err := q.Enqueue("octopus", nil, []uuid.UUID{id2, id3}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id4) require.NoError(t, q.DeleteJobIncludingDependencies(id4)) for _, id := range []uuid.UUID{id1, id2, id3, id4} { - _,_,_, err = q.Job(id) + _, _, _, _, err = q.Job(id) require.ErrorIs(t, err, jobqueue.ErrNotExist) } // controls should still exist for _, c := range controls { - _,_,_, err = q.Job(c) + _, _, _, _, err = q.Job(c) require.NoError(t, err) } @@ -177,45 +177,44 @@ func testDeleteJobAndDependencies(t *testing.T, q *dbjobqueue.DBJobQueue) { // situation as it does not occur in the service. This should be changed once we allow // multiple build job per depsolve job, and the depsolve job should only be removed once all // the build jobs have been dealt with. - id1, err = q.Enqueue("octopus", nil, nil) + id1, err = q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id1) - id2a, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}) + id2a, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id2a) - id2b, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}) + id2b, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id2b) - id3, err = q.Enqueue("octopus", nil, []uuid.UUID{id2a}) + id3, err = q.Enqueue("octopus", nil, []uuid.UUID{id2a}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id3) require.NoError(t, q.DeleteJobIncludingDependencies(id3)) for _, id := range []uuid.UUID{id1, id2a, id3} { - _,_,_, err = q.Job(id) + _, _, _, _, err = q.Job(id) require.ErrorIs(t, err, jobqueue.ErrNotExist) } // id2b still exists - _,_,_, err = q.Job(id2b) + _, _, _, _, err = q.Job(id2b) require.NoError(t, err) // id2b can still be deleted with it's dependencies missing require.NoError(t, q.DeleteJobIncludingDependencies(id2b)) - _,_,_, err = q.Job(id2b) + _, _, _, _, err = q.Job(id2b) require.ErrorIs(t, err, jobqueue.ErrNotExist) // controls should still exist for _, c := range controls { - _,_,_, err = q.Job(c) + _, _, _, _, err = q.Job(c) require.NoError(t, err) } require.NoError(t, q.DeleteJobIncludingDependencies(uuid.Nil)) // controls should still exist for _, c := range controls { - _,_,_, err = q.Job(c) + _, _, _, _, err = q.Job(c) require.NoError(t, err) } } - diff --git a/internal/cloudapi/v2/v2.go b/internal/cloudapi/v2/v2.go index b653a6ae5..01bec0d7f 100644 --- a/internal/cloudapi/v2/v2.go +++ b/internal/cloudapi/v2/v2.go @@ -465,12 +465,12 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { var id uuid.UUID if request.Koji != nil { - id, err = enqueueKojiCompose(h.server.workers, uint64(request.Koji.TaskId), request.Koji.Server, request.Koji.Name, request.Koji.Version, request.Koji.Release, distribution, bp, manifestSeed, irs) + id, err = enqueueKojiCompose(h.server.workers, uint64(request.Koji.TaskId), request.Koji.Server, request.Koji.Name, request.Koji.Version, request.Koji.Release, distribution, bp, manifestSeed, irs, "") if err != nil { return err } } else { - id, err = enqueueCompose(h.server.workers, distribution, bp, manifestSeed, irs) + id, err = enqueueCompose(h.server.workers, distribution, bp, manifestSeed, irs, "") if err != nil { return err } @@ -488,7 +488,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { }) } -func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest) (uuid.UUID, error) { +func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) { var id uuid.UUID if len(irs) != 1 { return id, HTTPError(ErrorInvalidNumberOfImageBuilds) @@ -502,12 +502,12 @@ func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp bluep Arch: ir.arch.Name(), Releasever: distribution.Releasever(), PackageSetsRepos: ir.packageSetsRepositories, - }) + }, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } - manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID) + manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -519,7 +519,7 @@ func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp bluep Build: ir.imageType.BuildPipelines(), Payload: ir.imageType.PayloadPipelines(), }, - }, manifestJobID) + }, manifestJobID, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -530,7 +530,7 @@ func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp bluep return id, nil } -func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest) (uuid.UUID, error) { +func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) { var id uuid.UUID kojiDirectory := "osbuild-composer-koji-" + uuid.New().String() @@ -539,7 +539,7 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver Name: name, Version: version, Release: release, - }) + }, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -554,12 +554,12 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver Arch: ir.arch.Name(), Releasever: distribution.Releasever(), PackageSetsRepos: ir.packageSetsRepositories, - }) + }, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } - manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID) + manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -581,7 +581,7 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver KojiServer: server, KojiDirectory: kojiDirectory, KojiFilename: kojiFilename, - }, manifestJobID, initID) + }, manifestJobID, initID, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -600,7 +600,7 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver KojiDirectory: kojiDirectory, TaskID: taskID, StartTime: uint64(time.Now().Unix()), - }, initID, buildIDs) + }, initID, buildIDs, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } diff --git a/internal/cloudapi/v2/v2_koji_test.go b/internal/cloudapi/v2/v2_koji_test.go index 24d4d46ef..f1ab0ccf2 100644 --- a/internal/cloudapi/v2/v2_koji_test.go +++ b/internal/cloudapi/v2/v2_koji_test.go @@ -12,6 +12,8 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/require" + v2 "github.com/osbuild/osbuild-composer/internal/cloudapi/v2" "github.com/osbuild/osbuild-composer/internal/distro/test_distro" "github.com/osbuild/osbuild-composer/internal/kojiapi/api" @@ -19,7 +21,6 @@ import ( "github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/worker" "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" - "github.com/stretchr/testify/require" ) type jobResult struct { @@ -368,7 +369,7 @@ func TestKojiCompose(t *testing.T) { c.composeReplyCode, c.composeReply, "id", "operation_id") // handle koji-init - _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-init"}) + _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-init"}, []string{""}) require.NoError(t, err) require.Equal(t, "koji-init", jobType) @@ -386,7 +387,7 @@ func TestKojiCompose(t *testing.T) { fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) // handle osbuild-koji #1 - _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -403,7 +404,7 @@ func TestKojiCompose(t *testing.T) { fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) // handle osbuild-koji #2 - _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -427,7 +428,7 @@ func TestKojiCompose(t *testing.T) { fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) // handle koji-finalize - finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-finalize"}) + finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-finalize"}, []string{""}) require.NoError(t, err) require.Equal(t, "koji-finalize", jobType) @@ -510,7 +511,7 @@ func TestKojiJobTypeValidation(t *testing.T) { Version: "42", Release: "1", } - initID, err := workers.EnqueueKojiInit(&initJob) + initID, err := workers.EnqueueKojiInit(&initJob, "") require.NoError(t, err) buildJobs := make([]worker.OSBuildKojiJob, nImages) @@ -524,7 +525,7 @@ func TestKojiJobTypeValidation(t *testing.T) { KojiDirectory: "koji-server-test-dir", KojiFilename: fname, } - buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID) + buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID, "") require.NoError(t, err) buildJobs[idx] = buildJob @@ -542,7 +543,7 @@ func TestKojiJobTypeValidation(t *testing.T) { TaskID: 0, StartTime: uint64(time.Now().Unix()), } - finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs) + finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs, "") require.NoError(t, err) // ----- Jobs queued - Test API endpoints (status, manifests, logs) ----- // diff --git a/internal/cloudapi/v2/v2_test.go b/internal/cloudapi/v2/v2_test.go index 85f958c5e..21fe950af 100644 --- a/internal/cloudapi/v2/v2_test.go +++ b/internal/cloudapi/v2/v2_test.go @@ -39,7 +39,7 @@ func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context. depsolveContext, cancel := context.WithCancel(context.Background()) go func() { for { - _, token, _, _, _, err := workerServer.RequestJob(context.Background(), test_distro.TestDistroName, []string{"depsolve"}) + _, token, _, _, _, err := workerServer.RequestJob(context.Background(), test_distro.TestDistroName, []string{"depsolve"}, []string{""}) if err != nil { continue } @@ -575,7 +575,7 @@ func TestComposeStatusSuccess(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, args, dynArgs, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}) + jobId, token, jobType, args, dynArgs, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild", jobType) @@ -647,7 +647,7 @@ func TestComposeStatusFailure(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild", jobType) @@ -699,7 +699,7 @@ func TestComposeLegacyError(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild", jobType) @@ -754,7 +754,7 @@ func TestComposeJobError(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild", jobType) diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index 84f289a03..f3d129633 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -28,7 +28,7 @@ const ( sqlListen = `LISTEN jobs` sqlUnlisten = `UNLISTEN jobs` - sqlEnqueue = `INSERT INTO jobs(id, type, args, queued_at) VALUES ($1, $2, $3, NOW())` + sqlEnqueue = `INSERT INTO jobs(id, type, args, queued_at, channel) VALUES ($1, $2, $3, NOW(), $4)` sqlDequeue = ` UPDATE jobs SET token = $1, started_at = now() @@ -37,7 +37,7 @@ const ( FROM ready_jobs -- use ANY here, because "type in ()" doesn't work with bound parameters -- literal syntax for this is '{"a", "b"}': https://www.postgresql.org/docs/13/arrays.html - WHERE type = ANY($2) + WHERE type = ANY($2) AND channel = ANY($3) LIMIT 1 FOR UPDATE SKIP LOCKED ) @@ -62,7 +62,7 @@ const ( WHERE job_id = $1` sqlQueryJob = ` - SELECT type, args, started_at, finished_at, canceled + SELECT type, args, channel, started_at, finished_at, canceled FROM jobs WHERE id = $1` sqlQueryJobStatus = ` @@ -141,7 +141,7 @@ func (q *DBJobQueue) Close() { q.pool.Close() } -func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) { +func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { return uuid.Nil, fmt.Errorf("error connecting to database: %v", err) @@ -160,7 +160,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu }() id := uuid.New() - _, err = conn.Exec(context.Background(), sqlEnqueue, id, jobType, args) + _, err = conn.Exec(context.Background(), sqlEnqueue, id, jobType, args, channel) if err != nil { return uuid.Nil, fmt.Errorf("error enqueuing job: %v", err) } @@ -187,7 +187,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return id, nil } -func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { +func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { // Return early if the context is already canceled. if err := ctx.Err(); err != nil { return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout @@ -216,7 +216,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, var started, queued *time.Time token := uuid.New() for { - err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes).Scan(&id, &token, &jobType, &args, &queued, &started) + err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &token, &jobType, &args, &queued, &started) if err == nil { break } @@ -310,7 +310,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error { var started, finished *time.Time var jobType string canceled := false - err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, &started, &finished, &canceled) + err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &canceled) if err == pgx.ErrNoRows { return jobqueue.ErrNotExist } @@ -409,14 +409,14 @@ func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMes } // Job returns all the parameters that define a job (everything provided during Enqueue). -func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) { +func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { return } defer conn.Release() - err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, nil, nil, nil) + err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, &channel, nil, nil, nil) if err == pgx.ErrNoRows { err = jobqueue.ErrNotExist return diff --git a/internal/jobqueue/dbjobqueue/schemas/002_channels.sql b/internal/jobqueue/dbjobqueue/schemas/002_channels.sql new file mode 100644 index 000000000..5c46e88f4 --- /dev/null +++ b/internal/jobqueue/dbjobqueue/schemas/002_channels.sql @@ -0,0 +1,15 @@ +ALTER TABLE jobs +ADD COLUMN channel varchar NOT NULL DEFAULT ''; + +-- We added a column, thus we have to recreate the view. +CREATE OR REPLACE VIEW ready_jobs AS +SELECT * +FROM jobs +WHERE started_at IS NULL + AND canceled = FALSE + AND id NOT IN ( + SELECT job_id + FROM job_dependencies JOIN jobs ON dependency_id = id + WHERE finished_at IS NULL +) +ORDER BY queued_at ASC diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 949c7b256..e3c8c5797 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -67,6 +67,7 @@ type job struct { Args json.RawMessage `json:"args,omitempty"` Dependencies []uuid.UUID `json:"dependencies"` Result json.RawMessage `json:"result,omitempty"` + Channel string `json:"channel"` QueuedAt time.Time `json:"queued_at,omitempty"` StartedAt time.Time `json:"started_at,omitempty"` @@ -127,7 +128,7 @@ func New(dir string) (*fsJobQueue, error) { return q, nil } -func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) { +func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) { q.mu.Lock() defer q.mu.Unlock() @@ -137,6 +138,7 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu Type: jobType, Dependencies: dependencies, QueuedAt: time.Now(), + Channel: channel, } var err error @@ -172,7 +174,7 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return j.Id, nil } -func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { +func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { q.mu.Lock() defer q.mu.Unlock() @@ -191,7 +193,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, for { var found bool var err error - j, found, err = q.dequeueSuitableJob(jobTypes) + j, found, err = q.dequeueSuitableJob(jobTypes, channels) if err != nil { return uuid.Nil, uuid.Nil, nil, "", nil, err } @@ -358,7 +360,7 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMes return } -func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) { +func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) { j, err := q.readJob(id) if err != nil { return @@ -367,6 +369,7 @@ func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de jobType = j.Type args = j.Args dependencies = j.Dependencies + channel = j.Channel return } @@ -476,10 +479,11 @@ func (q *fsJobQueue) hasAllFinishedDependencies(j *job) (bool, error) { // - must be pending // - its dependencies must be finished // - must be of one of the type from jobTypes +// - must be of one of the channel from channels // // If a suitable job is not found, false is returned. // If an error occurs during the search, it's returned. -func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string) (*job, bool, error) { +func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string, channels []string) (*job, bool, error) { el := q.pending.Front() for el != nil { id := el.Value.(uuid.UUID) @@ -489,7 +493,7 @@ func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string) (*job, bool, error) { return nil, false, err } - if !jobMatchesCriteria(j, jobTypes) { + if !jobMatchesCriteria(j, jobTypes, channels) { el = el.Next() continue } @@ -527,12 +531,17 @@ func (q *fsJobQueue) removePendingJob(id uuid.UUID) { // // Criteria: // - the job's type is one of the acceptedJobTypes -func jobMatchesCriteria(j *job, acceptedJobTypes []string) bool { - for _, jt := range acceptedJobTypes { - if jt == j.Type { - return true +// - the job's channel is one of the acceptedChannels +func jobMatchesCriteria(j *job, acceptedJobTypes []string, acceptedChannels []string) bool { + contains := func(slice []string, str string) bool { + for _, item := range slice { + if str == item { + return true + } } + + return false } - return false + return contains(acceptedJobTypes, j.Type) && contains(acceptedChannels, j.Channel) } diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 02cb98066..28ae645d2 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -32,16 +32,16 @@ type JobQueue interface { // have finished. // // Returns the id of the new job, or an error. - Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) + Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) // Dequeues a job, blocking until one is available. // - // Waits until a job with a type of any of `jobTypes` is available, or `ctx` is - // canceled. + // Waits until a job with a type of any of `jobTypes` and any of `channels` + // is available, or `ctx` is canceled. // // Returns the job's id, token, dependencies, type, and arguments, or an error. Arguments // can be unmarshaled to the type given in Enqueue(). - Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) + Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) // Dequeues a pending job by its ID in a non-blocking way. // @@ -67,7 +67,7 @@ type JobQueue interface { JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) // Job returns all the parameters that define a job (everything provided during Enqueue). - Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) + Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) // Find job by token, this will return an error if the job hasn't been dequeued IdFromToken(token uuid.UUID) (id uuid.UUID, err error) diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index b7f403a3e..9e4e9e51e 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -45,14 +45,14 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID { t.Helper() - id, err := q.Enqueue(jobType, args, dependencies) + id, err := q.Enqueue(jobType, args, dependencies, "") require.NoError(t, err) require.NotEmpty(t, id) return id } func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID { - id, tok, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType}) + id, tok, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType}, []string{""}) require.NoError(t, err) require.NotEmpty(t, id) require.NotEmpty(t, tok) @@ -68,18 +68,18 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result func testErrors(t *testing.T, q jobqueue.JobQueue) { // not serializable to JSON - id, err := q.Enqueue("test", make(chan string), nil) + id, err := q.Enqueue("test", make(chan string), nil, "") require.Error(t, err) require.Equal(t, uuid.Nil, id) // invalid dependency - id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()}) + id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()}, "") require.Error(t, err) require.Equal(t, uuid.Nil, id) // token gets removed pushTestJob(t, q, "octopus", nil, nil) - id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}) + id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, tok) @@ -110,7 +110,7 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { var parsedArgs argument - id, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}) + id, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) require.Equal(t, two, id) require.NotEmpty(t, tok) @@ -121,13 +121,13 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, twoargs, parsedArgs) // Read job params after Dequeue - jtype, jargs, jdeps, err := q.Job(id) + jtype, jargs, jdeps, _, err := q.Job(id) require.NoError(t, err) require.Equal(t, args, jargs) require.Equal(t, deps, jdeps) require.Equal(t, typ, jtype) - id, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}) + id, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}, []string{""}) require.NoError(t, err) require.Equal(t, one, id) require.NotEmpty(t, tok) @@ -137,13 +137,13 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { require.NoError(t, err) require.Equal(t, oneargs, parsedArgs) - jtype, jargs, jdeps, err = q.Job(id) + jtype, jargs, jdeps, _, err = q.Job(id) require.NoError(t, err) require.Equal(t, args, jargs) require.Equal(t, deps, jdeps) require.Equal(t, typ, jtype) - _, _, _, err = q.Job(uuid.New()) + _, _, _, _, err = q.Job(uuid.New()) require.Error(t, err) } @@ -156,7 +156,7 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { ctx, cancel := context.WithCancel(context.Background()) cancel() - id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}, []string{""}) require.Equal(t, err, jobqueue.ErrDequeueTimeout) require.Equal(t, uuid.Nil, id) require.Equal(t, uuid.Nil, tok) @@ -168,12 +168,12 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { func testDequeueTimeout(t *testing.T, q jobqueue.JobQueue) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20) defer cancel() - _, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}) + _, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}, []string{""}) require.Equal(t, jobqueue.ErrDequeueTimeout, err) ctx2, cancel2 := context.WithCancel(context.Background()) cancel2() - _, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}) + _, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}, []string{""}) require.Equal(t, jobqueue.ErrDequeueTimeout, err) } @@ -255,7 +255,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) { defer close(done) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, id) require.NotEmpty(t, tok) @@ -270,7 +270,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) { // This call to Dequeue() should not block on the one in the goroutine. id := pushTestJob(t, q, "clownfish", nil, nil) - r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) + r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -293,7 +293,7 @@ func testMultipleWorkersSingleJobType(t *testing.T, q jobqueue.JobQueue) { defer wg.Add(-1) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"clownfish"}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"clownfish"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, id) require.NotEmpty(t, tok) @@ -340,7 +340,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { // Cancel a running job, which should not dequeue the canceled job from above id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) + r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -360,7 +360,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { // Cancel a finished job, which is a no-op id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}) + r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -385,7 +385,7 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) { // No heartbeats for queued job require.Empty(t, q.Heartbeats(time.Second*0)) - r, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}) + r, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -447,7 +447,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { t.Run("cannot dequeue a non-pending job", func(t *testing.T) { one := pushTestJob(t, q, "octopus", nil, nil) - _, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}) + _, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) _, _, _, _, err = q.DequeueByID(context.Background(), one) diff --git a/internal/kojiapi/server.go b/internal/kojiapi/server.go index e73f8866c..b49130730 100644 --- a/internal/kojiapi/server.go +++ b/internal/kojiapi/server.go @@ -161,7 +161,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { Name: request.Name, Version: request.Version, Release: request.Release, - }) + }, "") if err != nil { // This is a programming error. panic(err) @@ -177,7 +177,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { KojiServer: request.Koji.Server, KojiDirectory: kojiDirectory, KojiFilename: kojiFilenames[i], - }, initID) + }, initID, "") if err != nil { // This is a programming error. panic(err) @@ -194,7 +194,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { KojiDirectory: kojiDirectory, TaskID: uint64(request.Koji.TaskId), StartTime: uint64(time.Now().Unix()), - }, initID, buildIDs) + }, initID, buildIDs, "") if err != nil { // This is a programming error. panic(err) diff --git a/internal/kojiapi/server_test.go b/internal/kojiapi/server_test.go index e3cd1a1dc..f133a7e0b 100644 --- a/internal/kojiapi/server_test.go +++ b/internal/kojiapi/server_test.go @@ -14,6 +14,8 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/osbuild/osbuild-composer/internal/distro/test_distro" "github.com/osbuild/osbuild-composer/internal/kojiapi" "github.com/osbuild/osbuild-composer/internal/kojiapi/api" @@ -23,7 +25,6 @@ import ( "github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/worker" "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" - "github.com/stretchr/testify/require" ) func newTestKojiServer(t *testing.T, dir string) (*kojiapi.Server, *worker.Server) { @@ -334,7 +335,7 @@ func TestCompose(t *testing.T) { wg.Add(1) go func(t *testing.T, result worker.KojiInitJobResult) { - _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-init"}) + _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-init"}, []string{""}) require.NoError(t, err) require.Equal(t, "koji-init", jobType) @@ -387,7 +388,7 @@ func TestCompose(t *testing.T) { c.composeReplyCode, c.composeReply, "id") wg.Wait() - _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -403,7 +404,7 @@ func TestCompose(t *testing.T) { test.TestRoute(t, workerHandler, false, "PATCH", fmt.Sprintf("/api/worker/v1/jobs/%v", token), string(buildJobResult), http.StatusOK, fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) - _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -426,7 +427,7 @@ func TestCompose(t *testing.T) { }`, test_distro.TestArchName, test_distro.TestDistroName), http.StatusOK, fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) - finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-finalize"}) + finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-finalize"}, []string{""}) require.NoError(t, err) require.Equal(t, "koji-finalize", jobType) @@ -507,7 +508,7 @@ func TestJobTypeValidation(t *testing.T) { Version: "42", Release: "1", } - initID, err := workers.EnqueueKojiInit(&initJob) + initID, err := workers.EnqueueKojiInit(&initJob, "") require.NoError(t, err) buildJobs := make([]worker.OSBuildKojiJob, nImages) @@ -521,7 +522,7 @@ func TestJobTypeValidation(t *testing.T) { KojiDirectory: "koji-server-test-dir", KojiFilename: fname, } - buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID) + buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID, "") require.NoError(t, err) buildJobs[idx] = buildJob @@ -539,7 +540,7 @@ func TestJobTypeValidation(t *testing.T) { TaskID: 0, StartTime: uint64(time.Now().Unix()), } - finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs) + finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs, "") require.NoError(t, err) // ----- Jobs queued - Test API endpoints (status, manifests, logs) ----- // diff --git a/internal/weldr/api.go b/internal/weldr/api.go index 7be7d1bbd..ab08f6145 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -2337,7 +2337,7 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request Build: imageType.BuildPipelines(), Payload: imageType.PayloadPipelines(), }, - }) + }, "") if err == nil { err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId, packageSets["packages"]) } diff --git a/internal/weldr/compose_test.go b/internal/weldr/compose_test.go index 926fa01d3..d63de4ace 100644 --- a/internal/weldr/compose_test.go +++ b/internal/weldr/compose_test.go @@ -7,12 +7,13 @@ import ( "os" "testing" + "github.com/stretchr/testify/require" + "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distro/test_distro" rpmmd_mock "github.com/osbuild/osbuild-composer/internal/mocks/rpmmd" "github.com/osbuild/osbuild-composer/internal/worker" "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" - "github.com/stretchr/testify/require" ) func TestComposeStatusFromLegacyError(t *testing.T) { @@ -41,10 +42,10 @@ func TestComposeStatusFromLegacyError(t *testing.T) { t.Fatalf("error creating osbuild manifest: %v", err) } - jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobId, j) @@ -87,10 +88,10 @@ func TestComposeStatusFromJobError(t *testing.T) { t.Fatalf("error creating osbuild manifest: %v", err) } - jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobId, j) diff --git a/internal/worker/server.go b/internal/worker/server.go index 4ba38579a..88a2831d4 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -93,41 +93,41 @@ func (s *Server) WatchHeartbeats() { } } -func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob) (uuid.UUID, error) { - return s.enqueue("osbuild:"+arch, job, nil) +func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob, channel string) (uuid.UUID, error) { + return s.enqueue("osbuild:"+arch, job, nil, channel) } -func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, manifestID uuid.UUID) (uuid.UUID, error) { - return s.enqueue("osbuild:"+arch, job, []uuid.UUID{manifestID}) +func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, manifestID uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("osbuild:"+arch, job, []uuid.UUID{manifestID}, channel) } -func (s *Server) EnqueueOSBuildKoji(arch string, job *OSBuildKojiJob, initID uuid.UUID) (uuid.UUID, error) { - return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID}) +func (s *Server) EnqueueOSBuildKoji(arch string, job *OSBuildKojiJob, initID uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID}, channel) } -func (s *Server) EnqueueOSBuildKojiAsDependency(arch string, job *OSBuildKojiJob, manifestID, initID uuid.UUID) (uuid.UUID, error) { - return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID, manifestID}) +func (s *Server) EnqueueOSBuildKojiAsDependency(arch string, job *OSBuildKojiJob, manifestID, initID uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID, manifestID}, channel) } -func (s *Server) EnqueueKojiInit(job *KojiInitJob) (uuid.UUID, error) { - return s.enqueue("koji-init", job, nil) +func (s *Server) EnqueueKojiInit(job *KojiInitJob, channel string) (uuid.UUID, error) { + return s.enqueue("koji-init", job, nil, channel) } -func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID) (uuid.UUID, error) { - return s.enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...)) +func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...), channel) } -func (s *Server) EnqueueDepsolve(job *DepsolveJob) (uuid.UUID, error) { - return s.enqueue("depsolve", job, nil) +func (s *Server) EnqueueDepsolve(job *DepsolveJob, channel string) (uuid.UUID, error) { + return s.enqueue("depsolve", job, nil, channel) } -func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID) (uuid.UUID, error) { - return s.enqueue("manifest-id-only", job, []uuid.UUID{parent}) +func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("manifest-id-only", job, []uuid.UUID{parent}, channel) } -func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID) (uuid.UUID, error) { +func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) { prometheus.EnqueueJobMetrics(jobType) - return s.jobs.Enqueue(jobType, job, dependencies) + return s.jobs.Enqueue(jobType, job, dependencies, channel) } func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobStatus, []uuid.UUID, error) { @@ -289,7 +289,7 @@ func (s *Server) jobStatus(id uuid.UUID, result interface{}) (string, *JobStatus // OSBuildJob returns the parameters of an OSBuildJob func (s *Server) OSBuildJob(id uuid.UUID, job *OSBuildJob) error { - jobType, rawArgs, _, err := s.jobs.Job(id) + jobType, rawArgs, _, _, err := s.jobs.Job(id) if err != nil { return err } @@ -307,7 +307,7 @@ func (s *Server) OSBuildJob(id uuid.UUID, job *OSBuildJob) error { // OSBuildKojiJob returns the parameters of an OSBuildKojiJob func (s *Server) OSBuildKojiJob(id uuid.UUID, job *OSBuildKojiJob) error { - jobType, rawArgs, _, err := s.jobs.Job(id) + jobType, rawArgs, _, _, err := s.jobs.Job(id) if err != nil { return err } @@ -325,7 +325,7 @@ func (s *Server) OSBuildKojiJob(id uuid.UUID, job *OSBuildKojiJob) error { // JobType returns the type of the job func (s *Server) JobType(id uuid.UUID) (string, error) { - jobType, _, _, err := s.jobs.Job(id) + jobType, _, _, _, err := s.jobs.Job(id) // the architecture is internally encdode in the job type, but hide that // from this API return strings.Split(jobType, ":")[0], err @@ -389,15 +389,15 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return os.RemoveAll(path.Join(s.artifactsDir, id.String())) } -func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { - return s.requestJob(ctx, arch, jobTypes, uuid.Nil) +func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { + return s.requestJob(ctx, arch, jobTypes, uuid.Nil, channels) } func (s *Server) RequestJobById(ctx context.Context, arch string, requestedJobId uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { - return s.requestJob(ctx, arch, []string{}, requestedJobId) + return s.requestJob(ctx, arch, []string{}, requestedJobId, nil) } -func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID) ( +func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID, channels []string) ( jobId uuid.UUID, token uuid.UUID, jobType string, args json.RawMessage, dynamicArgs []json.RawMessage, err error) { // treat osbuild jobs specially until we have found a generic way to // specify dequeuing restrictions. For now, we only have one @@ -425,7 +425,7 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, jobId = requestedJobId token, depIDs, jobType, args, err = s.jobs.DequeueByID(dequeueCtx, requestedJobId) } else { - jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, jts) + jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, jts, channels) } if err != nil { return @@ -550,7 +550,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { return err } - jobId, token, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types) + jobId, token, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types, []string{""}) if err != nil { if err == jobqueue.ErrDequeueTimeout { return ctx.JSON(http.StatusNoContent, api.ObjectReference{ diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index e3f4a1323..8db49b776 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -129,7 +129,7 @@ func TestCreate(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - _, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + _, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) test.TestRoute(t, handler, false, "POST", "/api/worker/v1/jobs", @@ -158,10 +158,10 @@ func TestCancel(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobId, j) require.Equal(t, "osbuild", typ) @@ -199,10 +199,10 @@ func TestUpdate(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobId, j) require.Equal(t, "osbuild", typ) @@ -238,10 +238,10 @@ func TestArgs(t *testing.T) { Payload: []string{"x", "y", "z"}, }, } - jobId, err := server.EnqueueOSBuild(arch.Name(), &job) + jobId, err := server.EnqueueOSBuild(arch.Name(), &job, "") require.NoError(t, err) - _, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + _, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.NotNil(t, args) @@ -272,10 +272,10 @@ func TestUpload(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobID, j) require.Equal(t, "osbuild", typ) @@ -306,10 +306,10 @@ func TestUploadAlteredBasePath(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/image-builder-worker/v1") handler := server.Handler() - jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobID, j) require.Equal(t, "osbuild", typ) @@ -379,7 +379,7 @@ func TestOAuth(t *testing.T) { t.Fatalf("error creating osbuild manifest: %v", err) } - _, err = workerServer.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + _, err = workerServer.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) client, err := worker.NewClient(proxySrv.URL, nil, &offlineToken, &oauthSrv.URL, "/api/image-builder-worker/v1") @@ -404,7 +404,7 @@ func TestTimeout(t *testing.T) { } server := newTestServer(t, tempdir, time.Millisecond*10, "/api/image-builder-worker/v1") - _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.Equal(t, jobqueue.ErrDequeueTimeout, err) test.TestRoute(t, server.Handler(), false, "POST", "/api/image-builder-worker/v1/jobs", `{"arch":"arch","types":["types"]}`, http.StatusNoContent, @@ -424,10 +424,10 @@ func TestRequestJobById(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}) + depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "") require.NoError(t, err) - jobId, err := server.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobId) + jobId, err := server.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobId, "") require.NoError(t, err) test.TestRoute(t, server.Handler(), false, "POST", "/api/worker/v1/jobs", `{"arch":"arch","types":["manifest-id-only"]}`, http.StatusBadRequest, @@ -436,7 +436,7 @@ func TestRequestJobById(t *testing.T) { _, _, _, _, _, err = server.RequestJobById(context.Background(), arch.Name(), jobId) require.Error(t, jobqueue.ErrNotPending, err) - _, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}) + _, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}, []string{""}) require.NoError(t, err) depsolveJR, err := json.Marshal(worker.DepsolveJobResult{}) @@ -472,7 +472,7 @@ func TestMixedOSBuildJob(t *testing.T) { Manifest: emptyManifestV2, ImageName: "no-pipeline-names", } - oldJobID, err := server.EnqueueOSBuild("x", &oldJob) + oldJobID, err := server.EnqueueOSBuild("x", &oldJob, "") require.NoError(err) newJob := worker.OSBuildJob{ @@ -483,7 +483,7 @@ func TestMixedOSBuildJob(t *testing.T) { Payload: []string{"other", "pipelines"}, }, } - newJobID, err := server.EnqueueOSBuild("x", &newJob) + newJobID, err := server.EnqueueOSBuild("x", &newJob, "") require.NoError(err) var oldJobRead worker.OSBuildJob @@ -511,7 +511,7 @@ func TestMixedOSBuildJob(t *testing.T) { // don't block forever if the jobs weren't added or can't be retrieved ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}) + id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}, []string{""}) require.NoError(err) return id, token } @@ -612,9 +612,9 @@ func TestMixedOSBuildKojiJob(t *testing.T) { enqueueKojiJob := func(job *worker.OSBuildKojiJob) uuid.UUID { initJob := new(worker.KojiInitJob) - initJobID, err := server.EnqueueKojiInit(initJob) + initJobID, err := server.EnqueueKojiInit(initJob, "") require.NoError(err) - jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID) + jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID, "") require.NoError(err) return jobID } @@ -659,7 +659,7 @@ func TestMixedOSBuildKojiJob(t *testing.T) { for idx := uint(0); idx < 2; idx++ { ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - _, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}) + _, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}, []string{""}) require.NoError(err) require.NoError(server.FinishJob(token, nil)) } @@ -668,7 +668,7 @@ func TestMixedOSBuildKojiJob(t *testing.T) { // don't block forever if the jobs weren't added or can't be retrieved ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}) + id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}, []string{""}) require.NoError(err) return id, token } @@ -769,10 +769,10 @@ func TestDepsolveLegacyErrorConversion(t *testing.T) { } server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") - depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}) + depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "") require.NoError(t, err) - _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}) + _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}, []string{""}) require.NoError(t, err) reason := "Depsolve failed" @@ -812,7 +812,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) { Manifest: emptyManifestV2, ImageName: "no-pipeline-names", } - oldJobID, err := server.EnqueueOSBuild("x", &oldJob) + oldJobID, err := server.EnqueueOSBuild("x", &oldJob, "") require.NoError(err) newJob := worker.OSBuildJob{ @@ -823,7 +823,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) { Payload: []string{"other", "pipelines"}, }, } - newJobID, err := server.EnqueueOSBuild("x", &newJob) + newJobID, err := server.EnqueueOSBuild("x", &newJob, "") require.NoError(err) oldJobRead := new(worker.OSBuildJob) @@ -844,7 +844,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) { // don't block forever if the jobs weren't added or can't be retrieved ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}) + id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}, []string{""}) require.NoError(err) return id, token } @@ -917,9 +917,9 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) { enqueueKojiJob := func(job *worker.OSBuildKojiJob) uuid.UUID { initJob := new(worker.KojiInitJob) - initJobID, err := server.EnqueueKojiInit(initJob) + initJobID, err := server.EnqueueKojiInit(initJob, "") require.NoError(err) - jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID) + jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID, "") require.NoError(err) return jobID } @@ -957,7 +957,7 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) { for idx := uint(0); idx < 2; idx++ { ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - _, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}) + _, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}, []string{""}) require.NoError(err) require.NoError(server.FinishJob(token, nil)) } @@ -966,7 +966,7 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) { // don't block forever if the jobs weren't added or can't be retrieved ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}) + id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}, []string{""}) require.NoError(err) return id, token }