jobqueue: move Job.Run() to the worker

This makes the jobqueue package independent of forking osbuild, the
choices for which (exact invocation, location of the cache directory)
should be made in the worker.
This commit is contained in:
Lars Karlitski 2020-04-04 11:34:25 +02:00 committed by Tom Gundersen
parent d3b9a3515d
commit 1ece08414c
3 changed files with 123 additions and 133 deletions

View file

@ -3,15 +3,21 @@ package main
import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"os"
"os/exec"
"path"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/upload/awsupload"
)
type connectionConfig struct {
@ -43,6 +49,122 @@ func createTLSConfig(config *connectionConfig) (*tls.Config, error) {
}, nil
}
type TargetsError struct {
Errors []error
}
func (e *TargetsError) Error() string {
errString := fmt.Sprintf("%d target(s) errored:\n", len(e.Errors))
for _, err := range e.Errors {
errString += err.Error() + "\n"
}
return errString
}
func RunJob(job *jobqueue.Job, uploadFunc func(*jobqueue.Job, io.Reader) error) (*common.ComposeResult, error) {
tmpStore, err := ioutil.TempDir("/var/tmp", "osbuild-store")
if err != nil {
return nil, fmt.Errorf("error setting up osbuild store: %v", err)
}
// FIXME: how to handle errors in defer?
defer os.RemoveAll(tmpStore)
cmd := exec.Command(
"osbuild",
"--store", tmpStore,
"--json", "-",
)
cmd.Stderr = os.Stderr
stdin, err := cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("error setting up stdin for osbuild: %v", err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
return nil, fmt.Errorf("error setting up stdout for osbuild: %v", err)
}
err = cmd.Start()
if err != nil {
return nil, fmt.Errorf("error starting osbuild: %v", err)
}
err = json.NewEncoder(stdin).Encode(job.Manifest)
if err != nil {
return nil, fmt.Errorf("error encoding osbuild pipeline: %v", err)
}
// FIXME: handle or comment this possible error
_ = stdin.Close()
var result common.ComposeResult
err = json.NewDecoder(stdout).Decode(&result)
if err != nil {
return nil, fmt.Errorf("error decoding osbuild output: %#v", err)
}
err = cmd.Wait()
if err != nil {
return &result, err
}
var r []error
for _, t := range job.Targets {
switch options := t.Options.(type) {
case *target.LocalTargetOptions:
f, err := os.Open(path.Join(tmpStore, "refs", result.OutputID, options.Filename))
if err != nil {
r = append(r, err)
continue
}
err = uploadFunc(job, f)
if err != nil {
r = append(r, err)
continue
}
case *target.AWSTargetOptions:
a, err := awsupload.New(options.Region, options.AccessKeyID, options.SecretAccessKey)
if err != nil {
r = append(r, err)
continue
}
if options.Key == "" {
options.Key = job.ComposeID.String()
}
_, err = a.Upload(path.Join(tmpStore, "refs", result.OutputID, options.Filename), options.Bucket, options.Key)
if err != nil {
r = append(r, err)
continue
}
/* TODO: communicate back the AMI */
_, err = a.Register(t.ImageName, options.Bucket, options.Key)
if err != nil {
r = append(r, err)
continue
}
case *target.AzureTargetOptions:
default:
r = append(r, fmt.Errorf("invalid target type"))
}
}
if len(r) > 0 {
return &result, &TargetsError{r}
}
return &result, nil
}
func main() {
var unix bool
flag.BoolVar(&unix, "unix", false, "Interpret 'address' as a path to a unix domain socket instead of a network address")
@ -86,7 +208,7 @@ func main() {
fmt.Printf("Running job %s\n", job.ComposeID.String())
var status common.ImageBuildState
result, err := job.Run(client)
result, err := RunJob(job, client.UploadImage)
if err != nil {
log.Printf(" Job failed: %v", err)
status = common.IBFailed