package main import ( "context" "crypto/tls" "crypto/x509" "errors" "flag" "fmt" "io" "io/ioutil" "log" "os" "os/exec" "path" "time" "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/upload/awsupload" "github.com/osbuild/osbuild-composer/internal/upload/azure" "github.com/osbuild/osbuild-composer/internal/worker" ) type connectionConfig struct { CACertFile string ClientKeyFile string ClientCertFile string } func createTLSConfig(config *connectionConfig) (*tls.Config, error) { caCertPEM, err := ioutil.ReadFile(config.CACertFile) if err != nil { return nil, err } roots := x509.NewCertPool() ok := roots.AppendCertsFromPEM(caCertPEM) if !ok { return nil, errors.New("failed to append root certificate") } cert, err := tls.LoadX509KeyPair(config.ClientCertFile, config.ClientKeyFile) if err != nil { return nil, err } return &tls.Config{ RootCAs: roots, Certificates: []tls.Certificate{cert}, }, nil } type TargetsError struct { Errors []error } func (e *TargetsError) Error() string { errString := fmt.Sprintf("%d target(s) errored:\n", len(e.Errors)) for _, err := range e.Errors { errString += err.Error() + "\n" } return errString } func openAsStreamOptimizedVmdk(imagePath string) (*os.File, error) { newPath := imagePath + ".stream" cmd := exec.Command( "/usr/bin/qemu-img", "convert", "-O", "vmdk", "-o", "subformat=streamOptimized", imagePath, newPath) err := cmd.Run() if err != nil { return nil, err } f, err := os.Open(newPath) if err != nil { return nil, err } err = os.Remove(newPath) if err != nil { return nil, err } return f, err } func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io.Reader) error) (*common.ComposeResult, error) { outputDirectory, err := ioutil.TempDir("/var/tmp", "osbuild-worker-*") if err != nil { return nil, fmt.Errorf("error creating temporary output directory: %v", err) } defer func() { err := os.RemoveAll(outputDirectory) if err != nil { log.Printf("Error removing temporary output directory (%s): %v", outputDirectory, err) } }() result, err := RunOSBuild(job.Manifest, store, outputDirectory, os.Stderr) if err != nil { return nil, err } var r []error for _, t := range job.Targets { switch options := t.Options.(type) { case *target.LocalTargetOptions: var f *os.File imagePath := path.Join(outputDirectory, options.Filename) if options.StreamOptimized { f, err = openAsStreamOptimizedVmdk(imagePath) if err != nil { r = append(r, err) continue } } else { f, err = os.Open(imagePath) if err != nil { r = append(r, err) continue } } err = uploadFunc(job.Id, options.Filename, f) if err != nil { r = append(r, err) continue } case *target.AWSTargetOptions: a, err := awsupload.New(options.Region, options.AccessKeyID, options.SecretAccessKey) if err != nil { r = append(r, err) continue } if options.Key == "" { options.Key = job.Id.String() } _, err = a.Upload(path.Join(outputDirectory, options.Filename), options.Bucket, options.Key) if err != nil { r = append(r, err) continue } /* TODO: communicate back the AMI */ _, err = a.Register(t.ImageName, options.Bucket, options.Key) if err != nil { r = append(r, err) continue } case *target.AzureTargetOptions: credentials := azure.Credentials{ StorageAccount: options.StorageAccount, StorageAccessKey: options.StorageAccessKey, } metadata := azure.ImageMetadata{ ContainerName: options.Container, ImageName: t.ImageName, } const azureMaxUploadGoroutines = 4 err := azure.UploadImage( credentials, metadata, path.Join(outputDirectory, options.Filename), azureMaxUploadGoroutines, ) if err != nil { r = append(r, err) continue } default: r = append(r, fmt.Errorf("invalid target type")) } } err = os.RemoveAll(outputDirectory) if err != nil { log.Printf("Error removing osbuild output directory (%s): %v", outputDirectory, err) } if len(r) > 0 { return result, &TargetsError{r} } return result, nil } // Regularly ask osbuild-composer if the compose we're currently working on was // canceled and exit the process if it was. // It would be cleaner to kill the osbuild process using (`exec.CommandContext` // or similar), but osbuild does not currently support this. Exiting here will // make systemd clean up the whole cgroup and restart this service. func WatchJob(ctx context.Context, client *worker.Client, job *worker.Job) { for { select { case <-time.After(15 * time.Second): if client.JobCanceled(job) { log.Println("Job was canceled. Exiting.") os.Exit(0) } case <-ctx.Done(): return } } } func main() { var unix bool flag.BoolVar(&unix, "unix", false, "Interpret 'address' as a path to a unix domain socket instead of a network address") flag.Usage = func() { fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s [-unix] address\n", os.Args[0]) flag.PrintDefaults() os.Exit(0) } flag.Parse() address := flag.Arg(0) if address == "" { flag.Usage() } cacheDirectory, ok := os.LookupEnv("CACHE_DIRECTORY") if !ok { log.Fatal("CACHE_DIRECTORY is not set. Is the service file missing CacheDirectory=?") } store := path.Join(cacheDirectory, "osbuild-store") var client *worker.Client if unix { client = worker.NewClientUnix(address) } else { conf, err := createTLSConfig(&connectionConfig{ CACertFile: "/etc/osbuild-composer/ca-crt.pem", ClientKeyFile: "/etc/osbuild-composer/worker-key.pem", ClientCertFile: "/etc/osbuild-composer/worker-crt.pem", }) if err != nil { log.Fatalf("Error creating TLS config: %v", err) } client = worker.NewClient(address, conf) } for { fmt.Println("Waiting for a new job...") job, err := client.AddJob() if err != nil { log.Fatal(err) } fmt.Printf("Running job %s\n", job.Id) ctx, cancel := context.WithCancel(context.Background()) go WatchJob(ctx, client, job) var status common.ImageBuildState result, err := RunJob(job, store, client.UploadImage) if err != nil { log.Printf(" Job failed: %v", err) status = common.IBFailed // If the error comes from osbuild, retrieve the result if osbuildError, ok := err.(*OSBuildError); ok { result = osbuildError.Result } // Ensure we always have a non-nil result, composer doesn't like nils. // This can happen in cases when OSBuild crashes and doesn't produce // a meaningful output. E.g. when the machine runs of disk space. if result == nil { result = &common.ComposeResult{ Success: false, } } // set the success to false on every error. This is hacky but composer // currently relies only on this flag to decide whether a compose was // successful. There's no different way how to inform composer that // e.g. an upload fail. Therefore, this line reuses the osbuild success // flag to indicate all error kinds. result.Success = false } else { log.Printf(" 🎉 Job completed successfully: %s", job.Id) status = common.IBFinished } // signal to WatchJob() that it can stop watching cancel() err = client.UpdateJob(job, status, result) if err != nil { log.Fatalf("Error reporting job result: %v", err) } } }