worker: require workers to pass their architecture

Jobs are scheduled with type "osbuild:{arch}", to ensure that workers
only get jobs with the right architecture assigned.
This commit is contained in:
Lars Karlitski 2020-09-20 20:16:18 +02:00 committed by Tom Gundersen
parent 44c2144994
commit 9008a1defc
10 changed files with 59 additions and 19 deletions

View file

@ -106,18 +106,32 @@ func main() {
log.Fatalf("cannot create queue directory: %v", err)
}
jobs, err := fsjobqueue.New(queueDir, []string{"osbuild"})
distros, err := distro.NewRegistry(fedora31.New(), fedora32.New(), rhel8.New())
if err != nil {
log.Fatalf("Error loading distros: %v", err)
}
// construct job types of the form osbuild:{arch} for all arches
jobTypes := []string{"osbuild"}
jobTypesMap := map[string]bool{}
for _, name := range distros.List() {
d := distros.GetDistro(name)
for _, arch := range d.ListArches() {
jt := "osbuild:" + arch
if !jobTypesMap[jt] {
jobTypesMap[jt] = true
jobTypes = append(jobTypes, jt)
}
}
}
jobs, err := fsjobqueue.New(queueDir, jobTypes)
if err != nil {
log.Fatalf("cannot create jobqueue: %v", err)
}
rpm := rpmmd.NewRPMMD(path.Join(cacheDirectory, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json")
distros, err := distro.NewRegistry(fedora31.New(), fedora32.New(), rhel8.New())
if err != nil {
log.Fatalf("Error loading distros: %v", err)
}
workerServer := worker.NewServer(logger, jobs, "")
cloudServer := cloudapi.NewServer(workerServer, rpm, distros)

View file

@ -180,7 +180,21 @@ func main() {
log.Fatalf("cannot create queue directory: %v", err)
}
jobs, err := fsjobqueue.New(queueDir, []string{"osbuild"})
// construct job types of the form osbuild:{arch} for all arches
jobTypes := []string{"osbuild"}
jobTypesMap := map[string]bool{}
for _, name := range distros.List() {
d := distros.GetDistro(name)
for _, arch := range d.ListArches() {
jt := "osbuild:" + arch
if !jobTypesMap[jt] {
jobTypesMap[jt] = true
jobTypes = append(jobTypes, jt)
}
}
}
jobs, err := fsjobqueue.New(queueDir, jobTypes)
if err != nil {
log.Fatalf("cannot create jobqueue: %v", err)
}

View file

@ -69,6 +69,7 @@ func (server *Server) Compose(w http.ResponseWriter, r *http.Request) {
type imageRequest struct {
manifest distro.Manifest
arch string
}
imageRequests := make([]imageRequest, len(request.ImageRequests))
var targets []*target.Target
@ -128,6 +129,7 @@ func (server *Server) Compose(w http.ResponseWriter, r *http.Request) {
}
imageRequests[i].manifest = manifest
imageRequests[i].arch = arch.Name()
if len(ir.UploadRequests) != 1 {
http.Error(w, "Only compose requests with a single upload target are currently supported", http.StatusBadRequest)
@ -179,7 +181,7 @@ func (server *Server) Compose(w http.ResponseWriter, r *http.Request) {
return
}
id, err := server.workers.Enqueue(ir.manifest, targets)
id, err := server.workers.Enqueue(ir.arch, ir.manifest, targets)
if err != nil {
http.Error(w, "Failed to enqueue manifest", http.StatusInternalServerError)
return

View file

@ -95,6 +95,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
type imageRequest struct {
manifest distro.Manifest
arch string
filename string
}
imageRequests := make([]imageRequest, len(request.ImageRequests))
@ -135,6 +136,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
}
imageRequests[i].manifest = manifest
imageRequests[i].arch = arch.Name()
imageRequests[i].filename = imageType.Filename()
}
@ -170,7 +172,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Could not initialize build with koji: %v", err))
}
id, err := h.server.workers.Enqueue(ir.manifest, []*target.Target{
id, err := h.server.workers.Enqueue(ir.arch, ir.manifest, []*target.Target{
target.NewKojiTarget(&target.KojiTargetOptions{
BuildID: uint64(buildInfo.BuildID),
TaskID: uint64(request.Koji.TaskId),

View file

@ -1825,7 +1825,7 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request
} else {
var jobId uuid.UUID
jobId, err = api.workers.Enqueue(manifest, targets)
jobId, err = api.workers.Enqueue(api.arch.Name(), manifest, targets)
if err == nil {
err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId)
}

View file

@ -17,6 +17,7 @@ type Error struct {
// RequestJobJSONBody defines parameters for RequestJob.
type RequestJobJSONBody struct {
Arch string `json:"arch"`
Types []string `json:"types"`
}

View file

@ -90,8 +90,11 @@ paths:
type: string
enum:
- osbuild
arch:
type: string
required:
- types
- arch
description: ''
description: Requests a job. This operation blocks until a job is available.
parameters: []

View file

@ -84,6 +84,7 @@ func (c *Client) RequestJob() (Job, error) {
var buf bytes.Buffer
err = json.NewEncoder(&buf).Encode(api.RequestJobJSONRequestBody{
Types: []string{"osbuild"},
Arch: common.CurrentArch(),
})
if err != nil {
panic(err)

View file

@ -88,13 +88,13 @@ func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
s.server.Handler.ServeHTTP(writer, request)
}
func (s *Server) Enqueue(manifest distro.Manifest, targets []*target.Target) (uuid.UUID, error) {
func (s *Server) Enqueue(arch string, manifest distro.Manifest, targets []*target.Target) (uuid.UUID, error) {
job := OSBuildJob{
Manifest: manifest,
Targets: targets,
}
return s.jobs.Enqueue("osbuild", job, nil)
return s.jobs.Enqueue("osbuild:"+arch, job, nil)
}
func (s *Server) JobStatus(id uuid.UUID) (*JobStatus, error) {
@ -172,11 +172,14 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error {
return os.RemoveAll(path.Join(s.artifactsDir, id.String()))
}
func (s *Server) RequestOSBuildJob(ctx context.Context) (uuid.UUID, uuid.UUID, *OSBuildJob, error) {
func (s *Server) RequestOSBuildJob(ctx context.Context, arch string) (uuid.UUID, uuid.UUID, *OSBuildJob, error) {
token := uuid.New()
// wait on "osbuild" jobs for backwards compatiblity
jobTypes := []string{"osbuild", "osbuild:" + arch}
var args OSBuildJob
jobId, err := s.jobs.Dequeue(ctx, []string{"osbuild"}, &args)
jobId, err := s.jobs.Dequeue(ctx, jobTypes, &args)
if err != nil {
return uuid.Nil, uuid.Nil, nil, err
}
@ -262,7 +265,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error {
return echo.NewHTTPError(http.StatusBadRequest, "invalid job types")
}
token, jobId, jobArgs, err := h.server.RequestOSBuildJob(ctx.Request().Context())
token, jobId, jobArgs, err := h.server.RequestOSBuildJob(ctx.Request().Context(), body.Arch)
if err != nil {
return err
}

View file

@ -64,10 +64,10 @@ func TestCreate(t *testing.T) {
}
server := worker.NewServer(nil, testjobqueue.New(), "")
_, err = server.Enqueue(manifest, nil)
_, err = server.Enqueue(arch.Name(), manifest, nil)
require.NoError(t, err)
test.TestRoute(t, server, false, "POST", "/jobs", `{"types":["osbuild"]}`, http.StatusCreated,
test.TestRoute(t, server, false, "POST", "/jobs", `{"types":["osbuild"],"arch":"x86_64"}`, http.StatusCreated,
`{"type":"osbuild","args":{"manifest":{"pipeline":{},"sources":{}}}}`, "id", "location", "artifact_location")
}
@ -87,10 +87,10 @@ func TestCancel(t *testing.T) {
}
server := worker.NewServer(nil, testjobqueue.New(), "")
jobId, err := server.Enqueue(manifest, nil)
jobId, err := server.Enqueue(arch.Name(), manifest, nil)
require.NoError(t, err)
token, j, _, err := server.RequestOSBuildJob(context.Background())
token, j, _, err := server.RequestOSBuildJob(context.Background(), arch.Name())
require.NoError(t, err)
require.Equal(t, jobId, j)