debian-forge-composer/internal/jobqueue/job.go
Ondřej Budai 6902f730cb worker: upload local target image using jobqueue api
Prior this commit local target copied the image from a worker to a composer
using cp(1) command. This prevented the local target to work on remote
workers.

This commit switches the local target implementation to using the jobqueue
API introduced in the previous commit. I had some concerns about speed
of this solution (imho nothing can beat pure cp(1) implementation) but
ad hoc sanity tests showed the copying of the image using the jobqueue API
when running the worker on the same machine as the composer is still
more or less instant.
2020-02-14 11:53:38 +01:00

183 lines
4.2 KiB
Go

package jobqueue
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/pipeline"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/upload/awsupload"
)
type Job struct {
ID uuid.UUID `json:"id"`
ImageBuildID int `json:"image_build_id"`
Distro string `json:"distro"`
Pipeline *pipeline.Pipeline `json:"pipeline"`
Targets []*target.Target `json:"targets"`
OutputType string `json:"output_type"`
}
type JobStatus struct {
Status common.ImageBuildState `json:"status"`
Result *common.ComposeResult `json:"result"`
}
type TargetsError struct {
Errors []error
}
func (e *TargetsError) Error() string {
errString := fmt.Sprintf("%d target(s) errored:\n", len(e.Errors))
for _, err := range e.Errors {
errString += err.Error() + "\n"
}
return errString
}
func (job *Job) Run(uploader LocalTargetUploader) (*common.ComposeResult, error) {
distros := distro.NewRegistry([]string{"/etc/osbuild-composer", "/usr/share/osbuild-composer"})
d := distros.GetDistro(job.Distro)
if d == nil {
return nil, fmt.Errorf("unknown distro: %s", job.Distro)
}
build := pipeline.Build{
Runner: d.Runner(),
}
buildFile, err := ioutil.TempFile("", "osbuild-worker-build-env-*")
if err != nil {
return nil, err
}
// FIXME: how to handle errors in defer?
defer os.Remove(buildFile.Name())
err = json.NewEncoder(buildFile).Encode(build)
if err != nil {
return nil, fmt.Errorf("error encoding build environment: %v", err)
}
tmpStore, err := ioutil.TempDir("/var/tmp", "osbuild-store")
if err != nil {
return nil, fmt.Errorf("error setting up osbuild store: %v", err)
}
// FIXME: how to handle errors in defer?
defer os.RemoveAll(tmpStore)
cmd := exec.Command(
"osbuild",
"--store", tmpStore,
"--build-env", buildFile.Name(),
"--json", "-",
)
cmd.Stderr = os.Stderr
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("error setting up stdin for osbuild: %v", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("error setting up stdout for osbuild: %v", err)
}
err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("error starting osbuild: %v", err)
}
err = json.NewEncoder(stdin).Encode(job.Pipeline)
if err != nil {
return nil, fmt.Errorf("error encoding osbuild pipeline: %v", err)
}
// FIXME: handle or comment this possible error
_ = stdin.Close()
var result common.ComposeResult
err = json.NewDecoder(stdout).Decode(&result)
if err != nil {
return nil, fmt.Errorf("error decoding osbuild output: %#v", err)
}
err = cmd.Wait()
if err != nil {
return &result, err
}
var r []error
for _, t := range job.Targets {
switch options := t.Options.(type) {
case *target.LocalTargetOptions:
filename, _, err := d.FilenameFromType(job.OutputType)
if err != nil {
r = append(r, err)
continue
}
f, err := os.Open(tmpStore + "/refs/" + result.OutputID + "/" + filename)
if err != nil {
r = append(r, err)
continue
}
err = uploader.UploadImage(job, f)
if err != nil {
r = append(r, err)
continue
}
case *target.AWSTargetOptions:
a, err := awsupload.New(options.Region, options.AccessKeyID, options.SecretAccessKey)
if err != nil {
r = append(r, err)
continue
}
if options.Key == "" {
options.Key = job.ID.String()
}
_, err = a.Upload(tmpStore+"/refs/"+result.OutputID+"/image.raw.xz", options.Bucket, options.Key)
if err != nil {
r = append(r, err)
continue
}
/* TODO: communicate back the AMI */
_, err = a.Register(t.ImageName, options.Bucket, options.Key)
if err != nil {
r = append(r, err)
continue
}
case *target.AzureTargetOptions:
default:
r = append(r, fmt.Errorf("invalid target type"))
}
}
if len(r) > 0 {
return &result, &TargetsError{r}
}
return &result, nil
}
func runCommand(command string, params ...string) error {
cp := exec.Command(command, params...)
cp.Stderr = os.Stderr
cp.Stdout = os.Stdout
return cp.Run()
}