jobqueue: Introduce jobqueue backed by a postgres database
Co-authored-by: sanne <sanne.raymaekers@gmail.com>
This commit is contained in:
parent
871c6e9cbb
commit
9c2c92f729
419 changed files with 98376 additions and 956 deletions
|
|
@ -14,6 +14,8 @@ import (
|
|||
|
||||
"github.com/osbuild/osbuild-composer/internal/cloudapi"
|
||||
"github.com/osbuild/osbuild-composer/internal/distroregistry"
|
||||
"github.com/osbuild/osbuild-composer/internal/jobqueue"
|
||||
"github.com/osbuild/osbuild-composer/internal/jobqueue/dbjobqueue"
|
||||
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
|
||||
"github.com/osbuild/osbuild-composer/internal/kojiapi"
|
||||
"github.com/osbuild/osbuild-composer/internal/rpmmd"
|
||||
|
|
@ -61,9 +63,25 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string, logger *
|
|||
|
||||
c.rpm = rpmmd.NewRPMMD(path.Join(c.cacheDir, "rpmmd"), "/usr/libexec/osbuild-composer/dnf-json")
|
||||
|
||||
jobs, err := fsjobqueue.New(queueDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create jobqueue: %v", err)
|
||||
var jobs jobqueue.JobQueue
|
||||
if config.Worker.PGDatabase != "" {
|
||||
dbURL := fmt.Sprintf("postgres://%s:%s@%s:%s/%s?sslmode=%s",
|
||||
config.Worker.PGUser,
|
||||
config.Worker.PGPassword,
|
||||
config.Worker.PGHost,
|
||||
config.Worker.PGPort,
|
||||
config.Worker.PGDatabase,
|
||||
config.Worker.PGSSLMode,
|
||||
)
|
||||
jobs, err = dbjobqueue.New(dbURL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create jobqueue: %v", err)
|
||||
}
|
||||
} else {
|
||||
jobs, err = fsjobqueue.New(queueDir)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create jobqueue: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
c.workers = worker.NewServer(c.logger, jobs, artifactsDir, c.config.Worker.IdentityFilter)
|
||||
|
|
|
|||
|
|
@ -1,7 +1,10 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"reflect"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
)
|
||||
|
|
@ -15,6 +18,12 @@ type ComposerConfigFile struct {
|
|||
AllowedDomains []string `toml:"allowed_domains"`
|
||||
CA string `toml:"ca"`
|
||||
IdentityFilter []string `toml:"identity_filter"`
|
||||
PGHost string `toml:"pg_host" env:"PGHOST"`
|
||||
PGPort string `toml:"pg_port" env:"PGPORT"`
|
||||
PGDatabase string `toml:"pg_database" env:"PGDATABASE"`
|
||||
PGUser string `toml:"pg_user" env:"PGUSER"`
|
||||
PGPassword string `toml:"pg_password" env:"PGPASSWORD"`
|
||||
PGSSLMode string `toml:"pg_ssl_mode" env:"PGSSLMODE"`
|
||||
} `toml:"worker"`
|
||||
ComposerAPI struct {
|
||||
IdentityFilter []string `toml:"identity_filter"`
|
||||
|
|
@ -27,9 +36,48 @@ func LoadConfig(name string) (*ComposerConfigFile, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = loadConfigFromEnv(&c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &c, nil
|
||||
}
|
||||
|
||||
func loadConfigFromEnv(intf interface{}) error {
|
||||
t := reflect.TypeOf(intf).Elem()
|
||||
v := reflect.ValueOf(intf).Elem()
|
||||
|
||||
for i := 0; i < v.NumField(); i++ {
|
||||
fieldT := t.Field(i)
|
||||
fieldV := v.Field(i)
|
||||
kind := fieldV.Kind()
|
||||
|
||||
switch kind {
|
||||
case reflect.String:
|
||||
key, ok := fieldT.Tag.Lookup("env")
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
confV, ok := os.LookupEnv(key)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
fieldV.SetString(confV)
|
||||
case reflect.Slice:
|
||||
// no-op
|
||||
continue
|
||||
case reflect.Struct:
|
||||
err := loadConfigFromEnv(fieldV.Addr().Interface())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("Unsupported type: %s", kind)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func DumpConfig(c *ComposerConfigFile, w io.Writer) error {
|
||||
return toml.NewEncoder(w).Encode(c)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ func TestEmpty(t *testing.T) {
|
|||
require.Empty(t, config.Koji.CA)
|
||||
require.Empty(t, config.Worker.AllowedDomains)
|
||||
require.Empty(t, config.Worker.CA)
|
||||
require.Empty(t, config.Worker.PGDatabase)
|
||||
}
|
||||
|
||||
func TestNonExisting(t *testing.T) {
|
||||
|
|
@ -34,4 +35,12 @@ func TestConfig(t *testing.T) {
|
|||
|
||||
require.Equal(t, config.Worker.AllowedDomains, []string{"osbuild.org"})
|
||||
require.Equal(t, config.Worker.CA, "/etc/osbuild-composer/ca-crt.pem")
|
||||
|
||||
require.Equal(t, "overwrite-me-db", config.Worker.PGDatabase)
|
||||
|
||||
require.NoError(t, os.Setenv("PGDATABASE", "composer-db"))
|
||||
config, err = LoadConfig("testdata/test.toml")
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, config)
|
||||
require.Equal(t, "composer-db", config.Worker.PGDatabase)
|
||||
}
|
||||
|
|
|
|||
1
cmd/osbuild-composer/testdata/test.toml
vendored
1
cmd/osbuild-composer/testdata/test.toml
vendored
|
|
@ -5,3 +5,4 @@ ca = "/etc/osbuild-composer/ca-crt.pem"
|
|||
[worker]
|
||||
allowed_domains = [ "osbuild.org" ]
|
||||
ca = "/etc/osbuild-composer/ca-crt.pem"
|
||||
pg_database = "overwrite-me-db"
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue