From 98fd290a08ed73c6c5a306bd1eba432e6de23f05 Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Sun, 8 Nov 2020 22:12:30 +0000 Subject: [PATCH] worker: make Enqueue() specific for each job type Most of the worker API is now untyped, but keep Enqueu() typed to ensure the job objects match the names in the queue. This means we must add a version of Enqueue() for each job type we support. --- cmd/osbuild-composer/composer.go | 13 ++++++------- internal/cloudapi/server.go | 2 +- internal/kojiapi/server.go | 2 +- internal/weldr/api.go | 2 +- internal/weldr/api_test.go | 4 +++- internal/worker/server.go | 14 +++++++++++++- internal/worker/server_test.go | 4 ++-- 7 files changed, 27 insertions(+), 14 deletions(-) diff --git a/cmd/osbuild-composer/composer.go b/cmd/osbuild-composer/composer.go index cf3c089f6..ec479e49e 100644 --- a/cmd/osbuild-composer/composer.go +++ b/cmd/osbuild-composer/composer.go @@ -70,16 +70,15 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string, logger * c.rpm = rpmmd.NewRPMMD(path.Join(c.cacheDir, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json") - // construct job types of the form osbuild:{arch} for all arches - jobTypes := []string{"osbuild"} - jobTypesMap := map[string]bool{} + // construct job types of the form osbuild:{arch} and osbuild-koji:{arch} for all arches + jobTypes := []string{"osbuild", "koji-init", "koji-finalize"} + archSet := map[string]bool{} for _, name := range c.distros.List() { d := c.distros.GetDistro(name) for _, arch := range d.ListArches() { - jt := "osbuild:" + arch - if !jobTypesMap[jt] { - jobTypesMap[jt] = true - jobTypes = append(jobTypes, jt) + if !archSet[arch] { + archSet[arch] = true + jobTypes = append(jobTypes, "osbuild:"+arch, "osbuild-koji:"+arch) } } } diff --git a/internal/cloudapi/server.go b/internal/cloudapi/server.go index 3e60438f9..8bc7aaf60 100644 --- a/internal/cloudapi/server.go +++ b/internal/cloudapi/server.go @@ -188,7 +188,7 @@ func (server *Server) Compose(w http.ResponseWriter, r *http.Request) { return } - id, err := server.workers.Enqueue(ir.arch, &worker.OSBuildJob{ + id, err := server.workers.EnqueueOSBuild(ir.arch, &worker.OSBuildJob{ Manifest: ir.manifest, Targets: targets, }) diff --git a/internal/kojiapi/server.go b/internal/kojiapi/server.go index a0b61a68d..2da09cccc 100644 --- a/internal/kojiapi/server.go +++ b/internal/kojiapi/server.go @@ -196,7 +196,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { }, } - id, err := h.server.workers.Enqueue(ir.arch, &job) + id, err := h.server.workers.EnqueueOSBuild(ir.arch, &job) if err != nil { // This is a programming errror. panic(err) diff --git a/internal/weldr/api.go b/internal/weldr/api.go index b7e469ea4..f13614c6a 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -1866,7 +1866,7 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request } else { var jobId uuid.UUID - jobId, err = api.workers.Enqueue(api.arch.Name(), &worker.OSBuildJob{ + jobId, err = api.workers.EnqueueOSBuild(api.arch.Name(), &worker.OSBuildJob{ Manifest: manifest, Targets: targets, ImageName: imageType.Filename(), diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index 2057a1b50..a0d7120d0 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -261,7 +261,9 @@ func TestBlueprintsInfoToml(t *testing.T) { Description: "Test", Version: "0.0.0", Packages: []blueprint.Package{ - {"httpd", "2.4.*"}, + { + Name: "httpd", + Version: "2.4.*"}, }, Groups: []blueprint.Group{}, Modules: []blueprint.Package{}, diff --git a/internal/worker/server.go b/internal/worker/server.go index 64465fb0a..5f1a106ea 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -83,10 +83,22 @@ func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) { s.server.Handler.ServeHTTP(writer, request) } -func (s *Server) Enqueue(arch string, job *OSBuildJob) (uuid.UUID, error) { +func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob) (uuid.UUID, error) { return s.jobs.Enqueue("osbuild:"+arch, job, nil) } +func (s *Server) EnqueueOSBuildKoji(arch string, job *OSBuildKojiJob, initID uuid.UUID) (uuid.UUID, error) { + return s.jobs.Enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID}) +} + +func (s *Server) EnqueueKojiInit(job *KojiInitJob) (uuid.UUID, error) { + return s.jobs.Enqueue("koji-init", job, nil) +} + +func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID) (uuid.UUID, error) { + return s.jobs.Enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...)) +} + func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, error) { rawResult, queued, started, finished, canceled, err := s.jobs.JobStatus(id) if err != nil { diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 96ea76c5b..0ec8bcfc9 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -64,7 +64,7 @@ func TestCreate(t *testing.T) { } server := worker.NewServer(nil, testjobqueue.New(), "") - _, err = server.Enqueue(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + _, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) require.NoError(t, err) test.TestRoute(t, server, false, "POST", "/api/worker/v1/jobs", `{"types":["osbuild"],"arch":"x86_64"}`, http.StatusCreated, @@ -87,7 +87,7 @@ func TestCancel(t *testing.T) { } server := worker.NewServer(nil, testjobqueue.New(), "") - jobId, err := server.Enqueue(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) + jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) require.NoError(t, err) token, j, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})