From 9008a1defc4c077468f00964d63bf69d10ea82d0 Mon Sep 17 00:00:00 2001 From: Lars Karlitski Date: Sun, 20 Sep 2020 20:16:18 +0200 Subject: [PATCH] worker: require workers to pass their architecture Jobs are scheduled with type "osbuild:{arch}", to ensure that workers only get jobs with the right architecture assigned. --- cmd/osbuild-composer-cloud/main.go | 26 ++++++++++++++++++++------ cmd/osbuild-composer/main.go | 16 +++++++++++++++- internal/cloudapi/server.go | 4 +++- internal/kojiapi/server.go | 4 +++- internal/weldr/api.go | 2 +- internal/worker/api/api.gen.go | 1 + internal/worker/api/openapi.yml | 3 +++ internal/worker/client.go | 1 + internal/worker/server.go | 13 ++++++++----- internal/worker/server_test.go | 8 ++++---- 10 files changed, 59 insertions(+), 19 deletions(-) diff --git a/cmd/osbuild-composer-cloud/main.go b/cmd/osbuild-composer-cloud/main.go index ad0beb2a7..6becccf82 100644 --- a/cmd/osbuild-composer-cloud/main.go +++ b/cmd/osbuild-composer-cloud/main.go @@ -106,18 +106,32 @@ func main() { log.Fatalf("cannot create queue directory: %v", err) } - jobs, err := fsjobqueue.New(queueDir, []string{"osbuild"}) + distros, err := distro.NewRegistry(fedora31.New(), fedora32.New(), rhel8.New()) + if err != nil { + log.Fatalf("Error loading distros: %v", err) + } + + // construct job types of the form osbuild:{arch} for all arches + jobTypes := []string{"osbuild"} + jobTypesMap := map[string]bool{} + for _, name := range distros.List() { + d := distros.GetDistro(name) + for _, arch := range d.ListArches() { + jt := "osbuild:" + arch + if !jobTypesMap[jt] { + jobTypesMap[jt] = true + jobTypes = append(jobTypes, jt) + } + } + } + + jobs, err := fsjobqueue.New(queueDir, jobTypes) if err != nil { log.Fatalf("cannot create jobqueue: %v", err) } rpm := rpmmd.NewRPMMD(path.Join(cacheDirectory, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json") - distros, err := distro.NewRegistry(fedora31.New(), fedora32.New(), rhel8.New()) - if err != nil { - log.Fatalf("Error loading distros: %v", err) - } - workerServer := worker.NewServer(logger, jobs, "") cloudServer := cloudapi.NewServer(workerServer, rpm, distros) diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index 20455bb2b..593d6daa5 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -180,7 +180,21 @@ func main() { log.Fatalf("cannot create queue directory: %v", err) } - jobs, err := fsjobqueue.New(queueDir, []string{"osbuild"}) + // construct job types of the form osbuild:{arch} for all arches + jobTypes := []string{"osbuild"} + jobTypesMap := map[string]bool{} + for _, name := range distros.List() { + d := distros.GetDistro(name) + for _, arch := range d.ListArches() { + jt := "osbuild:" + arch + if !jobTypesMap[jt] { + jobTypesMap[jt] = true + jobTypes = append(jobTypes, jt) + } + } + } + + jobs, err := fsjobqueue.New(queueDir, jobTypes) if err != nil { log.Fatalf("cannot create jobqueue: %v", err) } diff --git a/internal/cloudapi/server.go b/internal/cloudapi/server.go index 178a98e99..8bfd909e9 100644 --- a/internal/cloudapi/server.go +++ b/internal/cloudapi/server.go @@ -69,6 +69,7 @@ func (server *Server) Compose(w http.ResponseWriter, r *http.Request) { type imageRequest struct { manifest distro.Manifest + arch string } imageRequests := make([]imageRequest, len(request.ImageRequests)) var targets []*target.Target @@ -128,6 +129,7 @@ func (server *Server) Compose(w http.ResponseWriter, r *http.Request) { } imageRequests[i].manifest = manifest + imageRequests[i].arch = arch.Name() if len(ir.UploadRequests) != 1 { http.Error(w, "Only compose requests with a single upload target are currently supported", http.StatusBadRequest) @@ -179,7 +181,7 @@ func (server *Server) Compose(w http.ResponseWriter, r *http.Request) { return } - id, err := server.workers.Enqueue(ir.manifest, targets) + id, err := server.workers.Enqueue(ir.arch, ir.manifest, targets) if err != nil { http.Error(w, "Failed to enqueue manifest", http.StatusInternalServerError) return diff --git a/internal/kojiapi/server.go b/internal/kojiapi/server.go index d77fcf462..1b2337ac2 100644 --- a/internal/kojiapi/server.go +++ b/internal/kojiapi/server.go @@ -95,6 +95,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { type imageRequest struct { manifest distro.Manifest + arch string filename string } imageRequests := make([]imageRequest, len(request.ImageRequests)) @@ -135,6 +136,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { } imageRequests[i].manifest = manifest + imageRequests[i].arch = arch.Name() imageRequests[i].filename = imageType.Filename() } @@ -170,7 +172,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error { return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Could not initialize build with koji: %v", err)) } - id, err := h.server.workers.Enqueue(ir.manifest, []*target.Target{ + id, err := h.server.workers.Enqueue(ir.arch, ir.manifest, []*target.Target{ target.NewKojiTarget(&target.KojiTargetOptions{ BuildID: uint64(buildInfo.BuildID), TaskID: uint64(request.Koji.TaskId), diff --git a/internal/weldr/api.go b/internal/weldr/api.go index a4be5ca3e..e1edcb606 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -1825,7 +1825,7 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request } else { var jobId uuid.UUID - jobId, err = api.workers.Enqueue(manifest, targets) + jobId, err = api.workers.Enqueue(api.arch.Name(), manifest, targets) if err == nil { err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId) } diff --git a/internal/worker/api/api.gen.go b/internal/worker/api/api.gen.go index 1b8b9c2d0..3d925e49e 100644 --- a/internal/worker/api/api.gen.go +++ b/internal/worker/api/api.gen.go @@ -17,6 +17,7 @@ type Error struct { // RequestJobJSONBody defines parameters for RequestJob. type RequestJobJSONBody struct { + Arch string `json:"arch"` Types []string `json:"types"` } diff --git a/internal/worker/api/openapi.yml b/internal/worker/api/openapi.yml index 1ba39e94a..146f83f93 100644 --- a/internal/worker/api/openapi.yml +++ b/internal/worker/api/openapi.yml @@ -90,8 +90,11 @@ paths: type: string enum: - osbuild + arch: + type: string required: - types + - arch description: '' description: Requests a job. This operation blocks until a job is available. parameters: [] diff --git a/internal/worker/client.go b/internal/worker/client.go index 2ab7d2678..619862bd7 100644 --- a/internal/worker/client.go +++ b/internal/worker/client.go @@ -84,6 +84,7 @@ func (c *Client) RequestJob() (Job, error) { var buf bytes.Buffer err = json.NewEncoder(&buf).Encode(api.RequestJobJSONRequestBody{ Types: []string{"osbuild"}, + Arch: common.CurrentArch(), }) if err != nil { panic(err) diff --git a/internal/worker/server.go b/internal/worker/server.go index 71c4672f0..4e4a596d9 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -88,13 +88,13 @@ func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) { s.server.Handler.ServeHTTP(writer, request) } -func (s *Server) Enqueue(manifest distro.Manifest, targets []*target.Target) (uuid.UUID, error) { +func (s *Server) Enqueue(arch string, manifest distro.Manifest, targets []*target.Target) (uuid.UUID, error) { job := OSBuildJob{ Manifest: manifest, Targets: targets, } - return s.jobs.Enqueue("osbuild", job, nil) + return s.jobs.Enqueue("osbuild:"+arch, job, nil) } func (s *Server) JobStatus(id uuid.UUID) (*JobStatus, error) { @@ -172,11 +172,14 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return os.RemoveAll(path.Join(s.artifactsDir, id.String())) } -func (s *Server) RequestOSBuildJob(ctx context.Context) (uuid.UUID, uuid.UUID, *OSBuildJob, error) { +func (s *Server) RequestOSBuildJob(ctx context.Context, arch string) (uuid.UUID, uuid.UUID, *OSBuildJob, error) { token := uuid.New() + // wait on "osbuild" jobs for backwards compatiblity + jobTypes := []string{"osbuild", "osbuild:" + arch} + var args OSBuildJob - jobId, err := s.jobs.Dequeue(ctx, []string{"osbuild"}, &args) + jobId, err := s.jobs.Dequeue(ctx, jobTypes, &args) if err != nil { return uuid.Nil, uuid.Nil, nil, err } @@ -262,7 +265,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { return echo.NewHTTPError(http.StatusBadRequest, "invalid job types") } - token, jobId, jobArgs, err := h.server.RequestOSBuildJob(ctx.Request().Context()) + token, jobId, jobArgs, err := h.server.RequestOSBuildJob(ctx.Request().Context(), body.Arch) if err != nil { return err } diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 28b9883aa..e91af891c 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -64,10 +64,10 @@ func TestCreate(t *testing.T) { } server := worker.NewServer(nil, testjobqueue.New(), "") - _, err = server.Enqueue(manifest, nil) + _, err = server.Enqueue(arch.Name(), manifest, nil) require.NoError(t, err) - test.TestRoute(t, server, false, "POST", "/jobs", `{"types":["osbuild"]}`, http.StatusCreated, + test.TestRoute(t, server, false, "POST", "/jobs", `{"types":["osbuild"],"arch":"x86_64"}`, http.StatusCreated, `{"type":"osbuild","args":{"manifest":{"pipeline":{},"sources":{}}}}`, "id", "location", "artifact_location") } @@ -87,10 +87,10 @@ func TestCancel(t *testing.T) { } server := worker.NewServer(nil, testjobqueue.New(), "") - jobId, err := server.Enqueue(manifest, nil) + jobId, err := server.Enqueue(arch.Name(), manifest, nil) require.NoError(t, err) - token, j, _, err := server.RequestOSBuildJob(context.Background()) + token, j, _, err := server.RequestOSBuildJob(context.Background(), arch.Name()) require.NoError(t, err) require.Equal(t, jobId, j)