Use case -------- If Endpoint is not set and Region is set, upload to AWS S3. If both Endpoint and Region are set, upload to Generic S3 via the Weldr API. If neither Endpoint nor Region is set, upload to Generic S3 via the Composer API (using the configuration). jobimpl-osbuild --------------- Add configuration fields for Generic S3 upload. Support S3 upload requests coming from the Weldr or Composer API to either AWS or Generic S3. The Weldr API for Generic S3 requires that all connection parameters except the credentials be passed in the API call. The Composer API for Generic S3 requires that all connection parameters be taken from the configuration. Adjust to the consolidation in Target and UploadOptions. Target and UploadOptions ------------------------ Add the fields that were specific to the Generic S3 structures to the AWS S3 one. Remove the Generic S3 structures and always use the AWS S3 ones. Worker Main ----------- Add Endpoint, Region, Bucket, CABundle and SkipSSLVerification to the configuration structure. Pass the values to the Server. Weldr API --------- Keep the generic.s3 provider name to maintain the API, but unmarshal into awsS3UploadSettings. tests - api.sh -------------- Allow the caller to specify either AWS or Generic S3 upload targets for specific image types. Implement the pieces required for testing upload to a Generic S3 service. In some cases, generalize the AWS S3 functions for reuse. GitLab CI --------- Add a test case for api.sh tests with edge-commit and generic S3.
496 lines
14 KiB
Go
496 lines
14 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"os"
|
|
"path"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/BurntSushi/toml"
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
|
"github.com/aws/aws-sdk-go/aws/session"
|
|
"github.com/aws/aws-sdk-go/service/autoscaling"
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"github.com/osbuild/osbuild-composer/internal/common"
|
|
"github.com/osbuild/osbuild-composer/internal/dnfjson"
|
|
"github.com/osbuild/osbuild-composer/internal/upload/azure"
|
|
"github.com/osbuild/osbuild-composer/internal/upload/koji"
|
|
"github.com/osbuild/osbuild-composer/internal/worker"
|
|
)
|
|
|
|
const (
	// configFile is the TOML configuration read by main at start-up. A
	// missing file is tolerated; any other read or parse error is fatal.
	configFile = "/etc/osbuild-worker/osbuild-worker.toml"

	// backoffDuration is how long a job loop sleeps after RequestAndRunJob
	// returns an error before it asks for another job.
	backoffDuration = 10 * time.Second
)
|
|
|
|
// connectionConfig names the PEM files used to secure the connection to
// osbuild-composer; createTLSConfig turns it into a *tls.Config.
type connectionConfig struct {
	// CACertFile holds the CA certificate(s) trusted for the server.
	CACertFile string

	// ClientKeyFile and ClientCertFile form the optional client key pair;
	// createTLSConfig only loads it when both paths are non-empty.
	ClientKeyFile, ClientCertFile string
}
|
|
|
|
// Represents the implementation of a job type as defined by the worker API.
|
|
type JobImplementation interface {
|
|
Run(job worker.Job) error
|
|
}
|
|
|
|
func createTLSConfig(config *connectionConfig) (*tls.Config, error) {
|
|
caCertPEM, err := ioutil.ReadFile(config.CACertFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
roots := x509.NewCertPool()
|
|
ok := roots.AppendCertsFromPEM(caCertPEM)
|
|
if !ok {
|
|
return nil, errors.New("failed to append root certificate")
|
|
}
|
|
|
|
var certs []tls.Certificate
|
|
if config.ClientKeyFile != "" && config.ClientCertFile != "" {
|
|
cert, err := tls.LoadX509KeyPair(config.ClientCertFile, config.ClientKeyFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
certs = append(certs, cert)
|
|
}
|
|
|
|
return &tls.Config{
|
|
RootCAs: roots,
|
|
Certificates: certs,
|
|
MinVersion: tls.VersionTLS12,
|
|
}, nil
|
|
}
|
|
|
|
// Regularly ask osbuild-composer if the compose we're currently working on was
|
|
// canceled and exit the process if it was.
|
|
// It would be cleaner to kill the osbuild process using (`exec.CommandContext`
|
|
// or similar), but osbuild does not currently support this. Exiting here will
|
|
// make systemd clean up the whole cgroup and restart this service.
|
|
func WatchJob(ctx context.Context, job worker.Job) {
|
|
for {
|
|
select {
|
|
case <-time.After(15 * time.Second):
|
|
canceled, err := job.Canceled()
|
|
if err == nil && canceled {
|
|
logrus.Info("Job was canceled. Exiting.")
|
|
os.Exit(0)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// protect an AWS instance from scaling and/or terminating.
|
|
func setProtection(protected bool) {
|
|
// create a new session
|
|
awsSession, err := session.NewSession()
|
|
if err != nil {
|
|
logrus.Debugf("Error getting an AWS session, %s", err)
|
|
return
|
|
}
|
|
|
|
// get the identity for the instanceID
|
|
identity, err := ec2metadata.New(awsSession).GetInstanceIdentityDocument()
|
|
if err != nil {
|
|
logrus.Debugf("Error getting the identity document, %s", err)
|
|
return
|
|
}
|
|
|
|
svc := autoscaling.New(awsSession, aws.NewConfig().WithRegion(identity.Region))
|
|
|
|
// get the autoscaling group info for the auto scaling group name
|
|
asInstanceInput := &autoscaling.DescribeAutoScalingInstancesInput{
|
|
InstanceIds: []*string{
|
|
aws.String(identity.InstanceID),
|
|
},
|
|
}
|
|
asInstanceOutput, err := svc.DescribeAutoScalingInstances(asInstanceInput)
|
|
if err != nil {
|
|
if aerr, ok := err.(awserr.Error); ok {
|
|
logrus.Warningf("Error getting the Autoscaling instances: %s %s", aerr.Code(), aerr.Error())
|
|
} else {
|
|
logrus.Errorf("Error getting the Autoscaling instances: unknown, %s", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
// make the request to protect (or unprotect) the instance
|
|
input := &autoscaling.SetInstanceProtectionInput{
|
|
AutoScalingGroupName: asInstanceOutput.AutoScalingInstances[0].AutoScalingGroupName,
|
|
InstanceIds: []*string{
|
|
aws.String(identity.InstanceID),
|
|
},
|
|
ProtectedFromScaleIn: aws.Bool(protected),
|
|
}
|
|
_, err = svc.SetInstanceProtection(input)
|
|
if err != nil {
|
|
if aerr, ok := err.(awserr.Error); ok {
|
|
logrus.Warningf("Error protecting instance: %s %s", aerr.Code(), aerr.Error())
|
|
} else {
|
|
logrus.Errorf("Error protecting instance: unknown, %s", err)
|
|
}
|
|
return
|
|
}
|
|
if protected {
|
|
logrus.Info("Instance protected")
|
|
} else {
|
|
logrus.Info("Instance protection removed")
|
|
}
|
|
}
|
|
|
|
// Requests and runs 1 job of specified type(s)
|
|
// Returning an error here will result in the worker backing off for a while and retrying
|
|
func RequestAndRunJob(client *worker.Client, acceptedJobTypes []string, jobImpls map[string]JobImplementation) error {
|
|
logrus.Debug("Waiting for a new job...")
|
|
job, err := client.RequestJob(acceptedJobTypes, common.CurrentArch())
|
|
if err == worker.ErrClientRequestJobTimeout {
|
|
logrus.Debugf("Requesting job timed out: %v", err)
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
logrus.Errorf("Requesting job failed: %v", err)
|
|
return err
|
|
}
|
|
|
|
impl, exists := jobImpls[job.Type()]
|
|
if !exists {
|
|
logrus.Errorf("Ignoring job with unknown type %s", job.Type())
|
|
return err
|
|
}
|
|
|
|
// Depsolve requests needs reactivity, since setting the protection can take up to 6s to timeout if the worker isn't
|
|
// in an AWS env, disable this setting for them.
|
|
if job.Type() != "depsolve" {
|
|
setProtection(true)
|
|
defer setProtection(false)
|
|
}
|
|
|
|
logrus.Infof("Running job '%s' (%s)\n", job.Id(), job.Type())
|
|
|
|
ctx, cancelWatcher := context.WithCancel(context.Background())
|
|
go WatchJob(ctx, job)
|
|
|
|
err = impl.Run(job)
|
|
cancelWatcher()
|
|
if err != nil {
|
|
logrus.Warnf("Job '%s' (%s) failed: %v", job.Id(), job.Type(), err)
|
|
// Don't return this error so the worker picks up the next job immediately
|
|
return nil
|
|
}
|
|
|
|
logrus.Infof("Job '%s' (%s) finished", job.Id(), job.Type())
|
|
return nil
|
|
}
|
|
|
|
func main() {
|
|
var config struct {
|
|
Composer *struct {
|
|
Proxy string `toml:"proxy"`
|
|
}
|
|
Koji map[string]struct {
|
|
Kerberos *struct {
|
|
Principal string `toml:"principal"`
|
|
KeyTab string `toml:"keytab"`
|
|
} `toml:"kerberos,omitempty"`
|
|
} `toml:"koji"`
|
|
GCP *struct {
|
|
Credentials string `toml:"credentials"`
|
|
} `toml:"gcp"`
|
|
Azure *struct {
|
|
Credentials string `toml:"credentials"`
|
|
} `toml:"azure"`
|
|
AWS *struct {
|
|
Credentials string `toml:"credentials"`
|
|
Bucket string `toml:"bucket"`
|
|
} `toml:"aws"`
|
|
GenericS3 *struct {
|
|
Credentials string `toml:"credentials"`
|
|
Endpoint string `toml:"endpoint"`
|
|
Region string `toml:"region"`
|
|
Bucket string `toml:"bucket"`
|
|
CABundle string `toml:"ca_bundle"`
|
|
SkipSSLVerification bool `toml:"skip_ssl_verification"`
|
|
} `toml:"generic_s3"`
|
|
Authentication *struct {
|
|
OAuthURL string `toml:"oauth_url"`
|
|
OfflineTokenPath string `toml:"offline_token"`
|
|
ClientId string `toml:"client_id"`
|
|
ClientSecretPath string `toml:"client_secret"`
|
|
} `toml:"authentication"`
|
|
RelaxTimeoutFactor uint `toml:"RelaxTimeoutFactor"`
|
|
BasePath string `toml:"base_path"`
|
|
DNFJson string `toml:"dnf-json"`
|
|
}
|
|
var unix bool
|
|
flag.BoolVar(&unix, "unix", false, "Interpret 'address' as a path to a unix domain socket instead of a network address")
|
|
|
|
flag.Usage = func() {
|
|
fmt.Fprintf(flag.CommandLine.Output(), "Usage: %s [-unix] address\n", os.Args[0])
|
|
flag.PrintDefaults()
|
|
}
|
|
|
|
flag.Parse()
|
|
|
|
address := flag.Arg(0)
|
|
if address == "" {
|
|
flag.Usage()
|
|
os.Exit(2)
|
|
}
|
|
|
|
_, err := toml.DecodeFile(configFile, &config)
|
|
if err == nil {
|
|
logrus.Info("Composer configuration:")
|
|
encoder := toml.NewEncoder(logrus.StandardLogger().WriterLevel(logrus.InfoLevel))
|
|
err := encoder.Encode(&config)
|
|
if err != nil {
|
|
logrus.Fatalf("Could not print config: %v", err)
|
|
}
|
|
} else if !os.IsNotExist(err) {
|
|
logrus.Fatalf("Could not load config file '%s': %v", configFile, err)
|
|
}
|
|
|
|
if config.BasePath == "" {
|
|
config.BasePath = "/api/worker/v1"
|
|
}
|
|
|
|
cacheDirectory, ok := os.LookupEnv("CACHE_DIRECTORY")
|
|
if !ok {
|
|
logrus.Fatal("CACHE_DIRECTORY is not set. Is the service file missing CacheDirectory=?")
|
|
}
|
|
store := path.Join(cacheDirectory, "osbuild-store")
|
|
rpmmd_cache := path.Join(cacheDirectory, "rpmmd")
|
|
output := path.Join(cacheDirectory, "output")
|
|
_ = os.Mkdir(output, os.ModeDir)
|
|
|
|
kojiServers := make(map[string]koji.GSSAPICredentials)
|
|
for server, creds := range config.Koji {
|
|
if creds.Kerberos == nil {
|
|
// For now we only support Kerberos authentication.
|
|
continue
|
|
}
|
|
kojiServers[server] = koji.GSSAPICredentials{
|
|
Principal: creds.Kerberos.Principal,
|
|
KeyTab: creds.Kerberos.KeyTab,
|
|
}
|
|
}
|
|
|
|
var client *worker.Client
|
|
if unix {
|
|
client = worker.NewClientUnix(worker.ClientConfig{
|
|
BaseURL: address,
|
|
BasePath: config.BasePath,
|
|
})
|
|
} else if config.Authentication != nil {
|
|
var conf *tls.Config
|
|
conConf := &connectionConfig{
|
|
CACertFile: "/etc/osbuild-composer/ca-crt.pem",
|
|
}
|
|
if _, err = os.Stat(conConf.CACertFile); err == nil {
|
|
conf, err = createTLSConfig(conConf)
|
|
if err != nil {
|
|
logrus.Fatalf("Error creating TLS config: %v", err)
|
|
}
|
|
}
|
|
|
|
token := ""
|
|
if config.Authentication.OfflineTokenPath != "" {
|
|
t, err := ioutil.ReadFile(config.Authentication.OfflineTokenPath)
|
|
if err != nil {
|
|
logrus.Fatalf("Could not read offline token: %v", err)
|
|
}
|
|
token = strings.TrimSpace(string(t))
|
|
}
|
|
|
|
clientSecret := ""
|
|
if config.Authentication.ClientSecretPath != "" {
|
|
cs, err := ioutil.ReadFile(config.Authentication.ClientSecretPath)
|
|
if err != nil {
|
|
logrus.Fatalf("Could not read client secret: %v", err)
|
|
}
|
|
clientSecret = strings.TrimSpace(string(cs))
|
|
}
|
|
|
|
proxy := ""
|
|
if config.Composer != nil && config.Composer.Proxy != "" {
|
|
proxy = config.Composer.Proxy
|
|
}
|
|
|
|
client, err = worker.NewClient(worker.ClientConfig{
|
|
BaseURL: fmt.Sprintf("https://%s", address),
|
|
TlsConfig: conf,
|
|
OfflineToken: token,
|
|
OAuthURL: config.Authentication.OAuthURL,
|
|
ClientId: config.Authentication.ClientId,
|
|
ClientSecret: clientSecret,
|
|
BasePath: config.BasePath,
|
|
ProxyURL: proxy,
|
|
})
|
|
if err != nil {
|
|
logrus.Fatalf("Error creating worker client: %v", err)
|
|
}
|
|
} else {
|
|
var conf *tls.Config
|
|
conConf := &connectionConfig{
|
|
CACertFile: "/etc/osbuild-composer/ca-crt.pem",
|
|
ClientKeyFile: "/etc/osbuild-composer/worker-key.pem",
|
|
ClientCertFile: "/etc/osbuild-composer/worker-crt.pem",
|
|
}
|
|
if _, err = os.Stat(conConf.CACertFile); err == nil {
|
|
conf, err = createTLSConfig(conConf)
|
|
if err != nil {
|
|
logrus.Fatalf("Error creating TLS config: %v", err)
|
|
}
|
|
}
|
|
|
|
proxy := ""
|
|
if config.Composer != nil && config.Composer.Proxy != "" {
|
|
proxy = config.Composer.Proxy
|
|
}
|
|
|
|
client, err = worker.NewClient(worker.ClientConfig{
|
|
BaseURL: fmt.Sprintf("https://%s", address),
|
|
TlsConfig: conf,
|
|
BasePath: config.BasePath,
|
|
ProxyURL: proxy,
|
|
})
|
|
if err != nil {
|
|
logrus.Fatalf("Error creating worker client: %v", err)
|
|
}
|
|
}
|
|
|
|
// Load Azure credentials early. If the credentials file is malformed,
|
|
// we can report the issue early instead of waiting for the first osbuild
|
|
// job with the org.osbuild.azure.image target.
|
|
var azureCredentials *azure.Credentials
|
|
if config.Azure != nil {
|
|
azureCredentials, err = azure.ParseAzureCredentialsFile(config.Azure.Credentials)
|
|
if err != nil {
|
|
logrus.Fatalf("cannot load azure credentials: %v", err)
|
|
}
|
|
}
|
|
|
|
// If the credentials are not provided in the configuration, then the
|
|
// worker will rely on the GCP library to authenticate using default means.
|
|
var gcpCredentials string
|
|
if config.GCP != nil {
|
|
gcpCredentials = config.GCP.Credentials
|
|
}
|
|
|
|
// If the credentials are not provided in the configuration, then the
|
|
// worker will look in $HOME/.aws/credentials or at the file pointed by
|
|
// the "AWS_SHARED_CREDENTIALS_FILE" variable.
|
|
var awsCredentials = ""
|
|
var awsBucket = ""
|
|
if config.AWS != nil {
|
|
awsCredentials = config.AWS.Credentials
|
|
awsBucket = config.AWS.Bucket
|
|
}
|
|
|
|
var genericS3Credentials = ""
|
|
var genericS3Endpoint = ""
|
|
var genericS3Region = ""
|
|
var genericS3Bucket = ""
|
|
var genericS3CABundle = ""
|
|
var genericS3SkipSSLVerification = false
|
|
if config.GenericS3 != nil {
|
|
genericS3Credentials = config.GenericS3.Credentials
|
|
genericS3Endpoint = config.GenericS3.Endpoint
|
|
genericS3Region = config.GenericS3.Region
|
|
genericS3Bucket = config.GenericS3.Bucket
|
|
genericS3CABundle = config.GenericS3.CABundle
|
|
genericS3SkipSSLVerification = config.GenericS3.SkipSSLVerification
|
|
}
|
|
|
|
// depsolve jobs can be done during other jobs
|
|
depsolveCtx, depsolveCtxCancel := context.WithCancel(context.Background())
|
|
solver := dnfjson.NewBaseSolver(rpmmd_cache)
|
|
if config.DNFJson != "" {
|
|
solver.SetDNFJSONPath(config.DNFJson)
|
|
}
|
|
defer depsolveCtxCancel()
|
|
go func() {
|
|
jobImpls := map[string]JobImplementation{
|
|
"depsolve": &DepsolveJobImpl{
|
|
Solver: solver,
|
|
},
|
|
}
|
|
acceptedJobTypes := []string{}
|
|
for jt := range jobImpls {
|
|
acceptedJobTypes = append(acceptedJobTypes, jt)
|
|
}
|
|
|
|
for {
|
|
err := RequestAndRunJob(client, acceptedJobTypes, jobImpls)
|
|
if err != nil {
|
|
logrus.Warn("Received error from RequestAndRunJob, backing off")
|
|
time.Sleep(backoffDuration)
|
|
}
|
|
|
|
select {
|
|
case <-depsolveCtx.Done():
|
|
return
|
|
default:
|
|
continue
|
|
}
|
|
|
|
}
|
|
}()
|
|
|
|
// non-depsolve job
|
|
jobImpls := map[string]JobImplementation{
|
|
"osbuild": &OSBuildJobImpl{
|
|
Store: store,
|
|
Output: output,
|
|
KojiServers: kojiServers,
|
|
GCPCreds: gcpCredentials,
|
|
AzureCreds: azureCredentials,
|
|
AWSCreds: awsCredentials,
|
|
AWSBucket: awsBucket,
|
|
S3Config: S3Configuration{
|
|
Creds: genericS3Credentials,
|
|
Endpoint: genericS3Endpoint,
|
|
Region: genericS3Region,
|
|
Bucket: genericS3Bucket,
|
|
CABundle: genericS3CABundle,
|
|
SkipSSLVerification: genericS3SkipSSLVerification,
|
|
},
|
|
},
|
|
"osbuild-koji": &OSBuildKojiJobImpl{
|
|
Store: store,
|
|
Output: output,
|
|
KojiServers: kojiServers,
|
|
relaxTimeoutFactor: config.RelaxTimeoutFactor,
|
|
},
|
|
"koji-init": &KojiInitJobImpl{
|
|
KojiServers: kojiServers,
|
|
relaxTimeoutFactor: config.RelaxTimeoutFactor,
|
|
},
|
|
"koji-finalize": &KojiFinalizeJobImpl{
|
|
KojiServers: kojiServers,
|
|
relaxTimeoutFactor: config.RelaxTimeoutFactor,
|
|
},
|
|
}
|
|
|
|
acceptedJobTypes := []string{}
|
|
for jt := range jobImpls {
|
|
acceptedJobTypes = append(acceptedJobTypes, jt)
|
|
}
|
|
|
|
for {
|
|
err = RequestAndRunJob(client, acceptedJobTypes, jobImpls)
|
|
if err != nil {
|
|
logrus.Warn("Received error from RequestAndRunJob, backing off")
|
|
time.Sleep(backoffDuration)
|
|
}
|
|
}
|
|
}
|