debian-forge-composer/cmd/osbuild-composer/composer.go
Tomáš Hozza cdf57e5bc1 osbuild-composer/config: support specifying distro aliases
Add new configuration option `distro_aliases`, which is a map of
strings, allowing to specify distro name alias for supported
distributions.

Define aliases for RHEL major versions without the minor version
specified.

For now, the distro aliases map is not used by any API
implementation and it is ignored.

Signed-off-by: Tomáš Hozza <thozza@redhat.com>
2024-01-26 11:32:34 +01:00

451 lines
11 KiB
Go

package main
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"log"
"net"
"net/http"
"os"
"os/signal"
"path"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus/promhttp"
logrus "github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
"github.com/osbuild/osbuild-composer/pkg/jobqueue/dbjobqueue"
"github.com/osbuild/images/pkg/distrofactory"
"github.com/osbuild/osbuild-composer/internal/auth"
"github.com/osbuild/osbuild-composer/internal/cloudapi"
v2 "github.com/osbuild/osbuild-composer/internal/cloudapi/v2"
"github.com/osbuild/osbuild-composer/internal/dnfjson"
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
"github.com/osbuild/osbuild-composer/internal/weldr"
"github.com/osbuild/osbuild-composer/internal/worker"
)
type Composer struct {
config *ComposerConfigFile
stateDir string
cacheDir string
logger *log.Logger
distros *distrofactory.Factory
solver *dnfjson.BaseSolver
workers *worker.Server
weldr *weldr.API
api *cloudapi.Server
weldrListener, localWorkerListener, workerListener, apiListener, promListener net.Listener
}
func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string) (*Composer, error) {
c := Composer{
config: config,
stateDir: stateDir,
cacheDir: cacheDir,
}
workerConfig := worker.Config{
BasePath: config.Worker.BasePath,
JWTEnabled: config.Worker.EnableJWT,
TenantProviderFields: config.Worker.JWTTenantProviderFields,
}
var err error
if config.Worker.EnableArtifacts {
workerConfig.ArtifactsDir, err = c.ensureStateDirectory("artifacts", 0755)
if err != nil {
return nil, err
}
}
c.distros = distrofactory.NewDefault()
// TODO: set the c.config.DistroAliases to the distrofactory
// More work is needed to make the distro aliases behavior consistent for Weldr API,
// specifically for picking the correct repositories definition.
c.solver = dnfjson.NewBaseSolver(path.Join(c.cacheDir, "rpmmd"))
c.solver.SetDNFJSONPath(c.config.DNFJson)
var jobs jobqueue.JobQueue
if config.Worker.PGDatabase != "" {
dbURL := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=%s",
config.Worker.PGUser,
config.Worker.PGPassword,
config.Worker.PGHost,
config.Worker.PGPort,
config.Worker.PGDatabase,
config.Worker.PGSSLMode,
)
if config.Worker.PGMaxConns > 0 {
dbURL += fmt.Sprintf("&pool_max_conns=%d", config.Worker.PGMaxConns)
}
jobs, err = dbjobqueue.New(dbURL)
if err != nil {
return nil, fmt.Errorf("cannot create jobqueue: %v", err)
}
} else {
queueDir, err := c.ensureStateDirectory("jobs", 0700)
if err != nil {
return nil, err
}
jobs, err = fsjobqueue.New(queueDir)
if err != nil {
return nil, fmt.Errorf("cannot create jobqueue: %v", err)
}
}
workerConfig.RequestJobTimeout, err = time.ParseDuration(config.Worker.RequestJobTimeout)
if err != nil {
return nil, fmt.Errorf("Unable to parse request job timeout: %v", err)
}
c.workers = worker.NewServer(c.logger, jobs, workerConfig)
return &c, nil
}
func (c *Composer) InitWeldr(repoPaths []string, weldrListener net.Listener,
distrosImageTypeDenylist map[string][]string) (err error) {
c.weldr, err = weldr.New(repoPaths, c.stateDir, c.solver, c.distros, c.logger, c.workers, distrosImageTypeDenylist)
if err != nil {
return err
}
c.weldrListener = weldrListener
// Preload the Metadata for all the supported distros
c.weldr.PreloadMetadata()
return nil
}
func (c *Composer) InitMetricsAPI(prometheus net.Listener) {
c.promListener = prometheus
}
func (c *Composer) InitAPI(cert, key string, enableTLS bool, enableMTLS bool, enableJWT bool, l net.Listener) error {
config := v2.ServerConfig{
JWTEnabled: c.config.Koji.EnableJWT,
TenantProviderFields: c.config.Koji.JWTTenantProviderFields,
}
c.api = cloudapi.NewServer(c.workers, c.distros, config)
if !enableTLS {
c.apiListener = l
return nil
}
// If both are off or both are on, error out
if enableJWT == enableMTLS {
return fmt.Errorf("API: Either mTLS or JWT authentication must be enabled")
}
clientAuth := tls.RequireAndVerifyClientCert
if enableJWT {
// jwt enabled => tls listener without client auth
clientAuth = tls.NoClientCert
}
tlsConfig, err := createTLSConfig(&connectionConfig{
CACertFile: c.config.Koji.CA,
ServerKeyFile: key,
ServerCertFile: cert,
AllowedDomains: c.config.Koji.AllowedDomains,
ClientAuth: clientAuth,
})
if err != nil {
return fmt.Errorf("Error creating TLS configuration: %v", err)
}
c.apiListener = tls.NewListener(l, tlsConfig)
return nil
}
func (c *Composer) InitLocalWorker(l net.Listener) {
c.localWorkerListener = l
}
func (c *Composer) InitRemoteWorkers(cert, key string, enableTLS bool, enableMTLS bool, enableJWT bool, l net.Listener) error {
if !enableTLS {
c.workerListener = l
return nil
}
// If both are off or both are on, error out
if enableJWT == enableMTLS {
return fmt.Errorf("Remote worker API: Either mTLS or JWT authentication must be enabled")
}
clientAuth := tls.RequireAndVerifyClientCert
if enableJWT {
// jwt enabled => tls listener without client auth
clientAuth = tls.NoClientCert
}
tlsConfig, err := createTLSConfig(&connectionConfig{
CACertFile: c.config.Worker.CA,
ServerKeyFile: key,
ServerCertFile: cert,
AllowedDomains: c.config.Worker.AllowedDomains,
ClientAuth: clientAuth,
})
if err != nil {
return fmt.Errorf("Error creating TLS configuration for remote worker API: %v", err)
}
c.workerListener = tls.NewListener(l, tlsConfig)
return nil
}
// Start Composer with all the APIs that had their respective Init*() called.
//
// Running without the weldr API is currently not supported.
func (c *Composer) Start() error {
// sanity checks
if c.localWorkerListener == nil && c.workerListener == nil {
logrus.Fatal("neither the local worker socket nor the remote worker socket is enabled, osbuild-composer is useless without workers")
}
if c.apiListener == nil && c.weldrListener == nil {
logrus.Fatal("neither the weldr API socket nor the composer API socket is enabled, osbuild-composer is useless without one of these APIs enabled")
}
var localWorkerAPI, remoteWorkerAPI, composerAPI, prometheusAPI *http.Server
if c.localWorkerListener != nil {
localWorkerAPI = &http.Server{
ErrorLog: c.logger,
Handler: c.workers.Handler(),
ReadHeaderTimeout: 5 * time.Second,
}
go func() {
err := localWorkerAPI.Serve(c.localWorkerListener)
if err != nil && err != http.ErrServerClosed {
panic(err)
}
}()
}
if c.workerListener != nil {
handler := c.workers.Handler()
var err error
if c.config.Worker.EnableJWT {
keysURLs := c.config.Worker.JWTKeysURLs
handler, err = auth.BuildJWTAuthHandler(
keysURLs,
c.config.Worker.JWTKeysCA,
c.config.Worker.JWTACLFile,
[]string{
"/api/image-builder-worker/v1/openapi/?$",
},
handler,
)
if err != nil {
panic(err)
}
}
remoteWorkerAPI = &http.Server{
ErrorLog: c.logger,
Handler: handler,
ReadHeaderTimeout: 5 * time.Second,
}
go func() {
err := remoteWorkerAPI.Serve(c.workerListener)
if err != nil && err != http.ErrServerClosed {
panic(err)
}
}()
}
if c.apiListener != nil {
const apiRouteV2 = "/api/image-builder-composer/v2"
mux := http.NewServeMux()
// Add a "/" here, because http.ServeMux expects the
// trailing slash for rooted subtrees, whereas the
// handler functions don't.
mux.Handle(apiRouteV2+"/", c.api.V2(apiRouteV2))
handler := http.Handler(mux)
var err error
if c.config.Koji.EnableJWT {
keysURLs := c.config.Koji.JWTKeysURLs
handler, err = auth.BuildJWTAuthHandler(
keysURLs,
c.config.Koji.JWTKeysCA,
c.config.Koji.JWTACLFile,
[]string{
"/api/image-builder-composer/v2/openapi/?$",
"/api/image-builder-composer/v2/errors/?$",
}, mux)
if err != nil {
panic(err)
}
}
composerAPI = &http.Server{
ErrorLog: c.logger,
Handler: handler,
ReadHeaderTimeout: 5 * time.Second,
}
go func() {
err := composerAPI.Serve(c.apiListener)
if err != nil && err != http.ErrServerClosed {
panic(err)
}
}()
}
if c.promListener != nil {
// metrics listener on another port
metricsMux := http.NewServeMux()
metricsMux.Handle("/metrics", promhttp.Handler().(http.HandlerFunc))
prometheusAPI = &http.Server{
ErrorLog: c.logger,
Handler: metricsMux,
ReadHeaderTimeout: 5 * time.Second,
}
go func() {
err := prometheusAPI.Serve(c.promListener)
if err != nil && err != http.ErrServerClosed {
panic(err)
}
}()
}
if c.weldrListener != nil {
go func() {
err := c.weldr.Serve(c.weldrListener)
if err != nil && err != http.ErrServerClosed {
panic(err)
}
}()
}
sigint := make(chan os.Signal, 1)
signal.Notify(sigint, syscall.SIGTERM)
signal.Notify(sigint, syscall.SIGINT)
// block until interrupted
<-sigint
logrus.Info("Shutting down.")
if c.apiListener != nil {
// First, close all listeners and then wait for all goroutines to finish.
err := composerAPI.Shutdown(context.Background())
c.api.Shutdown()
if err != nil {
panic(err)
}
}
if c.promListener != nil {
err := prometheusAPI.Shutdown(context.Background())
if err != nil {
panic(err)
}
}
if c.localWorkerListener != nil {
err := localWorkerAPI.Shutdown(context.Background())
if err != nil {
panic(err)
}
}
if c.workerListener != nil {
err := remoteWorkerAPI.Shutdown(context.Background())
if err != nil {
panic(err)
}
}
if c.weldrListener != nil {
err := c.weldr.Shutdown(context.Background())
if err != nil {
panic(err)
}
}
return nil
}
func (c *Composer) ensureStateDirectory(name string, perm os.FileMode) (string, error) {
d := path.Join(c.stateDir, name)
err := os.Mkdir(d, perm)
if err != nil && !os.IsExist(err) {
return "", fmt.Errorf("cannot create state directory %s: %v", name, err)
}
return d, nil
}
type connectionConfig struct {
// CA used for client certificate validation. If empty, then the CAs
// trusted by the host system are used.
CACertFile string
ServerKeyFile string
ServerCertFile string
AllowedDomains []string
ClientAuth tls.ClientAuthType
}
func createTLSConfig(c *connectionConfig) (*tls.Config, error) {
var roots *x509.CertPool
if c.CACertFile != "" {
caCertPEM, err := os.ReadFile(c.CACertFile)
if err != nil {
return nil, err
}
roots = x509.NewCertPool()
ok := roots.AppendCertsFromPEM(caCertPEM)
if !ok {
panic("failed to parse root certificate")
}
}
cert, err := tls.LoadX509KeyPair(c.ServerCertFile, c.ServerKeyFile)
if err != nil {
return nil, err
}
return &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: c.ClientAuth,
ClientCAs: roots,
MinVersion: tls.VersionTLS12,
VerifyPeerCertificate: func(rawCerts [][]byte, verifiedChains [][]*x509.Certificate) error {
for _, chain := range verifiedChains {
for _, domain := range c.AllowedDomains {
if chain[0].VerifyHostname(domain) == nil {
return nil
}
}
}
return errors.New("domain not in allowlist")
},
}, nil
}