From 0d4479bbcd87f86d05b4f22d2476ae1c75595488 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Budai?= Date: Thu, 30 Jan 2020 12:49:16 +0100 Subject: [PATCH] worker: save result.json in the composer instead of the worker In the future remote workers will be introduced. Obviously, the remote worker cannot support the local target. Unfortunately, the current implementation of storing the osbuild result is dependant on it. This commit moves the responsibility of storing osbuild result to the composer process instead of the worker process. The result is transferred from a worker to a composer using extended HTTP API. --- cmd/osbuild-worker/main.go | 15 +++---- internal/jobqueue/api.go | 10 ++--- internal/jobqueue/job.go | 81 +++++++++++--------------------------- internal/store/store.go | 24 ++++++++++- 4 files changed, 57 insertions(+), 73 deletions(-) diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index 25ed464c5..5e5f7b72c 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -10,6 +10,7 @@ import ( "net" "net/http" + "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/jobqueue" "github.com/osbuild/osbuild-composer/internal/store" ) @@ -54,9 +55,9 @@ func (c *ComposerClient) AddJob() (*jobqueue.Job, error) { return job, nil } -func (c *ComposerClient) UpdateJob(job *jobqueue.Job, status string, image *store.Image) error { +func (c *ComposerClient) UpdateJob(job *jobqueue.Job, status string, image *store.Image, result *common.ComposeResult) error { var b bytes.Buffer - json.NewEncoder(&b).Encode(&jobqueue.JobStatus{status, image}) + json.NewEncoder(&b).Encode(&jobqueue.JobStatus{status, image, result}) req, err := http.NewRequest("PATCH", "http://localhost/job-queue/v1/jobs/"+job.ID.String(), &b) if err != nil { return err @@ -83,26 +84,26 @@ func handleJob(client *ComposerClient) error { return err } - err = client.UpdateJob(job, "RUNNING", nil) + err = client.UpdateJob(job, "RUNNING", nil, nil) if err != nil { return err } fmt.Printf("Running job %s\n", job.ID.String()) - image, err, errs := job.Run() + image, result, err, errs := job.Run() if err != nil { log.Printf(" Job failed: %v", err) - return client.UpdateJob(job, "FAILED", nil) + return client.UpdateJob(job, "FAILED", nil, result) } for _, err := range errs { if err != nil { log.Printf(" Job target error: %v", err) - return client.UpdateJob(job, "FAILED", nil) + return client.UpdateJob(job, "FAILED", nil, result) } } - return client.UpdateJob(job, "FINISHED", image) + return client.UpdateJob(job, "FINISHED", image, result) } func main() { diff --git a/internal/jobqueue/api.go b/internal/jobqueue/api.go index 3aec611c8..0bffa1681 100644 --- a/internal/jobqueue/api.go +++ b/internal/jobqueue/api.go @@ -93,10 +93,10 @@ func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, writer.WriteHeader(http.StatusCreated) json.NewEncoder(writer).Encode(Job{ - ID: nextJob.ComposeID, - Distro: nextJob.Distro, - Pipeline: nextJob.Pipeline, - Targets: nextJob.Targets, + ID: nextJob.ComposeID, + Distro: nextJob.Distro, + Pipeline: nextJob.Pipeline, + Targets: nextJob.Targets, OutputType: nextJob.OutputType, }) } @@ -121,7 +121,7 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque return } - err = api.store.UpdateCompose(id, body.Status, body.Image) + err = api.store.UpdateCompose(id, body.Status, body.Image, body.Result) if err != nil { switch err.(type) { case *store.NotFoundError: diff --git a/internal/jobqueue/job.go b/internal/jobqueue/job.go index 8eb64906d..7480e0601 100644 --- a/internal/jobqueue/job.go +++ b/internal/jobqueue/job.go @@ -9,6 +9,7 @@ import ( "github.com/google/uuid" + "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/pipeline" "github.com/osbuild/osbuild-composer/internal/store" @@ -25,15 +26,16 @@ type Job struct { } type JobStatus struct { - Status string `json:"status"` - Image *store.Image `json:"image"` + Status string `json:"status"` + Image *store.Image `json:"image"` + Result *common.ComposeResult `json:"result"` } -func (job *Job) Run() (*store.Image, error, []error) { +func (job *Job) Run() (*store.Image, *common.ComposeResult, error, []error) { distros := distro.NewRegistry([]string{"/etc/osbuild-composer", "/usr/share/osbuild-composer"}) d := distros.GetDistro(job.Distro) if d == nil { - return nil, fmt.Errorf("unknown distro: %s", job.Distro), nil + return nil, nil, fmt.Errorf("unknown distro: %s", job.Distro), nil } build := pipeline.Build{ @@ -42,18 +44,18 @@ func (job *Job) Run() (*store.Image, error, []error) { buildFile, err := ioutil.TempFile("", "osbuild-worker-build-env-*") if err != nil { - return nil, err, nil + return nil, nil, err, nil } defer os.Remove(buildFile.Name()) err = json.NewEncoder(buildFile).Encode(build) if err != nil { - return nil, fmt.Errorf("error encoding build environment: %v", err), nil + return nil, nil, fmt.Errorf("error encoding build environment: %v", err), nil } tmpStore, err := ioutil.TempDir("/var/tmp", "osbuild-store") if err != nil { - return nil, fmt.Errorf("error setting up osbuild store: %v", err), nil + return nil, nil, fmt.Errorf("error setting up osbuild store: %v", err), nil } defer os.RemoveAll(tmpStore) @@ -67,43 +69,39 @@ func (job *Job) Run() (*store.Image, error, []error) { stdin, err := cmd.StdinPipe() if err != nil { - return nil, fmt.Errorf("error setting up stdin for osbuild: %v", err), nil + return nil, nil, fmt.Errorf("error setting up stdin for osbuild: %v", err), nil } stdout, err := cmd.StdoutPipe() if err != nil { - return nil, fmt.Errorf("error setting up stdout for osbuild: %v", err), nil + return nil, nil, fmt.Errorf("error setting up stdout for osbuild: %v", err), nil } err = cmd.Start() if err != nil { - return nil, fmt.Errorf("error starting osbuild: %v", err), nil + return nil, nil, fmt.Errorf("error starting osbuild: %v", err), nil } err = json.NewEncoder(stdin).Encode(job.Pipeline) if err != nil { - return nil, fmt.Errorf("error encoding osbuild pipeline: %v", err), nil + return nil, nil, fmt.Errorf("error encoding osbuild pipeline: %v", err), nil } stdin.Close() - var result struct { - TreeID string `json:"tree_id"` - OutputID string `json:"output_id"` - Build *json.RawMessage `json:"build,omitempty"` - Stages *json.RawMessage `json:"stages,omitempty"` - Assembler *json.RawMessage `json:"assembler,omitempty"` - Success *json.RawMessage `json:"success,omitempty"` - } + var result common.ComposeResult err = json.NewDecoder(stdout).Decode(&result) if err != nil { - return nil, fmt.Errorf("error decoding osbuild output: %#v", err), nil + return nil, nil, fmt.Errorf("error decoding osbuild output: %#v", err), nil } - cmdError := cmd.Wait() + err = cmd.Wait() + if err != nil { + return nil, &result, err, nil + } filename, mimeType, err := d.FilenameFromType(job.OutputType) if err != nil { - return nil, fmt.Errorf("cannot fetch information about output type %s: %v", job.OutputType, err), nil + return nil, &result, fmt.Errorf("cannot fetch information about output type %s: %v", job.OutputType, err), nil } var image store.Image @@ -113,34 +111,6 @@ func (job *Job) Run() (*store.Image, error, []error) { for _, t := range job.Targets { switch options := t.Options.(type) { case *target.LocalTargetOptions: - err := os.MkdirAll(options.Location, 0755) - if err != nil { - r = append(r, err) - continue - } - - // Make sure the directory ownership is correct, even if there are errors later - err = runCommand("chown", "_osbuild-composer:_osbuild-composer", options.Location) - if err != nil { - r = append(r, err) - continue - } - - jobFile, err := os.Create(options.Location + "/result.json") - if err != nil { - r = append(r, err) - continue - } - - err = json.NewEncoder(jobFile).Encode(result) - if err != nil { - r = append(r, err) - continue - } - - if cmdError != nil { - continue - } err = runCommand("cp", "-a", "-L", tmpStore+"/refs/"+result.OutputID+"/.", options.Location) if err != nil { @@ -163,7 +133,7 @@ func (job *Job) Run() (*store.Image, error, []error) { fileStat, err := file.Stat() if err != nil { - return nil, err, nil + return nil, &result, err, nil } image = store.Image{ @@ -173,9 +143,6 @@ func (job *Job) Run() (*store.Image, error, []error) { } case *target.AWSTargetOptions: - if cmdError != nil { - continue - } a, err := awsupload.New(options.Region, options.AccessKeyID, options.SecretAccessKey) if err != nil { @@ -206,11 +173,7 @@ func (job *Job) Run() (*store.Image, error, []error) { r = append(r, nil) } - if cmdError != nil { - return nil, cmdError, nil - } - - return &image, nil, r + return &image, &result, nil, r } func runCommand(command string, params ...string) error { diff --git a/internal/store/store.go b/internal/store/store.go index 25392fdff..296ca41ac 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -22,6 +22,7 @@ import ( "time" "github.com/osbuild/osbuild-composer/internal/blueprint" + "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/pipeline" "github.com/osbuild/osbuild-composer/internal/rpmmd" @@ -459,9 +460,16 @@ func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, checks targets := []*target.Target{} if s.stateDir != nil { + outputDir := *s.stateDir + "/outputs/" + composeID.String() + + err := os.MkdirAll(outputDir, 0755) + if err != nil { + return fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) + } + targets = append(targets, target.NewLocalTarget( &target.LocalTargetOptions{ - Location: *s.stateDir + "/outputs/" + composeID.String(), + Location: outputDir, }, )) } @@ -542,7 +550,7 @@ func (s *Store) PopCompose() Job { return job } -func (s *Store) UpdateCompose(composeID uuid.UUID, status string, image *Image) error { +func (s *Store) UpdateCompose(composeID uuid.UUID, status string, image *Image, result *common.ComposeResult) error { return s.change(func() error { compose, exists := s.Composes[composeID] if !exists { @@ -551,6 +559,18 @@ func (s *Store) UpdateCompose(composeID uuid.UUID, status string, image *Image) if compose.QueueStatus == "WAITING" { return &NotPendingError{"compose has not been popped"} } + + // write result into file + if s.stateDir != nil && result != nil { + f, err := os.Create(*s.stateDir + "/outputs/" + composeID.String() + "/result.json") + + if err != nil { + return fmt.Errorf("cannot open result.json for job %v: %#v", composeID, err) + } + + json.NewEncoder(f).Encode(result) + } + switch status { case "RUNNING": switch compose.QueueStatus {