osbuild-composer: split out the actual service

Split the actual service into its own type `Composer` in composer.go.
main.go now (more or less) contains only collecting configuration from
the environment and the file system, as well as activation file
descriptors.

Aside from making the code easier to grok, this is a first step towards
running composer in a different environment than the one set up by
systemd.
This commit is contained in:
Lars Karlitski 2020-09-27 17:58:47 +02:00 committed by Tom Gundersen
parent 13d5129b56
commit 7aca756156
2 changed files with 315 additions and 211 deletions

View file

@ -0,0 +1,268 @@
package main
import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
"os"
"path"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
"github.com/osbuild/osbuild-composer/internal/kojiapi"
"github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/store"
"github.com/osbuild/osbuild-composer/internal/upload/koji"
"github.com/osbuild/osbuild-composer/internal/weldr"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/distro/fedora31"
"github.com/osbuild/osbuild-composer/internal/distro/fedora32"
"github.com/osbuild/osbuild-composer/internal/distro/rhel8"
)
type Composer struct {
config *ComposerConfigFile
stateDir string
cacheDir string
logger *log.Logger
distros *distro.Registry
rpm rpmmd.RPMMD
workers *worker.Server
weldr *weldr.API
koji *kojiapi.Server
weldrListener, localWorkerListener, workerListener, kojiListener net.Listener
}
func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string, logger *log.Logger) (*Composer, error) {
c := Composer{
config: config,
stateDir: stateDir,
cacheDir: cacheDir,
logger: logger,
}
queueDir, err := c.ensureStateDirectory("jobs", 0700)
if err != nil {
return nil, err
}
artifactsDir, err := c.ensureStateDirectory("artifacts", 0755)
if err != nil {
return nil, err
}
c.distros, err = distro.NewRegistry(fedora31.New(), fedora32.New(), rhel8.New())
if err != nil {
return nil, fmt.Errorf("Error loading distros: %v", err)
}
c.rpm = rpmmd.NewRPMMD(path.Join(c.cacheDir, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json")
// construct job types of the form osbuild:{arch} for all arches
jobTypes := []string{"osbuild"}
jobTypesMap := map[string]bool{}
for _, name := range c.distros.List() {
d := c.distros.GetDistro(name)
for _, arch := range d.ListArches() {
jt := "osbuild:" + arch
if !jobTypesMap[jt] {
jobTypesMap[jt] = true
jobTypes = append(jobTypes, jt)
}
}
}
jobs, err := fsjobqueue.New(queueDir, jobTypes)
if err != nil {
return nil, fmt.Errorf("cannot create jobqueue: %v", err)
}
c.workers = worker.NewServer(c.logger, jobs, artifactsDir)
return &c, nil
}
func (c *Composer) InitWeldr(repoPaths []string, weldrListener, localWorkerListener net.Listener) error {
archName := common.CurrentArch()
hostDistro, beta, err := c.distros.FromHost()
if err != nil {
return err
}
arch, err := hostDistro.GetArch(archName)
if err != nil {
return fmt.Errorf("Host distro does not support host architecture: %v", err)
}
// TODO: refactor to be more generic
name := hostDistro.Name()
if beta {
name += "-beta"
}
repos, err := rpmmd.LoadRepositories(repoPaths, name)
if err != nil {
return fmt.Errorf("Error loading repositories for %s: %v", hostDistro.Name(), err)
}
store := store.New(&c.stateDir, arch, c.logger)
compatOutputDir := path.Join(c.stateDir, "outputs")
c.weldr = weldr.New(c.rpm, arch, hostDistro, repos[archName], c.logger, store, c.workers, compatOutputDir)
c.weldrListener = weldrListener
c.localWorkerListener = localWorkerListener
return nil
}
func (c *Composer) InitKoji(cert, key string, l net.Listener) error {
servers := make(map[string]koji.GSSAPICredentials)
for name, creds := range c.config.Koji.Servers {
if creds.Kerberos != nil {
servers[name] = koji.GSSAPICredentials{
Principal: creds.Kerberos.Principal,
KeyTab: creds.Kerberos.KeyTab,
}
}
}
c.koji = kojiapi.NewServer(c.logger, c.workers, c.rpm, c.distros, servers)
tlsConfig, err := createTLSConfig(&connectionConfig{
CACertFile: c.config.Koji.CA,
ServerKeyFile: key,
ServerCertFile: cert,
AllowedDomains: c.config.Koji.AllowedDomains,
})
if err != nil {
return fmt.Errorf("Error creating TLS configuration for Koji API: %v", err)
}
c.kojiListener = tls.NewListener(l, tlsConfig)
return nil
}
func (c *Composer) InitRemoteWorkers(cert, key string, l net.Listener) error {
tlsConfig, err := createTLSConfig(&connectionConfig{
CACertFile: c.config.Worker.CA,
ServerKeyFile: key,
ServerCertFile: cert,
AllowedDomains: c.config.Worker.AllowedDomains,
})
if err != nil {
return fmt.Errorf("Error creating TLS configuration for remote worker API: %v", err)
}
c.workerListener = tls.NewListener(l, tlsConfig)
return nil
}
// Start Composer with all the APIs that had their respective Init*() called.
//
// Running without the weldr API is currently not supported.
func (c *Composer) Start() error {
if c.weldr == nil {
return errors.New("weldr was not initialized")
}
if c.localWorkerListener != nil {
go func() {
err := c.workers.Serve(c.localWorkerListener)
if err != nil {
panic(err)
}
}()
}
if c.workerListener != nil {
go func() {
err := c.workers.Serve(c.workerListener)
if err != nil {
panic(err)
}
}()
}
if c.kojiListener != nil {
go func() {
err := c.koji.Serve(c.kojiListener)
if err != nil {
panic(err)
}
}()
}
return c.weldr.Serve(c.weldrListener)
}
func (c *Composer) ensureStateDirectory(name string, perm os.FileMode) (string, error) {
d := path.Join(c.stateDir, name)
err := os.Mkdir(d, perm)
if err != nil && !os.IsExist(err) {
return "", fmt.Errorf("cannot create state directory %s: %v", name, err)
}
return d, nil
}
type connectionConfig struct {
// CA used for client certificate validation. If empty, then the CAs
// trusted by the host system are used.
CACertFile string
ServerKeyFile string
ServerCertFile string
AllowedDomains []string
}
func createTLSConfig(c *connectionConfig) (*tls.Config, error) {
var roots *x509.CertPool
if c.CACertFile != "" {
caCertPEM, err := ioutil.ReadFile(c.CACertFile)
if err != nil {
return nil, err
}
roots = x509.NewCertPool()
ok := roots.AppendCertsFromPEM(caCertPEM)
if !ok {
panic("failed to parse root certificate")
}
}
cert, err := tls.LoadX509KeyPair(c.ServerCertFile, c.ServerKeyFile)
if err != nil {
return nil, err
}
return &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: roots,
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
for _, chain := range verifiedChains {
for _, domain := range c.AllowedDomains {
if chain[0].VerifyHostname(domain) == nil {
return nil
}
}
}
return errors.New("domain not in allowlist")
},
}, nil
}

View file

@ -1,80 +1,22 @@
package main
import (
"crypto/tls"
"crypto/x509"
"errors"
"flag"
"io/ioutil"
"log"
"os"
"path"
"github.com/osbuild/osbuild-composer/internal/distro/fedora31"
"github.com/osbuild/osbuild-composer/internal/distro/fedora32"
"github.com/osbuild/osbuild-composer/internal/distro/rhel8"
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
"github.com/osbuild/osbuild-composer/internal/kojiapi"
"github.com/osbuild/osbuild-composer/internal/upload/koji"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/store"
"github.com/osbuild/osbuild-composer/internal/weldr"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/coreos/go-systemd/activation"
)
const configFile = "/etc/osbuild-composer/osbuild-composer.toml"
const (
configFile = "/etc/osbuild-composer/osbuild-composer.toml"
ServerKeyFile = "/etc/osbuild-composer/composer-key.pem"
ServerCertFile = "/etc/osbuild-composer/composer-crt.pem"
)
type connectionConfig struct {
// CA used for client certificate validation. If empty, then the CAs
// trusted by the host system are used.
CACertFile string
ServerKeyFile string
ServerCertFile string
AllowedDomains []string
}
func createTLSConfig(c *connectionConfig) (*tls.Config, error) {
var roots *x509.CertPool
if c.CACertFile != "" {
caCertPEM, err := ioutil.ReadFile(c.CACertFile)
if err != nil {
return nil, err
}
roots = x509.NewCertPool()
ok := roots.AppendCertsFromPEM(caCertPEM)
if !ok {
panic("failed to parse root certificate")
}
}
cert, err := tls.LoadX509KeyPair(c.ServerCertFile, c.ServerKeyFile)
if err != nil {
return nil, err
}
return &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: roots,
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
for _, chain := range verifiedChains {
for _, domain := range c.AllowedDomains {
if chain[0].VerifyHostname(domain) == nil {
return nil
}
}
}
return errors.New("domain not in allowlist")
},
}, nil
var repositoryConfigs = []string{
"/etc/osbuild-composer",
"/usr/share/osbuild-composer",
}
func main() {
@ -82,6 +24,11 @@ func main() {
flag.BoolVar(&verbose, "v", false, "Print access log")
flag.Parse()
var logger *log.Logger
if verbose {
logger = log.New(os.Stdout, "", 0)
}
config, err := LoadConfig(configFile)
if err != nil {
if os.IsNotExist(err) {
@ -102,169 +49,58 @@ func main() {
log.Fatal("STATE_DIRECTORY is not set. Is the service file missing StateDirectory=?")
}
cacheDir, ok := os.LookupEnv("CACHE_DIRECTORY")
if !ok {
log.Fatal("CACHE_DIRECTORY is not set. Is the service file missing CacheDirectory=?")
}
composer, err := NewComposer(config, stateDir, cacheDir, logger)
if err != nil {
log.Fatalf("%v", err)
}
listeners, err := activation.ListenersWithNames()
if err != nil {
log.Fatalf("Could not get listening sockets: " + err.Error())
}
composerListeners, exists := listeners["osbuild-composer.socket"]
if !exists {
if l, exists := listeners["osbuild-composer.socket"]; exists {
if len(l) != 2 {
log.Fatalf("Expected two listeners in osbuild-composer.socket, but found %d", len(l))
}
err = composer.InitWeldr(repositoryConfigs, l[0], l[1])
if err != nil {
log.Fatalf("Error initializing weldr API: %v", err)
}
} else {
log.Fatalf("osbuild-composer.socket doesn't exist")
}
if len(composerListeners) != 2 {
log.Fatalf("Expected two listeners in osbuild-composer.socket, but found %d", len(composerListeners))
}
weldrListener := composerListeners[0]
jobListener := composerListeners[1]
cacheDirectory, ok := os.LookupEnv("CACHE_DIRECTORY")
if !ok {
log.Fatal("CACHE_DIRECTORY is not set. Is the service file missing CacheDirectory=?")
}
rpm := rpmmd.NewRPMMD(path.Join(cacheDirectory, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json")
distros, err := distro.NewRegistry(fedora31.New(), fedora32.New(), rhel8.New())
if err != nil {
log.Fatalf("Error loading distros: %v", err)
}
distribution, beta, err := distros.FromHost()
if err != nil {
log.Fatalf("Could not determine distro from host: " + err.Error())
}
arch, err := distribution.GetArch(common.CurrentArch())
if err != nil {
log.Fatalf("Host distro does not support host architecture: " + err.Error())
}
// TODO: refactor to be more generic
name := distribution.Name()
if beta {
name += "-beta"
}
repoMap, err := rpmmd.LoadRepositories([]string{"/etc/osbuild-composer", "/usr/share/osbuild-composer"}, name)
if err != nil {
log.Fatalf("Could not load repositories for %s: %v", distribution.Name(), err)
}
var logger *log.Logger
if verbose {
logger = log.New(os.Stdout, "", 0)
}
store := store.New(&stateDir, arch, logger)
queueDir := path.Join(stateDir, "jobs")
err = os.Mkdir(queueDir, 0700)
if err != nil && !os.IsExist(err) {
log.Fatalf("cannot create queue directory: %v", err)
}
// construct job types of the form osbuild:{arch} for all arches
jobTypes := []string{"osbuild"}
jobTypesMap := map[string]bool{}
for _, name := range distros.List() {
d := distros.GetDistro(name)
for _, arch := range d.ListArches() {
jt := "osbuild:" + arch
if !jobTypesMap[jt] {
jobTypesMap[jt] = true
jobTypes = append(jobTypes, jt)
}
}
}
jobs, err := fsjobqueue.New(queueDir, jobTypes)
if err != nil {
log.Fatalf("cannot create jobqueue: %v", err)
}
artifactsDir := path.Join(stateDir, "artifacts")
err = os.Mkdir(artifactsDir, 0755)
if err != nil && !os.IsExist(err) {
log.Fatalf("cannot create artifacts directory: %v", err)
}
compatOutputDir := path.Join(stateDir, "outputs")
workers := worker.NewServer(logger, jobs, artifactsDir)
weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers, compatOutputDir)
go func() {
err := workers.Serve(jobListener)
common.PanicOnError(err)
}()
// Optionally run Koji API
if kojiListeners, exists := listeners["osbuild-composer-koji.socket"]; exists {
kojiServers := make(map[string]koji.GSSAPICredentials)
for server, creds := range config.Koji.Servers {
if creds.Kerberos == nil {
// For now we only support Kerberos authentication.
continue
}
kojiServers[server] = koji.GSSAPICredentials{
Principal: creds.Kerberos.Principal,
KeyTab: creds.Kerberos.KeyTab,
}
}
kojiServer := kojiapi.NewServer(logger, workers, rpm, distros, kojiServers)
tlsConfig, err := createTLSConfig(&connectionConfig{
CACertFile: config.Koji.CA,
ServerKeyFile: "/etc/osbuild-composer/composer-key.pem",
ServerCertFile: "/etc/osbuild-composer/composer-crt.pem",
AllowedDomains: config.Koji.AllowedDomains,
})
if err != nil {
log.Fatalf("TLS configuration cannot be created: " + err.Error())
}
if len(kojiListeners) != 1 {
// Use Fatal to call os.Exit with non-zero return value
if l, exists := listeners["osbuild-composer-koji.socket"]; exists {
if len(l) != 1 {
log.Fatal("The osbuild-composer-koji.socket unit is misconfigured. It should contain only one socket.")
}
kojiListener := tls.NewListener(kojiListeners[0], tlsConfig)
go func() {
err = kojiServer.Serve(kojiListener)
// If the koji server fails, take down the whole process, not just a single goroutine
log.Fatal("osbuild-composer-koji.socket failed: ", err)
}()
err = composer.InitKoji(ServerCertFile, ServerKeyFile, l[0])
if err != nil {
log.Fatalf("Error initializing koji API: %v", err)
}
}
if remoteWorkerListeners, exists := listeners["osbuild-remote-worker.socket"]; exists {
if len(remoteWorkerListeners) != 1 {
if l, exists := listeners["osbuild-remote-worker.socket"]; exists {
if len(l) != 1 {
log.Fatal("The osbuild-remote-worker.socket unit is misconfigured. It should contain only one socket.")
}
log.Printf("Starting remote listener\n")
tlsConfig, err := createTLSConfig(&connectionConfig{
CACertFile: config.Worker.CA,
ServerKeyFile: "/etc/osbuild-composer/composer-key.pem",
ServerCertFile: "/etc/osbuild-composer/composer-crt.pem",
AllowedDomains: config.Worker.AllowedDomains,
})
err = composer.InitRemoteWorkers(ServerCertFile, ServerKeyFile, l[0])
if err != nil {
log.Fatalf("TLS configuration cannot be created: " + err.Error())
log.Fatalf("Error initializing worker API: %v", err)
}
listener := tls.NewListener(remoteWorkerListeners[0], tlsConfig)
go func() {
err := workers.Serve(listener)
common.PanicOnError(err)
}()
}
err = weldrAPI.Serve(weldrListener)
common.PanicOnError(err)
err = composer.Start()
if err != nil {
log.Fatalf("%v", err)
}
}