When creating a pipeline the assembler includes an image size. This image size can be set when creating the pipeline but if it is 0 then a default image size will be used. The default is 2 GB except for ami images which are 6 GB.
661 lines
14 KiB
Go
661 lines
14 KiB
Go
// Package store contains primitives for representing and changing the
|
|
// osbuild-composer state.
|
|
package store
|
|
|
|
import (
|
|
"bytes"
|
|
"crypto/sha1"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/osbuild/osbuild-composer/internal/blueprint"
|
|
"github.com/osbuild/osbuild-composer/internal/distro"
|
|
"github.com/osbuild/osbuild-composer/internal/pipeline"
|
|
"github.com/osbuild/osbuild-composer/internal/rpmmd"
|
|
"github.com/osbuild/osbuild-composer/internal/target"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// A Store contains all the persistent state of osbuild-composer, and is serialized
|
|
// on every change, and deserialized on start.
|
|
type Store struct {
|
|
Blueprints map[string]blueprint.Blueprint `json:"blueprints"`
|
|
Workspace map[string]blueprint.Blueprint `json:"workspace"`
|
|
Composes map[uuid.UUID]Compose `json:"composes"`
|
|
Sources map[string]SourceConfig `json:"sources"`
|
|
BlueprintsChanges map[string]map[string]blueprint.Change `json:"changes"`
|
|
|
|
mu sync.RWMutex // protects all fields
|
|
pendingJobs chan Job
|
|
stateChannel chan []byte
|
|
distro distro.Distro
|
|
stateDir *string
|
|
}
|
|
|
|
// A Compose represent the task of building one image. It contains all the information
|
|
// necessary to generate the inputs for the job, as well as the job's state.
|
|
type Compose struct {
|
|
QueueStatus string `json:"queue_status"`
|
|
Blueprint *blueprint.Blueprint `json:"blueprint"`
|
|
OutputType string `json:"output-type"`
|
|
Pipeline *pipeline.Pipeline `json:"pipeline"`
|
|
Targets []*target.Target `json:"targets"`
|
|
JobCreated time.Time `json:"job_created"`
|
|
JobStarted time.Time `json:"job_started"`
|
|
JobFinished time.Time `json:"job_finished"`
|
|
Image *Image `json:"image"`
|
|
}
|
|
|
|
// A Job contains the information about a compose a worker needs to process it.
|
|
type Job struct {
|
|
ComposeID uuid.UUID
|
|
Distro string
|
|
Pipeline *pipeline.Pipeline
|
|
Targets []*target.Target
|
|
OutputType string
|
|
}
|
|
|
|
// An Image represents the image resulting from a compose.
|
|
type Image struct {
|
|
Path string
|
|
Mime string
|
|
Size int64
|
|
}
|
|
|
|
type SourceConfig struct {
|
|
Name string `json:"name"`
|
|
Type string `json:"type"`
|
|
URL string `json:"url"`
|
|
CheckGPG bool `json:"check_gpg"`
|
|
CheckSSL bool `json:"check_ssl"`
|
|
System bool `json:"system"`
|
|
}
|
|
|
|
type NotFoundError struct {
|
|
message string
|
|
}
|
|
|
|
func (e *NotFoundError) Error() string {
|
|
return e.message
|
|
}
|
|
|
|
type NotPendingError struct {
|
|
message string
|
|
}
|
|
|
|
func (e *NotPendingError) Error() string {
|
|
return e.message
|
|
}
|
|
|
|
type NotRunningError struct {
|
|
message string
|
|
}
|
|
|
|
func (e *NotRunningError) Error() string {
|
|
return e.message
|
|
}
|
|
|
|
type InvalidRequestError struct {
|
|
message string
|
|
}
|
|
|
|
func (e *InvalidRequestError) Error() string {
|
|
return e.message
|
|
}
|
|
|
|
func New(stateDir *string, distro distro.Distro) *Store {
|
|
var s Store
|
|
|
|
if stateDir != nil {
|
|
err := os.Mkdir(*stateDir+"/"+"outputs", 0700)
|
|
if err != nil && !os.IsExist(err) {
|
|
log.Fatalf("cannot create output directory")
|
|
}
|
|
|
|
stateFile := *stateDir + "/state.json"
|
|
state, err := ioutil.ReadFile(stateFile)
|
|
|
|
if err == nil {
|
|
err := json.Unmarshal(state, &s)
|
|
if err != nil {
|
|
log.Fatalf("invalid initial state: %v", err)
|
|
}
|
|
} else if !os.IsNotExist(err) {
|
|
log.Fatalf("cannot read state: %v", err)
|
|
}
|
|
|
|
s.stateChannel = make(chan []byte, 128)
|
|
|
|
go func() {
|
|
for {
|
|
err := writeFileAtomically(stateFile, <-s.stateChannel, 0600)
|
|
if err != nil {
|
|
log.Fatalf("cannot write state: %v", err)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
s.pendingJobs = make(chan Job, 200)
|
|
s.distro = distro
|
|
s.stateDir = stateDir
|
|
|
|
if s.Blueprints == nil {
|
|
s.Blueprints = make(map[string]blueprint.Blueprint)
|
|
}
|
|
if s.Workspace == nil {
|
|
s.Workspace = make(map[string]blueprint.Blueprint)
|
|
}
|
|
if s.Composes == nil {
|
|
s.Composes = make(map[uuid.UUID]Compose)
|
|
} else {
|
|
for composeID, compose := range s.Composes {
|
|
switch compose.QueueStatus {
|
|
case "RUNNING":
|
|
// We do not support resuming an in-flight build
|
|
compose.QueueStatus = "FAILED"
|
|
s.Composes[composeID] = compose
|
|
case "WAITING":
|
|
// Push waiting composes back into the pending jobs queue
|
|
s.pendingJobs <- Job{
|
|
ComposeID: composeID,
|
|
Distro: s.distro.Name(),
|
|
Pipeline: compose.Pipeline,
|
|
Targets: compose.Targets,
|
|
OutputType: compose.OutputType,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
if s.Sources == nil {
|
|
s.Sources = make(map[string]SourceConfig)
|
|
}
|
|
if s.BlueprintsChanges == nil {
|
|
s.BlueprintsChanges = make(map[string]map[string]blueprint.Change)
|
|
}
|
|
|
|
return &s
|
|
}
|
|
|
|
func writeFileAtomically(filename string, data []byte, mode os.FileMode) error {
|
|
dir, name := filepath.Dir(filename), filepath.Base(filename)
|
|
|
|
tmpfile, err := ioutil.TempFile(dir, name+"-*.tmp")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = tmpfile.Write(data)
|
|
if err != nil {
|
|
os.Remove(tmpfile.Name())
|
|
return err
|
|
}
|
|
|
|
err = tmpfile.Chmod(mode)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = tmpfile.Close()
|
|
if err != nil {
|
|
os.Remove(tmpfile.Name())
|
|
return err
|
|
}
|
|
|
|
err = os.Rename(tmpfile.Name(), filename)
|
|
if err != nil {
|
|
os.Remove(tmpfile.Name())
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) change(f func() error) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
result := f()
|
|
|
|
if s.stateChannel != nil {
|
|
serialized, err := json.Marshal(s)
|
|
if err != nil {
|
|
// we ought to know all types that go into the store
|
|
panic(err)
|
|
}
|
|
|
|
s.stateChannel <- serialized
|
|
}
|
|
return result
|
|
}
|
|
|
|
func (s *Store) ListBlueprints() []string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
names := make([]string, 0, len(s.Blueprints))
|
|
for name := range s.Blueprints {
|
|
names = append(names, name)
|
|
}
|
|
sort.Strings(names)
|
|
|
|
return names
|
|
}
|
|
|
|
func (s *Store) GetBlueprint(name string) (*blueprint.Blueprint, bool) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
bp, inWorkspace := s.Workspace[name]
|
|
if !inWorkspace {
|
|
var ok bool
|
|
bp, ok = s.Blueprints[name]
|
|
if !ok {
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
// cockpit-composer cannot deal with missing "packages" or "modules"
|
|
if bp.Packages == nil {
|
|
bp.Packages = []blueprint.Package{}
|
|
}
|
|
if bp.Modules == nil {
|
|
bp.Modules = []blueprint.Package{}
|
|
}
|
|
if bp.Groups == nil {
|
|
bp.Groups = []blueprint.Group{}
|
|
}
|
|
if bp.Version == "" {
|
|
bp.Version = "0.0.0"
|
|
}
|
|
|
|
return &bp, inWorkspace
|
|
}
|
|
|
|
func (s *Store) GetBlueprintCommitted(name string) *blueprint.Blueprint {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
bp, ok := s.Blueprints[name]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
// cockpit-composer cannot deal with missing "packages" or "modules"
|
|
if bp.Packages == nil {
|
|
bp.Packages = []blueprint.Package{}
|
|
}
|
|
if bp.Modules == nil {
|
|
bp.Modules = []blueprint.Package{}
|
|
}
|
|
if bp.Groups == nil {
|
|
bp.Groups = []blueprint.Group{}
|
|
}
|
|
if bp.Version == "" {
|
|
bp.Version = "0.0.0"
|
|
}
|
|
|
|
return &bp
|
|
}
|
|
|
|
func (s *Store) GetBlueprintChange(name string, commit string) *blueprint.Change {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
change, ok := s.BlueprintsChanges[name][commit]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return &change
|
|
}
|
|
|
|
func (s *Store) GetBlueprintChanges(name string) []blueprint.Change {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
var changes []blueprint.Change
|
|
|
|
for _, change := range s.BlueprintsChanges[name] {
|
|
changes = append(changes, change)
|
|
}
|
|
|
|
return changes
|
|
}
|
|
|
|
func bumpVersion(str string) string {
|
|
v := [3]uint64{}
|
|
fields := strings.SplitN(str, ".", 3)
|
|
for i := 0; i < len(fields); i++ {
|
|
if n, err := strconv.ParseUint(fields[i], 10, 64); err == nil {
|
|
v[i] = n
|
|
} else {
|
|
// don't touch strings with invalid versions
|
|
return str
|
|
}
|
|
}
|
|
|
|
return fmt.Sprintf("%d.%d.%d", v[0], v[1], v[2]+1)
|
|
}
|
|
|
|
func (s *Store) PushBlueprint(bp blueprint.Blueprint, commitMsg string) {
|
|
s.change(func() error {
|
|
hash := sha1.New()
|
|
// Hash timestamp to create unique hash
|
|
hash.Write([]byte(time.Now().String()))
|
|
// Get hash as a byte slice
|
|
commitBytes := hash.Sum(nil)
|
|
// Get hash as a hex string
|
|
commit := hex.EncodeToString(commitBytes)
|
|
timestamp := time.Now().Format("2006-01-02T15:04:05Z")
|
|
change := blueprint.Change{
|
|
Commit: commit,
|
|
Message: commitMsg,
|
|
Timestamp: timestamp,
|
|
Blueprint: bp,
|
|
}
|
|
|
|
delete(s.Workspace, bp.Name)
|
|
if s.BlueprintsChanges[bp.Name] == nil {
|
|
s.BlueprintsChanges[bp.Name] = make(map[string]blueprint.Change)
|
|
}
|
|
s.BlueprintsChanges[bp.Name][commit] = change
|
|
|
|
if old, ok := s.Blueprints[bp.Name]; ok {
|
|
if bp.Version == "" || bp.Version == old.Version {
|
|
bp.Version = bumpVersion(old.Version)
|
|
}
|
|
}
|
|
s.Blueprints[bp.Name] = bp
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *Store) PushBlueprintToWorkspace(bp blueprint.Blueprint) {
|
|
s.change(func() error {
|
|
s.Workspace[bp.Name] = bp
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *Store) DeleteBlueprint(name string) {
|
|
s.change(func() error {
|
|
delete(s.Workspace, name)
|
|
delete(s.Blueprints, name)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *Store) DeleteBlueprintFromWorkspace(name string) {
|
|
s.change(func() error {
|
|
delete(s.Workspace, name)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *Store) GetCompose(id uuid.UUID) (Compose, bool) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
compose, exists := s.Composes[id]
|
|
return compose, exists
|
|
}
|
|
|
|
func (s *Store) GetAllComposes() map[uuid.UUID]Compose {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
composes := make(map[uuid.UUID]Compose)
|
|
|
|
for id, compose := range s.Composes {
|
|
newCompose := compose
|
|
newCompose.Targets = []*target.Target{}
|
|
|
|
for _, t := range compose.Targets {
|
|
newTarget := *t
|
|
newCompose.Targets = append(newCompose.Targets, &newTarget)
|
|
}
|
|
|
|
newBlueprint := *compose.Blueprint
|
|
newCompose.Blueprint = &newBlueprint
|
|
|
|
composes[id] = newCompose
|
|
}
|
|
|
|
return composes
|
|
}
|
|
|
|
func (s *Store) GetComposeResult(id uuid.UUID) (io.ReadCloser, error) {
|
|
if s.stateDir == nil {
|
|
return ioutil.NopCloser(bytes.NewBuffer([]byte("{}"))), nil
|
|
}
|
|
return os.Open(*s.stateDir + "/outputs/" + id.String() + "/result.json")
|
|
}
|
|
|
|
func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, checksums map[string]string, arch, composeType string, uploadTarget *target.Target) error {
|
|
targets := []*target.Target{}
|
|
|
|
if s.stateDir != nil {
|
|
targets = append(targets, target.NewLocalTarget(
|
|
&target.LocalTargetOptions{
|
|
Location: *s.stateDir + "/outputs/" + composeID.String(),
|
|
},
|
|
))
|
|
}
|
|
|
|
if uploadTarget != nil {
|
|
targets = append(targets, uploadTarget)
|
|
}
|
|
|
|
repos := []rpmmd.RepoConfig{}
|
|
for _, source := range s.Sources {
|
|
repos = append(repos, source.RepoConfig())
|
|
}
|
|
|
|
pipeline, err := s.distro.Pipeline(bp, repos, checksums, arch, composeType, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
s.change(func() error {
|
|
s.Composes[composeID] = Compose{
|
|
QueueStatus: "WAITING",
|
|
Blueprint: bp,
|
|
Pipeline: pipeline,
|
|
OutputType: composeType,
|
|
Targets: targets,
|
|
JobCreated: time.Now(),
|
|
}
|
|
return nil
|
|
})
|
|
s.pendingJobs <- Job{
|
|
ComposeID: composeID,
|
|
Distro: s.distro.Name(),
|
|
Pipeline: pipeline,
|
|
Targets: targets,
|
|
OutputType: composeType,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) DeleteCompose(id uuid.UUID) error {
|
|
return s.change(func() error {
|
|
compose, exists := s.Composes[id]
|
|
|
|
if !exists {
|
|
return &NotFoundError{}
|
|
}
|
|
|
|
if compose.QueueStatus != "FINISHED" && compose.QueueStatus != "FAILED" {
|
|
return &InvalidRequestError{fmt.Sprintf("Compose %s is not in FINISHED or FAILED.", id)}
|
|
}
|
|
|
|
delete(s.Composes, id)
|
|
|
|
var err error
|
|
if s.stateDir != nil {
|
|
err = os.RemoveAll(*s.stateDir + "/outputs/" + id.String())
|
|
}
|
|
|
|
return err
|
|
})
|
|
}
|
|
|
|
func (s *Store) PopCompose() Job {
|
|
job := <-s.pendingJobs
|
|
s.change(func() error {
|
|
compose, exists := s.Composes[job.ComposeID]
|
|
if !exists || compose.QueueStatus != "WAITING" {
|
|
panic("Invalid job in queue.")
|
|
}
|
|
compose.JobStarted = time.Now()
|
|
compose.QueueStatus = "RUNNING"
|
|
for _, t := range compose.Targets {
|
|
t.Status = "RUNNING"
|
|
}
|
|
s.Composes[job.ComposeID] = compose
|
|
return nil
|
|
})
|
|
return job
|
|
}
|
|
|
|
func (s *Store) UpdateCompose(composeID uuid.UUID, status string, image *Image) error {
|
|
return s.change(func() error {
|
|
compose, exists := s.Composes[composeID]
|
|
if !exists {
|
|
return &NotFoundError{"compose does not exist"}
|
|
}
|
|
if compose.QueueStatus == "WAITING" {
|
|
return &NotPendingError{"compose has not been popped"}
|
|
}
|
|
switch status {
|
|
case "RUNNING":
|
|
switch compose.QueueStatus {
|
|
case "RUNNING":
|
|
default:
|
|
return &NotRunningError{"compose was not running"}
|
|
}
|
|
case "FINISHED", "FAILED":
|
|
switch compose.QueueStatus {
|
|
case "RUNNING":
|
|
compose.JobFinished = time.Now()
|
|
default:
|
|
return &NotRunningError{"compose was not running"}
|
|
}
|
|
compose.QueueStatus = status
|
|
for _, t := range compose.Targets {
|
|
t.Status = status
|
|
}
|
|
|
|
if status == "FINISHED" {
|
|
compose.Image = image
|
|
}
|
|
|
|
s.Composes[composeID] = compose
|
|
default:
|
|
return &InvalidRequestError{"invalid state transition"}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *Store) PushSource(source SourceConfig) {
|
|
s.change(func() error {
|
|
s.Sources[source.Name] = source
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *Store) DeleteSource(name string) {
|
|
s.change(func() error {
|
|
delete(s.Sources, name)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (s *Store) ListSources() []string {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
names := make([]string, 0, len(s.Sources))
|
|
for name := range s.Sources {
|
|
names = append(names, name)
|
|
}
|
|
sort.Strings(names)
|
|
|
|
return names
|
|
}
|
|
|
|
func (s *Store) GetSource(name string) *SourceConfig {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
source, ok := s.Sources[name]
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return &source
|
|
}
|
|
|
|
func (s *Store) GetAllSources() map[string]SourceConfig {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
sources := make(map[string]SourceConfig)
|
|
|
|
for k, v := range s.Sources {
|
|
sources[k] = v
|
|
}
|
|
|
|
return sources
|
|
}
|
|
|
|
func NewSourceConfig(repo rpmmd.RepoConfig, system bool) SourceConfig {
|
|
sc := SourceConfig{
|
|
Name: repo.Id,
|
|
CheckGPG: true,
|
|
CheckSSL: !repo.IgnoreSSL,
|
|
System: system,
|
|
}
|
|
|
|
if repo.BaseURL != "" {
|
|
sc.URL = repo.BaseURL
|
|
sc.Type = "yum-baseurl"
|
|
} else if repo.Metalink != "" {
|
|
sc.URL = repo.Metalink
|
|
sc.Type = "yum-metalink"
|
|
} else if repo.MirrorList != "" {
|
|
sc.URL = repo.MirrorList
|
|
sc.Type = "yum-mirrorlist"
|
|
}
|
|
|
|
return sc
|
|
}
|
|
|
|
func (s *SourceConfig) RepoConfig() rpmmd.RepoConfig {
|
|
var repo rpmmd.RepoConfig
|
|
|
|
repo.Name = s.Name
|
|
repo.Id = s.Name
|
|
repo.IgnoreSSL = !s.CheckSSL
|
|
|
|
if s.Type == "yum-baseurl" {
|
|
repo.BaseURL = s.URL
|
|
} else if s.Type == "yum-metalink" {
|
|
repo.Metalink = s.URL
|
|
} else if s.Type == "yum-mirrorlist" {
|
|
repo.MirrorList = s.URL
|
|
}
|
|
|
|
return repo
|
|
}
|