debian-forge-composer/cmd/osbuild-worker/main.go
Tom Gundersen 6002a128b8 osbuild-worker: don't flush cache between jobs
Until osbuild-14, the images were unconditionally kept in the cache,
meaning the cache could grow very large. Now only the downloaded RPMs
are saved, which greatly limits how big it can grow.

Having the RPMs cached should greatly speed up all but the first image
build, so take advantage of that by no longer flushing the cache between
builds.

The cache is still flushed when the worker is stopped / restarted.

This moves the cache from /var/tmp/osbuild-worker* to
/var/cache/osbuild-worker/osbuild-worker-*. This means that each worker
gets a dedicated cache, in case there are several on one machine (see the
sketch below). In the future we may want to combine them and only ever
have one cache, but for that we need improvements in parallel access and
cache cleanup.

Signed-off-by: Tom Gundersen <teg@jklm.no>
2020-06-07 19:22:52 +02:00
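
For reference, a minimal sketch of how the per-worker cache path is derived,
assuming the unit file sets CacheDirectory=osbuild-worker so that systemd
exports /var/cache/osbuild-worker as CACHE_DIRECTORY (the exact directory
names here are illustrative; the real logic lives in main() below):

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
)

func main() {
	// Provided by systemd's CacheDirectory= setting (assumed here to be
	// CacheDirectory=osbuild-worker, i.e. /var/cache/osbuild-worker).
	cacheDirectory, ok := os.LookupEnv("CACHE_DIRECTORY")
	if !ok {
		log.Fatal("CACHE_DIRECTORY is not set")
	}

	// Each worker process gets a dedicated subdirectory, e.g.
	// /var/cache/osbuild-worker/osbuild-worker-123456789, which is reused
	// across jobs and removed only when the worker exits.
	workerCache, err := ioutil.TempDir(cacheDirectory, "osbuild-worker-*")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("per-worker cache:", workerCache)
}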


package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"os"
	"path"

	"github.com/google/uuid"

	"github.com/osbuild/osbuild-composer/internal/common"
	"github.com/osbuild/osbuild-composer/internal/target"
	"github.com/osbuild/osbuild-composer/internal/upload/awsupload"
	"github.com/osbuild/osbuild-composer/internal/upload/azure"
	"github.com/osbuild/osbuild-composer/internal/worker"
)
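
// connectionConfig holds the paths of the CA certificate and the client
// key pair that the worker uses for its TLS connection to osbuild-composer.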
type connectionConfig struct {
	CACertFile     string
	ClientKeyFile  string
	ClientCertFile string
}
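
// createTLSConfig reads the certificates referenced by config and builds a
// *tls.Config that trusts the given CA and presents the client certificate.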
func createTLSConfig(config *connectionConfig) (*tls.Config, error) {
	caCertPEM, err := ioutil.ReadFile(config.CACertFile)
	if err != nil {
		return nil, err
	}

	roots := x509.NewCertPool()
	ok := roots.AppendCertsFromPEM(caCertPEM)
	if !ok {
		return nil, errors.New("failed to append root certificate")
	}

	cert, err := tls.LoadX509KeyPair(config.ClientCertFile, config.ClientKeyFile)
	if err != nil {
		return nil, err
	}

	return &tls.Config{
		RootCAs:      roots,
		Certificates: []tls.Certificate{cert},
	}, nil
}
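
// TargetsError collects the errors of all targets that failed for a job, so
// that a single failing target does not hide the others.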
type TargetsError struct {
	Errors []error
}

func (e *TargetsError) Error() string {
	errString := fmt.Sprintf("%d target(s) errored:\n", len(e.Errors))
	for _, err := range e.Errors {
		errString += err.Error() + "\n"
	}
	return errString
}
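
// RunJob builds the image described by the job's manifest inside cacheDir and
// then uploads the result to each of the job's targets. The osbuild store in
// cacheDir is reused between jobs; only the per-job output is removed.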
func RunJob(job *worker.Job, cacheDir string, uploadFunc func(uuid.UUID, string, io.Reader) error) (*common.ComposeResult, error) {
	outputDirectory := path.Join(cacheDir, "output")
	store := path.Join(cacheDir, "store")

	result, err := RunOSBuild(job.Manifest, store, outputDirectory, os.Stderr)
	if err != nil {
		return nil, err
	}
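
	// Upload the image to every requested target, collecting errors instead
	// of aborting, so the remaining targets are still attempted.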
	var r []error

	for _, t := range job.Targets {
		switch options := t.Options.(type) {
		case *target.LocalTargetOptions:
			f, err := os.Open(path.Join(outputDirectory, options.Filename))
			if err != nil {
				r = append(r, err)
				continue
			}

			err = uploadFunc(job.Id, options.Filename, f)
			if err != nil {
				r = append(r, err)
				continue
			}

		case *target.AWSTargetOptions:
			a, err := awsupload.New(options.Region, options.AccessKeyID, options.SecretAccessKey)
			if err != nil {
				r = append(r, err)
				continue
			}

			if options.Key == "" {
				options.Key = job.Id.String()
			}

			_, err = a.Upload(path.Join(outputDirectory, options.Filename), options.Bucket, options.Key)
			if err != nil {
				r = append(r, err)
				continue
			}

			/* TODO: communicate back the AMI */
			_, err = a.Register(t.ImageName, options.Bucket, options.Key)
			if err != nil {
				r = append(r, err)
				continue
			}

		case *target.AzureTargetOptions:
			credentials := azure.Credentials{
				StorageAccount:   options.StorageAccount,
				StorageAccessKey: options.StorageAccessKey,
			}
			metadata := azure.ImageMetadata{
				ContainerName: options.Container,
				ImageName:     t.ImageName,
			}

			const azureMaxUploadGoroutines = 4
			err := azure.UploadImage(
				credentials,
				metadata,
				path.Join(outputDirectory, options.Filename),
				azureMaxUploadGoroutines,
			)
			if err != nil {
				r = append(r, err)
				continue
			}

		default:
			r = append(r, fmt.Errorf("invalid target type"))
		}
	}
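
	// Only the per-job output directory is removed here; the osbuild store
	// with the downloaded RPMs stays in cacheDir for the next job.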
	err = os.RemoveAll(outputDirectory)
	if err != nil {
		log.Printf("Error removing osbuild output directory (%s): %v", outputDirectory, err)
	}

	if len(r) > 0 {
		return result, &TargetsError{r}
	}

	return result, nil
}
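
// main connects to osbuild-composer, sets up a per-worker cache directory and
// then builds jobs in a loop until the process is stopped.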
func main() {
	var unix bool
	flag.BoolVar(&unix, "unix", false, "Interpret 'address' as a path to a unix domain socket instead of a network address")

	flag.Usage = func() {
		fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s [-unix] address\n", os.Args[0])
		flag.PrintDefaults()
		os.Exit(0)
	}

	flag.Parse()

	address := flag.Arg(0)
	if address == "" {
		flag.Usage()
	}
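
	// CACHE_DIRECTORY is provided by systemd's CacheDirectory= setting
	// (typically /var/cache/osbuild-worker).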
	cacheDirectory, ok := os.LookupEnv("CACHE_DIRECTORY")
	if !ok {
		log.Fatal("CACHE_DIRECTORY is not set. Is the service file missing CacheDirectory=?")
	}

	var client *worker.Client
	if unix {
		client = worker.NewClientUnix(address)
	} else {
		conf, err := createTLSConfig(&connectionConfig{
			CACertFile:     "/etc/osbuild-composer/ca-crt.pem",
			ClientKeyFile:  "/etc/osbuild-composer/worker-key.pem",
			ClientCertFile: "/etc/osbuild-composer/worker-crt.pem",
		})
		if err != nil {
			log.Fatalf("Error creating TLS config: %v", err)
		}
		client = worker.NewClient(address, conf)
	}
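
	// Create a cache directory private to this worker process. It is kept
	// between jobs, so later builds can reuse the RPMs downloaded earlier.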
	tmpdir, err := ioutil.TempDir(cacheDirectory, "osbuild-worker-*")
	if err != nil {
		log.Fatalf("error setting up osbuild output directory: %v", err)
	}
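
	// Flush this worker's cache only when the worker shuts down.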
	defer func() {
		err := os.RemoveAll(tmpdir)
		if err != nil {
			log.Printf("Error removing osbuild-worker cache (%s): %v", tmpdir, err)
		}
	}()
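
	// Main loop: fetch a job, build it, report the result, repeat.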
	for {
		fmt.Println("Waiting for a new job...")
		job, err := client.AddJob()
		if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("Running job %s\n", job.Id)

		var status common.ImageBuildState
		result, err := RunJob(job, tmpdir, client.UploadImage)
		if err != nil {
			log.Printf(" Job failed: %v", err)
			status = common.IBFailed

			// If the error comes from osbuild, retrieve the result
			if osbuildError, ok := err.(*OSBuildError); ok {
				result = osbuildError.Result
			}
		} else {
			status = common.IBFinished
		}

		err = client.UpdateJob(job, status, result)
		if err != nil {
			log.Fatalf("Error reporting job result: %v", err)
		}
	}
}