diff --git a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go index 1134fc3c8..be3f20ed2 100644 --- a/cmd/osbuild-composer-dbjobqueue-tests/main_test.go +++ b/cmd/osbuild-composer-dbjobqueue-tests/main_test.go @@ -82,19 +82,19 @@ func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) { date85 := time.Date(1985, time.January, 1, 0, 0, 0, 0, time.UTC) date90 := time.Date(1990, time.January, 1, 0, 0, 0, 0, time.UTC) - id80, err := q.Enqueue("octopus", nil, nil) + id80, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id80) - _,_,_,_,_, err = q.Dequeue(context.Background(), []string{"octopus"}) + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) err = q.FinishJob(id80, nil) require.NoError(t, err) setFinishedAt(t, q, id80, date80) - id85, err := q.Enqueue("octopus", nil, nil) + id85, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id85) - _,_,_,_,_, err = q.Dequeue(context.Background(), []string{"octopus"}) + _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) err = q.FinishJob(id85, nil) require.NoError(t, err) @@ -111,65 +111,65 @@ func testJobsUptoByType(t *testing.T, q *dbjobqueue.DBJobQueue) { func testDeleteJobAndDependencies(t *testing.T, q *dbjobqueue.DBJobQueue) { // id1 -> id2 -> id3 - id1, err := q.Enqueue("octopus", nil, nil) + id1, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id1) - id2, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}) + id2, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id2) - id3, err := q.Enqueue("octopus", nil, []uuid.UUID{id2}) + id3, err := q.Enqueue("octopus", nil, []uuid.UUID{id2}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id3) - c1, err := q.Enqueue("octopus", nil, nil) + c1, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, c1) - c2, err := q.Enqueue("octopus", nil, []uuid.UUID{c1}) + c2, err := q.Enqueue("octopus", nil, []uuid.UUID{c1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, c2) - c3, err := q.Enqueue("octopus", nil, []uuid.UUID{c2}) + c3, err := q.Enqueue("octopus", nil, []uuid.UUID{c2}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, c3) controls := []uuid.UUID{c1, c2, c3} - _,_,_, err = q.Job(c1) + _, _, _, _, err = q.Job(c1) require.NoError(t, err) require.NoError(t, q.DeleteJobIncludingDependencies(id3)) for _, id := range []uuid.UUID{id1, id2, id3} { - _,_,_, err = q.Job(id) + _, _, _, _, err = q.Job(id) require.ErrorIs(t, err, jobqueue.ErrNotExist) } // controls should still exist for _, c := range controls { - _,_,_, err = q.Job(c) + _, _, _, _, err = q.Job(c) require.NoError(t, err) } // id1 -> id2 -> id4 && id3 -> id4 - id1, err = q.Enqueue("octopus", nil, nil) + id1, err = q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id1) - id2, err = q.Enqueue("octopus", nil, []uuid.UUID{id1}) + id2, err = q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id2) - id3, err = q.Enqueue("octopus", nil, nil) + id3, err = q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id3) - id4, err := q.Enqueue("octopus", nil, []uuid.UUID{id2, id3}) + id4, err := q.Enqueue("octopus", nil, []uuid.UUID{id2, id3}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id4) require.NoError(t, q.DeleteJobIncludingDependencies(id4)) for _, id := range []uuid.UUID{id1, id2, id3, id4} { - _,_,_, err = q.Job(id) + _, _, _, _, err = q.Job(id) require.ErrorIs(t, err, jobqueue.ErrNotExist) } // controls should still exist for _, c := range controls { - _,_,_, err = q.Job(c) + _, _, _, _, err = q.Job(c) require.NoError(t, err) } @@ -177,45 +177,44 @@ func testDeleteJobAndDependencies(t *testing.T, q *dbjobqueue.DBJobQueue) { // situation as it does not occur in the service. This should be changed once we allow // multiple build job per depsolve job, and the depsolve job should only be removed once all // the build jobs have been dealt with. - id1, err = q.Enqueue("octopus", nil, nil) + id1, err = q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id1) - id2a, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}) + id2a, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id2a) - id2b, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}) + id2b, err := q.Enqueue("octopus", nil, []uuid.UUID{id1}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id2b) - id3, err = q.Enqueue("octopus", nil, []uuid.UUID{id2a}) + id3, err = q.Enqueue("octopus", nil, []uuid.UUID{id2a}, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id3) require.NoError(t, q.DeleteJobIncludingDependencies(id3)) for _, id := range []uuid.UUID{id1, id2a, id3} { - _,_,_, err = q.Job(id) + _, _, _, _, err = q.Job(id) require.ErrorIs(t, err, jobqueue.ErrNotExist) } // id2b still exists - _,_,_, err = q.Job(id2b) + _, _, _, _, err = q.Job(id2b) require.NoError(t, err) // id2b can still be deleted with it's dependencies missing require.NoError(t, q.DeleteJobIncludingDependencies(id2b)) - _,_,_, err = q.Job(id2b) + _, _, _, _, err = q.Job(id2b) require.ErrorIs(t, err, jobqueue.ErrNotExist) // controls should still exist for _, c := range controls { - _,_,_, err = q.Job(c) + _, _, _, _, err = q.Job(c) require.NoError(t, err) } require.NoError(t, q.DeleteJobIncludingDependencies(uuid.Nil)) // controls should still exist for _, c := range controls { - _,_,_, err = q.Job(c) + _, _, _, _, err = q.Job(c) require.NoError(t, err) } } - diff --git a/internal/cloudapi/v2/v2.go b/internal/cloudapi/v2/v2.go index b653a6ae5..01bec0d7f 100644 --- a/internal/cloudapi/v2/v2.go +++ b/internal/cloudapi/v2/v2.go @@ -465,12 +465,12 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { var id uuid.UUID if request.Koji != nil { - id, err = enqueueKojiCompose(h.server.workers, uint64(request.Koji.TaskId), request.Koji.Server, request.Koji.Name, request.Koji.Version, request.Koji.Release, distribution, bp, manifestSeed, irs) + id, err = enqueueKojiCompose(h.server.workers, uint64(request.Koji.TaskId), request.Koji.Server, request.Koji.Name, request.Koji.Version, request.Koji.Release, distribution, bp, manifestSeed, irs, "") if err != nil { return err } } else { - id, err = enqueueCompose(h.server.workers, distribution, bp, manifestSeed, irs) + id, err = enqueueCompose(h.server.workers, distribution, bp, manifestSeed, irs, "") if err != nil { return err } @@ -488,7 +488,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { }) } -func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest) (uuid.UUID, error) { +func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) { var id uuid.UUID if len(irs) != 1 { return id, HTTPError(ErrorInvalidNumberOfImageBuilds) @@ -502,12 +502,12 @@ func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp bluep Arch: ir.arch.Name(), Releasever: distribution.Releasever(), PackageSetsRepos: ir.packageSetsRepositories, - }) + }, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } - manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID) + manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -519,7 +519,7 @@ func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp bluep Build: ir.imageType.BuildPipelines(), Payload: ir.imageType.PayloadPipelines(), }, - }, manifestJobID) + }, manifestJobID, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -530,7 +530,7 @@ func enqueueCompose(workers *worker.Server, distribution distro.Distro, bp bluep return id, nil } -func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest) (uuid.UUID, error) { +func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, version, release string, distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) { var id uuid.UUID kojiDirectory := "osbuild-composer-koji-" + uuid.New().String() @@ -539,7 +539,7 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver Name: name, Version: version, Release: release, - }) + }, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -554,12 +554,12 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver Arch: ir.arch.Name(), Releasever: distribution.Releasever(), PackageSetsRepos: ir.packageSetsRepositories, - }) + }, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } - manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID) + manifestJobID, err := workers.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobID, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -581,7 +581,7 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver KojiServer: server, KojiDirectory: kojiDirectory, KojiFilename: kojiFilename, - }, manifestJobID, initID) + }, manifestJobID, initID, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } @@ -600,7 +600,7 @@ func enqueueKojiCompose(workers *worker.Server, taskID uint64, server, name, ver KojiDirectory: kojiDirectory, TaskID: taskID, StartTime: uint64(time.Now().Unix()), - }, initID, buildIDs) + }, initID, buildIDs, channel) if err != nil { return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err) } diff --git a/internal/cloudapi/v2/v2_koji_test.go b/internal/cloudapi/v2/v2_koji_test.go index 24d4d46ef..f1ab0ccf2 100644 --- a/internal/cloudapi/v2/v2_koji_test.go +++ b/internal/cloudapi/v2/v2_koji_test.go @@ -12,6 +12,8 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/require" + v2 "github.com/osbuild/osbuild-composer/internal/cloudapi/v2" "github.com/osbuild/osbuild-composer/internal/distro/test_distro" "github.com/osbuild/osbuild-composer/internal/kojiapi/api" @@ -19,7 +21,6 @@ import ( "github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/worker" "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" - "github.com/stretchr/testify/require" ) type jobResult struct { @@ -368,7 +369,7 @@ func TestKojiCompose(t *testing.T) { c.composeReplyCode, c.composeReply, "id", "operation_id") // handle koji-init - _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-init"}) + _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-init"}, []string{""}) require.NoError(t, err) require.Equal(t, "koji-init", jobType) @@ -386,7 +387,7 @@ func TestKojiCompose(t *testing.T) { fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) // handle osbuild-koji #1 - _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -403,7 +404,7 @@ func TestKojiCompose(t *testing.T) { fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) // handle osbuild-koji #2 - _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild-koji"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -427,7 +428,7 @@ func TestKojiCompose(t *testing.T) { fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) // handle koji-finalize - finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-finalize"}) + finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"koji-finalize"}, []string{""}) require.NoError(t, err) require.Equal(t, "koji-finalize", jobType) @@ -510,7 +511,7 @@ func TestKojiJobTypeValidation(t *testing.T) { Version: "42", Release: "1", } - initID, err := workers.EnqueueKojiInit(&initJob) + initID, err := workers.EnqueueKojiInit(&initJob, "") require.NoError(t, err) buildJobs := make([]worker.OSBuildKojiJob, nImages) @@ -524,7 +525,7 @@ func TestKojiJobTypeValidation(t *testing.T) { KojiDirectory: "koji-server-test-dir", KojiFilename: fname, } - buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID) + buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID, "") require.NoError(t, err) buildJobs[idx] = buildJob @@ -542,7 +543,7 @@ func TestKojiJobTypeValidation(t *testing.T) { TaskID: 0, StartTime: uint64(time.Now().Unix()), } - finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs) + finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs, "") require.NoError(t, err) // ----- Jobs queued - Test API endpoints (status, manifests, logs) ----- // diff --git a/internal/cloudapi/v2/v2_test.go b/internal/cloudapi/v2/v2_test.go index 85f958c5e..21fe950af 100644 --- a/internal/cloudapi/v2/v2_test.go +++ b/internal/cloudapi/v2/v2_test.go @@ -39,7 +39,7 @@ func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context. depsolveContext, cancel := context.WithCancel(context.Background()) go func() { for { - _, token, _, _, _, err := workerServer.RequestJob(context.Background(), test_distro.TestDistroName, []string{"depsolve"}) + _, token, _, _, _, err := workerServer.RequestJob(context.Background(), test_distro.TestDistroName, []string{"depsolve"}, []string{""}) if err != nil { continue } @@ -575,7 +575,7 @@ func TestComposeStatusSuccess(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, args, dynArgs, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}) + jobId, token, jobType, args, dynArgs, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild", jobType) @@ -647,7 +647,7 @@ func TestComposeStatusFailure(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild", jobType) @@ -699,7 +699,7 @@ func TestComposeLegacyError(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild", jobType) @@ -754,7 +754,7 @@ func TestComposeJobError(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild", jobType) diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index 84f289a03..f3d129633 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -28,7 +28,7 @@ const ( sqlListen = `LISTEN jobs` sqlUnlisten = `UNLISTEN jobs` - sqlEnqueue = `INSERT INTO jobs(id, type, args, queued_at) VALUES ($1, $2, $3, NOW())` + sqlEnqueue = `INSERT INTO jobs(id, type, args, queued_at, channel) VALUES ($1, $2, $3, NOW(), $4)` sqlDequeue = ` UPDATE jobs SET token = $1, started_at = now() @@ -37,7 +37,7 @@ const ( FROM ready_jobs -- use ANY here, because "type in ()" doesn't work with bound parameters -- literal syntax for this is '{"a", "b"}': https://www.postgresql.org/docs/13/arrays.html - WHERE type = ANY($2) + WHERE type = ANY($2) AND channel = ANY($3) LIMIT 1 FOR UPDATE SKIP LOCKED ) @@ -62,7 +62,7 @@ const ( WHERE job_id = $1` sqlQueryJob = ` - SELECT type, args, started_at, finished_at, canceled + SELECT type, args, channel, started_at, finished_at, canceled FROM jobs WHERE id = $1` sqlQueryJobStatus = ` @@ -141,7 +141,7 @@ func (q *DBJobQueue) Close() { q.pool.Close() } -func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) { +func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { return uuid.Nil, fmt.Errorf("error connecting to database: %v", err) @@ -160,7 +160,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu }() id := uuid.New() - _, err = conn.Exec(context.Background(), sqlEnqueue, id, jobType, args) + _, err = conn.Exec(context.Background(), sqlEnqueue, id, jobType, args, channel) if err != nil { return uuid.Nil, fmt.Errorf("error enqueuing job: %v", err) } @@ -187,7 +187,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return id, nil } -func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { +func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { // Return early if the context is already canceled. if err := ctx.Err(); err != nil { return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout @@ -216,7 +216,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, var started, queued *time.Time token := uuid.New() for { - err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes).Scan(&id, &token, &jobType, &args, &queued, &started) + err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &token, &jobType, &args, &queued, &started) if err == nil { break } @@ -310,7 +310,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error { var started, finished *time.Time var jobType string canceled := false - err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, &started, &finished, &canceled) + err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &canceled) if err == pgx.ErrNoRows { return jobqueue.ErrNotExist } @@ -409,14 +409,14 @@ func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMes } // Job returns all the parameters that define a job (everything provided during Enqueue). -func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) { +func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { return } defer conn.Release() - err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, nil, nil, nil) + err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, &channel, nil, nil, nil) if err == pgx.ErrNoRows { err = jobqueue.ErrNotExist return diff --git a/internal/jobqueue/dbjobqueue/schemas/002_channels.sql b/internal/jobqueue/dbjobqueue/schemas/002_channels.sql new file mode 100644 index 000000000..5c46e88f4 --- /dev/null +++ b/internal/jobqueue/dbjobqueue/schemas/002_channels.sql @@ -0,0 +1,15 @@ +ALTER TABLE jobs +ADD COLUMN channel varchar NOT NULL DEFAULT ''; + +-- We added a column, thus we have to recreate the view. +CREATE OR REPLACE VIEW ready_jobs AS +SELECT * +FROM jobs +WHERE started_at IS NULL + AND canceled = FALSE + AND id NOT IN ( + SELECT job_id + FROM job_dependencies JOIN jobs ON dependency_id = id + WHERE finished_at IS NULL +) +ORDER BY queued_at ASC diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 949c7b256..e3c8c5797 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -67,6 +67,7 @@ type job struct { Args json.RawMessage `json:"args,omitempty"` Dependencies []uuid.UUID `json:"dependencies"` Result json.RawMessage `json:"result,omitempty"` + Channel string `json:"channel"` QueuedAt time.Time `json:"queued_at,omitempty"` StartedAt time.Time `json:"started_at,omitempty"` @@ -127,7 +128,7 @@ func New(dir string) (*fsJobQueue, error) { return q, nil } -func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) { +func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) { q.mu.Lock() defer q.mu.Unlock() @@ -137,6 +138,7 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu Type: jobType, Dependencies: dependencies, QueuedAt: time.Now(), + Channel: channel, } var err error @@ -172,7 +174,7 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return j.Id, nil } -func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { +func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { q.mu.Lock() defer q.mu.Unlock() @@ -191,7 +193,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, for { var found bool var err error - j, found, err = q.dequeueSuitableJob(jobTypes) + j, found, err = q.dequeueSuitableJob(jobTypes, channels) if err != nil { return uuid.Nil, uuid.Nil, nil, "", nil, err } @@ -358,7 +360,7 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMes return } -func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) { +func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) { j, err := q.readJob(id) if err != nil { return @@ -367,6 +369,7 @@ func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de jobType = j.Type args = j.Args dependencies = j.Dependencies + channel = j.Channel return } @@ -476,10 +479,11 @@ func (q *fsJobQueue) hasAllFinishedDependencies(j *job) (bool, error) { // - must be pending // - its dependencies must be finished // - must be of one of the type from jobTypes +// - must be of one of the channel from channels // // If a suitable job is not found, false is returned. // If an error occurs during the search, it's returned. -func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string) (*job, bool, error) { +func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string, channels []string) (*job, bool, error) { el := q.pending.Front() for el != nil { id := el.Value.(uuid.UUID) @@ -489,7 +493,7 @@ func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string) (*job, bool, error) { return nil, false, err } - if !jobMatchesCriteria(j, jobTypes) { + if !jobMatchesCriteria(j, jobTypes, channels) { el = el.Next() continue } @@ -527,12 +531,17 @@ func (q *fsJobQueue) removePendingJob(id uuid.UUID) { // // Criteria: // - the job's type is one of the acceptedJobTypes -func jobMatchesCriteria(j *job, acceptedJobTypes []string) bool { - for _, jt := range acceptedJobTypes { - if jt == j.Type { - return true +// - the job's channel is one of the acceptedChannels +func jobMatchesCriteria(j *job, acceptedJobTypes []string, acceptedChannels []string) bool { + contains := func(slice []string, str string) bool { + for _, item := range slice { + if str == item { + return true + } } + + return false } - return false + return contains(acceptedJobTypes, j.Type) && contains(acceptedChannels, j.Channel) } diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 02cb98066..28ae645d2 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -32,16 +32,16 @@ type JobQueue interface { // have finished. // // Returns the id of the new job, or an error. - Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) + Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) // Dequeues a job, blocking until one is available. // - // Waits until a job with a type of any of `jobTypes` is available, or `ctx` is - // canceled. + // Waits until a job with a type of any of `jobTypes` and any of `channels` + // is available, or `ctx` is canceled. // // Returns the job's id, token, dependencies, type, and arguments, or an error. Arguments // can be unmarshaled to the type given in Enqueue(). - Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) + Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) // Dequeues a pending job by its ID in a non-blocking way. // @@ -67,7 +67,7 @@ type JobQueue interface { JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) // Job returns all the parameters that define a job (everything provided during Enqueue). - Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) + Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) // Find job by token, this will return an error if the job hasn't been dequeued IdFromToken(token uuid.UUID) (id uuid.UUID, err error) diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index b7f403a3e..9e4e9e51e 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -45,14 +45,14 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID { t.Helper() - id, err := q.Enqueue(jobType, args, dependencies) + id, err := q.Enqueue(jobType, args, dependencies, "") require.NoError(t, err) require.NotEmpty(t, id) return id } func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID { - id, tok, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType}) + id, tok, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType}, []string{""}) require.NoError(t, err) require.NotEmpty(t, id) require.NotEmpty(t, tok) @@ -68,18 +68,18 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result func testErrors(t *testing.T, q jobqueue.JobQueue) { // not serializable to JSON - id, err := q.Enqueue("test", make(chan string), nil) + id, err := q.Enqueue("test", make(chan string), nil, "") require.Error(t, err) require.Equal(t, uuid.Nil, id) // invalid dependency - id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()}) + id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()}, "") require.Error(t, err) require.Equal(t, uuid.Nil, id) // token gets removed pushTestJob(t, q, "octopus", nil, nil) - id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}) + id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, tok) @@ -110,7 +110,7 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { var parsedArgs argument - id, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}) + id, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) require.Equal(t, two, id) require.NotEmpty(t, tok) @@ -121,13 +121,13 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, twoargs, parsedArgs) // Read job params after Dequeue - jtype, jargs, jdeps, err := q.Job(id) + jtype, jargs, jdeps, _, err := q.Job(id) require.NoError(t, err) require.Equal(t, args, jargs) require.Equal(t, deps, jdeps) require.Equal(t, typ, jtype) - id, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}) + id, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}, []string{""}) require.NoError(t, err) require.Equal(t, one, id) require.NotEmpty(t, tok) @@ -137,13 +137,13 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { require.NoError(t, err) require.Equal(t, oneargs, parsedArgs) - jtype, jargs, jdeps, err = q.Job(id) + jtype, jargs, jdeps, _, err = q.Job(id) require.NoError(t, err) require.Equal(t, args, jargs) require.Equal(t, deps, jdeps) require.Equal(t, typ, jtype) - _, _, _, err = q.Job(uuid.New()) + _, _, _, _, err = q.Job(uuid.New()) require.Error(t, err) } @@ -156,7 +156,7 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { ctx, cancel := context.WithCancel(context.Background()) cancel() - id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}, []string{""}) require.Equal(t, err, jobqueue.ErrDequeueTimeout) require.Equal(t, uuid.Nil, id) require.Equal(t, uuid.Nil, tok) @@ -168,12 +168,12 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { func testDequeueTimeout(t *testing.T, q jobqueue.JobQueue) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20) defer cancel() - _, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}) + _, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}, []string{""}) require.Equal(t, jobqueue.ErrDequeueTimeout, err) ctx2, cancel2 := context.WithCancel(context.Background()) cancel2() - _, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}) + _, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}, []string{""}) require.Equal(t, jobqueue.ErrDequeueTimeout, err) } @@ -255,7 +255,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) { defer close(done) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"octopus"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, id) require.NotEmpty(t, tok) @@ -270,7 +270,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) { // This call to Dequeue() should not block on the one in the goroutine. id := pushTestJob(t, q, "clownfish", nil, nil) - r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) + r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -293,7 +293,7 @@ func testMultipleWorkersSingleJobType(t *testing.T, q jobqueue.JobQueue) { defer wg.Add(-1) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"clownfish"}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"clownfish"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, id) require.NotEmpty(t, tok) @@ -340,7 +340,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { // Cancel a running job, which should not dequeue the canceled job from above id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) + r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -360,7 +360,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { // Cancel a finished job, which is a no-op id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}) + r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -385,7 +385,7 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) { // No heartbeats for queued job require.Empty(t, q.Heartbeats(time.Second*0)) - r, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}) + r, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -447,7 +447,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { t.Run("cannot dequeue a non-pending job", func(t *testing.T) { one := pushTestJob(t, q, "octopus", nil, nil) - _, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}) + _, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) require.NoError(t, err) _, _, _, _, err = q.DequeueByID(context.Background(), one) diff --git a/internal/kojiapi/server.go b/internal/kojiapi/server.go index e73f8866c..b49130730 100644 --- a/internal/kojiapi/server.go +++ b/internal/kojiapi/server.go @@ -161,7 +161,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { Name: request.Name, Version: request.Version, Release: request.Release, - }) + }, "") if err != nil { // This is a programming error. panic(err) @@ -177,7 +177,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { KojiServer: request.Koji.Server, KojiDirectory: kojiDirectory, KojiFilename: kojiFilenames[i], - }, initID) + }, initID, "") if err != nil { // This is a programming error. panic(err) @@ -194,7 +194,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { KojiDirectory: kojiDirectory, TaskID: uint64(request.Koji.TaskId), StartTime: uint64(time.Now().Unix()), - }, initID, buildIDs) + }, initID, buildIDs, "") if err != nil { // This is a programming error. panic(err) diff --git a/internal/kojiapi/server_test.go b/internal/kojiapi/server_test.go index e3cd1a1dc..f133a7e0b 100644 --- a/internal/kojiapi/server_test.go +++ b/internal/kojiapi/server_test.go @@ -14,6 +14,8 @@ import ( "time" "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/osbuild/osbuild-composer/internal/distro/test_distro" "github.com/osbuild/osbuild-composer/internal/kojiapi" "github.com/osbuild/osbuild-composer/internal/kojiapi/api" @@ -23,7 +25,6 @@ import ( "github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/worker" "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" - "github.com/stretchr/testify/require" ) func newTestKojiServer(t *testing.T, dir string) (*kojiapi.Server, *worker.Server) { @@ -334,7 +335,7 @@ func TestCompose(t *testing.T) { wg.Add(1) go func(t *testing.T, result worker.KojiInitJobResult) { - _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-init"}) + _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-init"}, []string{""}) require.NoError(t, err) require.Equal(t, "koji-init", jobType) @@ -387,7 +388,7 @@ func TestCompose(t *testing.T) { c.composeReplyCode, c.composeReply, "id") wg.Wait() - _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -403,7 +404,7 @@ func TestCompose(t *testing.T) { test.TestRoute(t, workerHandler, false, "PATCH", fmt.Sprintf("/api/worker/v1/jobs/%v", token), string(buildJobResult), http.StatusOK, fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) - _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}) + _, token, jobType, rawJob, _, err = workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"osbuild-koji"}, []string{""}) require.NoError(t, err) require.Equal(t, "osbuild-koji", jobType) @@ -426,7 +427,7 @@ func TestCompose(t *testing.T) { }`, test_distro.TestArchName, test_distro.TestDistroName), http.StatusOK, fmt.Sprintf(`{"href":"/api/worker/v1/jobs/%v","id":"%v","kind":"UpdateJobResponse"}`, token, token)) - finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-finalize"}) + finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArchName, []string{"koji-finalize"}, []string{""}) require.NoError(t, err) require.Equal(t, "koji-finalize", jobType) @@ -507,7 +508,7 @@ func TestJobTypeValidation(t *testing.T) { Version: "42", Release: "1", } - initID, err := workers.EnqueueKojiInit(&initJob) + initID, err := workers.EnqueueKojiInit(&initJob, "") require.NoError(t, err) buildJobs := make([]worker.OSBuildKojiJob, nImages) @@ -521,7 +522,7 @@ func TestJobTypeValidation(t *testing.T) { KojiDirectory: "koji-server-test-dir", KojiFilename: fname, } - buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID) + buildID, err := workers.EnqueueOSBuildKoji(fmt.Sprintf("fake-arch-%d", idx), &buildJob, initID, "") require.NoError(t, err) buildJobs[idx] = buildJob @@ -539,7 +540,7 @@ func TestJobTypeValidation(t *testing.T) { TaskID: 0, StartTime: uint64(time.Now().Unix()), } - finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs) + finalizeID, err := workers.EnqueueKojiFinalize(&finalizeJob, initID, buildJobIDs, "") require.NoError(t, err) // ----- Jobs queued - Test API endpoints (status, manifests, logs) ----- // diff --git a/internal/weldr/api.go b/internal/weldr/api.go index 7be7d1bbd..ab08f6145 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -2337,7 +2337,7 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request Build: imageType.BuildPipelines(), Payload: imageType.PayloadPipelines(), }, - }) + }, "") if err == nil { err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId, packageSets["packages"]) } diff --git a/internal/weldr/compose_test.go b/internal/weldr/compose_test.go index 926fa01d3..d63de4ace 100644 --- a/internal/weldr/compose_test.go +++ b/internal/weldr/compose_test.go @@ -7,12 +7,13 @@ import ( "os" "testing" + "github.com/stretchr/testify/require" + "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distro/test_distro" rpmmd_mock "github.com/osbuild/osbuild-composer/internal/mocks/rpmmd" "github.com/osbuild/osbuild-composer/internal/worker" "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" - "github.com/stretchr/testify/require" ) func TestComposeStatusFromLegacyError(t *testing.T) { @@ -41,10 +42,10 @@ func TestComposeStatusFromLegacyError(t *testing.T) { t.Fatalf("error creating osbuild manifest: %v", err) } - jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobId, j) @@ -87,10 +88,10 @@ func TestComposeStatusFromJobError(t *testing.T) { t.Fatalf("error creating osbuild manifest: %v", err) } - jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobId, j) diff --git a/internal/worker/server.go b/internal/worker/server.go index 4ba38579a..88a2831d4 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -93,41 +93,41 @@ func (s *Server) WatchHeartbeats() { } } -func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob) (uuid.UUID, error) { - return s.enqueue("osbuild:"+arch, job, nil) +func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob, channel string) (uuid.UUID, error) { + return s.enqueue("osbuild:"+arch, job, nil, channel) } -func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, manifestID uuid.UUID) (uuid.UUID, error) { - return s.enqueue("osbuild:"+arch, job, []uuid.UUID{manifestID}) +func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, manifestID uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("osbuild:"+arch, job, []uuid.UUID{manifestID}, channel) } -func (s *Server) EnqueueOSBuildKoji(arch string, job *OSBuildKojiJob, initID uuid.UUID) (uuid.UUID, error) { - return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID}) +func (s *Server) EnqueueOSBuildKoji(arch string, job *OSBuildKojiJob, initID uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID}, channel) } -func (s *Server) EnqueueOSBuildKojiAsDependency(arch string, job *OSBuildKojiJob, manifestID, initID uuid.UUID) (uuid.UUID, error) { - return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID, manifestID}) +func (s *Server) EnqueueOSBuildKojiAsDependency(arch string, job *OSBuildKojiJob, manifestID, initID uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID, manifestID}, channel) } -func (s *Server) EnqueueKojiInit(job *KojiInitJob) (uuid.UUID, error) { - return s.enqueue("koji-init", job, nil) +func (s *Server) EnqueueKojiInit(job *KojiInitJob, channel string) (uuid.UUID, error) { + return s.enqueue("koji-init", job, nil, channel) } -func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID) (uuid.UUID, error) { - return s.enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...)) +func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...), channel) } -func (s *Server) EnqueueDepsolve(job *DepsolveJob) (uuid.UUID, error) { - return s.enqueue("depsolve", job, nil) +func (s *Server) EnqueueDepsolve(job *DepsolveJob, channel string) (uuid.UUID, error) { + return s.enqueue("depsolve", job, nil, channel) } -func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID) (uuid.UUID, error) { - return s.enqueue("manifest-id-only", job, []uuid.UUID{parent}) +func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID, channel string) (uuid.UUID, error) { + return s.enqueue("manifest-id-only", job, []uuid.UUID{parent}, channel) } -func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID) (uuid.UUID, error) { +func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) { prometheus.EnqueueJobMetrics(jobType) - return s.jobs.Enqueue(jobType, job, dependencies) + return s.jobs.Enqueue(jobType, job, dependencies, channel) } func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobStatus, []uuid.UUID, error) { @@ -289,7 +289,7 @@ func (s *Server) jobStatus(id uuid.UUID, result interface{}) (string, *JobStatus // OSBuildJob returns the parameters of an OSBuildJob func (s *Server) OSBuildJob(id uuid.UUID, job *OSBuildJob) error { - jobType, rawArgs, _, err := s.jobs.Job(id) + jobType, rawArgs, _, _, err := s.jobs.Job(id) if err != nil { return err } @@ -307,7 +307,7 @@ func (s *Server) OSBuildJob(id uuid.UUID, job *OSBuildJob) error { // OSBuildKojiJob returns the parameters of an OSBuildKojiJob func (s *Server) OSBuildKojiJob(id uuid.UUID, job *OSBuildKojiJob) error { - jobType, rawArgs, _, err := s.jobs.Job(id) + jobType, rawArgs, _, _, err := s.jobs.Job(id) if err != nil { return err } @@ -325,7 +325,7 @@ func (s *Server) OSBuildKojiJob(id uuid.UUID, job *OSBuildKojiJob) error { // JobType returns the type of the job func (s *Server) JobType(id uuid.UUID) (string, error) { - jobType, _, _, err := s.jobs.Job(id) + jobType, _, _, _, err := s.jobs.Job(id) // the architecture is internally encdode in the job type, but hide that // from this API return strings.Split(jobType, ":")[0], err @@ -389,15 +389,15 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return os.RemoveAll(path.Join(s.artifactsDir, id.String())) } -func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { - return s.requestJob(ctx, arch, jobTypes, uuid.Nil) +func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { + return s.requestJob(ctx, arch, jobTypes, uuid.Nil, channels) } func (s *Server) RequestJobById(ctx context.Context, arch string, requestedJobId uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { - return s.requestJob(ctx, arch, []string{}, requestedJobId) + return s.requestJob(ctx, arch, []string{}, requestedJobId, nil) } -func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID) ( +func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID, channels []string) ( jobId uuid.UUID, token uuid.UUID, jobType string, args json.RawMessage, dynamicArgs []json.RawMessage, err error) { // treat osbuild jobs specially until we have found a generic way to // specify dequeuing restrictions. For now, we only have one @@ -425,7 +425,7 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, jobId = requestedJobId token, depIDs, jobType, args, err = s.jobs.DequeueByID(dequeueCtx, requestedJobId) } else { - jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, jts) + jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, jts, channels) } if err != nil { return @@ -550,7 +550,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { return err } - jobId, token, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types) + jobId, token, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types, []string{""}) if err != nil { if err == jobqueue.ErrDequeueTimeout { return ctx.JSON(http.StatusNoContent, api.ObjectReference{ diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index e3f4a1323..8db49b776 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -129,7 +129,7 @@ func TestCreate(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - _, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + _, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) test.TestRoute(t, handler, false, "POST", "/api/worker/v1/jobs", @@ -158,10 +158,10 @@ func TestCancel(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobId, j) require.Equal(t, "osbuild", typ) @@ -199,10 +199,10 @@ func TestUpdate(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobId, j) require.Equal(t, "osbuild", typ) @@ -238,10 +238,10 @@ func TestArgs(t *testing.T) { Payload: []string{"x", "y", "z"}, }, } - jobId, err := server.EnqueueOSBuild(arch.Name(), &job) + jobId, err := server.EnqueueOSBuild(arch.Name(), &job, "") require.NoError(t, err) - _, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + _, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.NotNil(t, args) @@ -272,10 +272,10 @@ func TestUpload(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobID, j) require.Equal(t, "osbuild", typ) @@ -306,10 +306,10 @@ func TestUploadAlteredBasePath(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/image-builder-worker/v1") handler := server.Handler() - jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.NoError(t, err) require.Equal(t, jobID, j) require.Equal(t, "osbuild", typ) @@ -379,7 +379,7 @@ func TestOAuth(t *testing.T) { t.Fatalf("error creating osbuild manifest: %v", err) } - _, err = workerServer.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + _, err = workerServer.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}, "") require.NoError(t, err) client, err := worker.NewClient(proxySrv.URL, nil, &offlineToken, &oauthSrv.URL, "/api/image-builder-worker/v1") @@ -404,7 +404,7 @@ func TestTimeout(t *testing.T) { } server := newTestServer(t, tempdir, time.Millisecond*10, "/api/image-builder-worker/v1") - _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}, []string{""}) require.Equal(t, jobqueue.ErrDequeueTimeout, err) test.TestRoute(t, server.Handler(), false, "POST", "/api/image-builder-worker/v1/jobs", `{"arch":"arch","types":["types"]}`, http.StatusNoContent, @@ -424,10 +424,10 @@ func TestRequestJobById(t *testing.T) { server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() - depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}) + depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "") require.NoError(t, err) - jobId, err := server.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobId) + jobId, err := server.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobId, "") require.NoError(t, err) test.TestRoute(t, server.Handler(), false, "POST", "/api/worker/v1/jobs", `{"arch":"arch","types":["manifest-id-only"]}`, http.StatusBadRequest, @@ -436,7 +436,7 @@ func TestRequestJobById(t *testing.T) { _, _, _, _, _, err = server.RequestJobById(context.Background(), arch.Name(), jobId) require.Error(t, jobqueue.ErrNotPending, err) - _, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}) + _, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}, []string{""}) require.NoError(t, err) depsolveJR, err := json.Marshal(worker.DepsolveJobResult{}) @@ -472,7 +472,7 @@ func TestMixedOSBuildJob(t *testing.T) { Manifest: emptyManifestV2, ImageName: "no-pipeline-names", } - oldJobID, err := server.EnqueueOSBuild("x", &oldJob) + oldJobID, err := server.EnqueueOSBuild("x", &oldJob, "") require.NoError(err) newJob := worker.OSBuildJob{ @@ -483,7 +483,7 @@ func TestMixedOSBuildJob(t *testing.T) { Payload: []string{"other", "pipelines"}, }, } - newJobID, err := server.EnqueueOSBuild("x", &newJob) + newJobID, err := server.EnqueueOSBuild("x", &newJob, "") require.NoError(err) var oldJobRead worker.OSBuildJob @@ -511,7 +511,7 @@ func TestMixedOSBuildJob(t *testing.T) { // don't block forever if the jobs weren't added or can't be retrieved ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}) + id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}, []string{""}) require.NoError(err) return id, token } @@ -612,9 +612,9 @@ func TestMixedOSBuildKojiJob(t *testing.T) { enqueueKojiJob := func(job *worker.OSBuildKojiJob) uuid.UUID { initJob := new(worker.KojiInitJob) - initJobID, err := server.EnqueueKojiInit(initJob) + initJobID, err := server.EnqueueKojiInit(initJob, "") require.NoError(err) - jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID) + jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID, "") require.NoError(err) return jobID } @@ -659,7 +659,7 @@ func TestMixedOSBuildKojiJob(t *testing.T) { for idx := uint(0); idx < 2; idx++ { ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - _, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}) + _, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}, []string{""}) require.NoError(err) require.NoError(server.FinishJob(token, nil)) } @@ -668,7 +668,7 @@ func TestMixedOSBuildKojiJob(t *testing.T) { // don't block forever if the jobs weren't added or can't be retrieved ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}) + id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}, []string{""}) require.NoError(err) return id, token } @@ -769,10 +769,10 @@ func TestDepsolveLegacyErrorConversion(t *testing.T) { } server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") - depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}) + depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "") require.NoError(t, err) - _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}) + _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"}, []string{""}) require.NoError(t, err) reason := "Depsolve failed" @@ -812,7 +812,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) { Manifest: emptyManifestV2, ImageName: "no-pipeline-names", } - oldJobID, err := server.EnqueueOSBuild("x", &oldJob) + oldJobID, err := server.EnqueueOSBuild("x", &oldJob, "") require.NoError(err) newJob := worker.OSBuildJob{ @@ -823,7 +823,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) { Payload: []string{"other", "pipelines"}, }, } - newJobID, err := server.EnqueueOSBuild("x", &newJob) + newJobID, err := server.EnqueueOSBuild("x", &newJob, "") require.NoError(err) oldJobRead := new(worker.OSBuildJob) @@ -844,7 +844,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) { // don't block forever if the jobs weren't added or can't be retrieved ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}) + id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"}, []string{""}) require.NoError(err) return id, token } @@ -917,9 +917,9 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) { enqueueKojiJob := func(job *worker.OSBuildKojiJob) uuid.UUID { initJob := new(worker.KojiInitJob) - initJobID, err := server.EnqueueKojiInit(initJob) + initJobID, err := server.EnqueueKojiInit(initJob, "") require.NoError(err) - jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID) + jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID, "") require.NoError(err) return jobID } @@ -957,7 +957,7 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) { for idx := uint(0); idx < 2; idx++ { ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - _, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}) + _, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"}, []string{""}) require.NoError(err) require.NoError(server.FinishJob(token, nil)) } @@ -966,7 +966,7 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) { // don't block forever if the jobs weren't added or can't be retrieved ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}) + id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"}, []string{""}) require.NoError(err) return id, token }