debian-forge-composer/internal/store/store.go
Jacob Kozol 1b46a83522 store: add getBlueprintChange update blueprint commit message
Add function to get a single blueprint change given the blueprint name
and the commit hash. Update the PushBlueprint function to allow a commit
message to be passed to it which will be used when adding a change.
2019-11-14 20:36:17 +01:00

606 lines
14 KiB
Go

// Package store contains primitives for representing and changing the
// osbuild-composer state.
package store
import (
"crypto/sha1"
"encoding/hex"
"encoding/json"
"io/ioutil"
"log"
"os"
"path/filepath"
"sort"
"sync"
"time"
"github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/pipeline"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/distro"
_ "github.com/osbuild/osbuild-composer/internal/distro/fedora30"
"github.com/google/uuid"
)
// A Store contains all the persistent state of osbuild-composer, and is serialized
// on every change, and deserialized on start.
type Store struct {
Blueprints map[string]blueprint.Blueprint `json:"blueprints"`
Workspace map[string]blueprint.Blueprint `json:"workspace"`
Composes map[uuid.UUID]Compose `json:"composes"`
Sources map[string]SourceConfig `json:"sources"`
BlueprintsChanges map[string]map[string]blueprint.Change `json:"changes"`
mu sync.RWMutex // protects all fields
pendingJobs chan Job
stateChannel chan []byte
}
// A Compose represent the task of building one image. It contains all the information
// necessary to generate the inputs for the job, as well as the job's state.
type Compose struct {
QueueStatus string `json:"queue_status"`
Blueprint *blueprint.Blueprint `json:"blueprint"`
OutputType string `json:"output-type"`
Targets []*target.Target `json:"targets"`
JobCreated time.Time `json:"job_created"`
JobStarted time.Time `json:"job_started"`
JobFinished time.Time `json:"job_finished"`
}
// A Job contains the information about a compose a worker needs to process it.
type Job struct {
ComposeID uuid.UUID
Pipeline *pipeline.Pipeline
Targets []*target.Target
}
// An Image represents the image resulting from a compose.
type Image struct {
File *os.File
Name string
Mime string
Size int64
}
type SourceConfig struct {
Name string `json:"name"`
Type string `json:"type"`
URL string `json:"url"`
CheckGPG bool `json:"check_gpg"`
CheckSSL bool `json:"check_ssl"`
System bool `json:"system"`
}
type NotFoundError struct {
message string
}
func (e *NotFoundError) Error() string {
return e.message
}
type NotPendingError struct {
message string
}
func (e *NotPendingError) Error() string {
return e.message
}
type NotRunningError struct {
message string
}
func (e *NotRunningError) Error() string {
return e.message
}
type InvalidRequestError struct {
message string
}
func (e *InvalidRequestError) Error() string {
return e.message
}
func New(stateFile *string) *Store {
var s Store
if stateFile != nil {
state, err := ioutil.ReadFile(*stateFile)
if state != nil {
err := json.Unmarshal(state, &s)
if err != nil {
log.Fatalf("invalid initial state: %v", err)
}
} else if !os.IsNotExist(err) {
log.Fatalf("cannot read state: %v", err)
}
s.stateChannel = make(chan []byte, 128)
go func() {
for {
err := writeFileAtomically(*stateFile, <-s.stateChannel, 0755)
if err != nil {
log.Fatalf("cannot write state: %v", err)
}
}
}()
}
if s.Blueprints == nil {
s.Blueprints = make(map[string]blueprint.Blueprint)
}
if s.Workspace == nil {
s.Workspace = make(map[string]blueprint.Blueprint)
}
if s.Composes == nil {
// TODO: push waiting/running composes to workers again
s.Composes = make(map[uuid.UUID]Compose)
}
if s.Sources == nil {
s.Sources = make(map[string]SourceConfig)
}
if s.BlueprintsChanges == nil {
s.BlueprintsChanges = make(map[string]map[string]blueprint.Change)
}
s.pendingJobs = make(chan Job, 200)
return &s
}
func writeFileAtomically(filename string, data []byte, mode os.FileMode) error {
dir, name := filepath.Dir(filename), filepath.Base(filename)
tmpfile, err := ioutil.TempFile(dir, name+"-*.tmp")
if err != nil {
return err
}
_, err = tmpfile.Write(data)
if err != nil {
os.Remove(tmpfile.Name())
return err
}
err = tmpfile.Chmod(mode)
if err != nil {
return err
}
err = tmpfile.Close()
if err != nil {
os.Remove(tmpfile.Name())
return err
}
err = os.Rename(tmpfile.Name(), filename)
if err != nil {
os.Remove(tmpfile.Name())
return err
}
return nil
}
func (s *Store) change(f func() error) error {
s.mu.Lock()
defer s.mu.Unlock()
result := f()
if s.stateChannel != nil {
serialized, err := json.Marshal(s)
if err != nil {
// we ought to know all types that go into the store
panic(err)
}
s.stateChannel <- serialized
}
return result
}
func (s *Store) ListBlueprints() []string {
s.mu.RLock()
defer s.mu.RUnlock()
names := make([]string, 0, len(s.Blueprints))
for name := range s.Blueprints {
names = append(names, name)
}
sort.Strings(names)
return names
}
type ComposeEntry struct {
ID uuid.UUID `json:"id"`
Blueprint string `json:"blueprint"`
Version string `json:"version"`
ComposeType string `json:"compose_type"`
ImageSize int64 `json:"image_size"`
QueueStatus string `json:"queue_status"`
JobCreated float64 `json:"job_created"`
JobStarted float64 `json:"job_started,omitempty"`
JobFinished float64 `json:"job_finished,omitempty"`
}
func (s *Store) ListQueue(uuids []uuid.UUID) []*ComposeEntry {
s.mu.RLock()
defer s.mu.RUnlock()
newCompose := func(id uuid.UUID, compose Compose) *ComposeEntry {
switch compose.QueueStatus {
case "WAITING":
return &ComposeEntry{
ID: id,
Blueprint: compose.Blueprint.Name,
Version: compose.Blueprint.Version,
ComposeType: compose.OutputType,
QueueStatus: compose.QueueStatus,
JobCreated: float64(compose.JobCreated.UnixNano()) / 1000000000,
}
case "RUNNING":
return &ComposeEntry{
ID: id,
Blueprint: compose.Blueprint.Name,
Version: compose.Blueprint.Version,
ComposeType: compose.OutputType,
QueueStatus: compose.QueueStatus,
JobCreated: float64(compose.JobCreated.UnixNano()) / 1000000000,
JobStarted: float64(compose.JobStarted.UnixNano()) / 1000000000,
}
case "FINISHED":
image, err := s.GetImage(id)
imageSize := int64(0)
if err == nil {
imageSize = image.Size
}
return &ComposeEntry{
ID: id,
Blueprint: compose.Blueprint.Name,
Version: compose.Blueprint.Version,
ComposeType: compose.OutputType,
ImageSize: imageSize,
QueueStatus: compose.QueueStatus,
JobCreated: float64(compose.JobCreated.UnixNano()) / 1000000000,
JobStarted: float64(compose.JobStarted.UnixNano()) / 1000000000,
JobFinished: float64(compose.JobFinished.UnixNano()) / 1000000000,
}
case "FAILED":
return &ComposeEntry{
ID: id,
Blueprint: compose.Blueprint.Name,
Version: compose.Blueprint.Version,
ComposeType: compose.OutputType,
QueueStatus: compose.QueueStatus,
JobCreated: float64(compose.JobCreated.UnixNano()) / 1000000000,
JobStarted: float64(compose.JobStarted.UnixNano()) / 1000000000,
JobFinished: float64(compose.JobFinished.UnixNano()) / 1000000000,
}
default:
panic("invalid compose state")
}
}
var composes []*ComposeEntry
if uuids == nil {
composes = make([]*ComposeEntry, 0, len(s.Composes))
for id, compose := range s.Composes {
composes = append(composes, newCompose(id, compose))
}
} else {
composes = make([]*ComposeEntry, 0, len(uuids))
for _, id := range uuids {
if compose, exists := s.Composes[id]; exists {
composes = append(composes, newCompose(id, compose))
}
}
}
return composes
}
func (s *Store) GetBlueprint(name string, bp *blueprint.Blueprint, changed *bool) bool {
s.mu.RLock()
defer s.mu.RUnlock()
var inWorkspace bool
*bp, inWorkspace = s.Workspace[name]
if !inWorkspace {
var ok bool
*bp, ok = s.Blueprints[name]
if !ok {
return false
}
}
// cockpit-composer cannot deal with missing "packages" or "modules"
if bp.Packages == nil {
bp.Packages = []blueprint.Package{}
}
if bp.Modules == nil {
bp.Modules = []blueprint.Package{}
}
if bp.Groups == nil {
bp.Groups = []blueprint.Group{}
}
if bp.Version == "" {
bp.Version = "0.0.0"
}
if changed != nil {
*changed = inWorkspace
}
return true
}
func (s *Store) GetBlueprintCommitted(name string, bp *blueprint.Blueprint) bool {
s.mu.RLock()
defer s.mu.RUnlock()
var ok bool
*bp, ok = s.Blueprints[name]
if !ok {
return false
}
// cockpit-composer cannot deal with missing "packages" or "modules"
if bp.Packages == nil {
bp.Packages = []blueprint.Package{}
}
if bp.Modules == nil {
bp.Modules = []blueprint.Package{}
}
if bp.Groups == nil {
bp.Groups = []blueprint.Group{}
}
if bp.Version == "" {
bp.Version = "0.0.0"
}
return true
}
func (s *Store) GetBlueprintChange(name string, commit string) *blueprint.Change {
s.mu.RLock()
defer s.mu.RUnlock()
change, ok := s.BlueprintsChanges[name][commit]
if !ok {
return nil
}
return &change
}
func (s *Store) GetBlueprintChanges(name string) []blueprint.Change {
s.mu.RLock()
defer s.mu.RUnlock()
var changes []blueprint.Change
for _, change := range s.BlueprintsChanges[name] {
changes = append(changes, change)
}
return changes
}
func (s *Store) PushBlueprint(bp blueprint.Blueprint, commitMsg string) {
s.change(func() error {
hash := sha1.New()
// Hash timestamp to create unique hash
hash.Write([]byte(time.Now().String()))
// Get hash as a byte slice
commitBytes := hash.Sum(nil)
// Get hash as a hex string
commit := hex.EncodeToString(commitBytes)
timestamp := time.Now().Format("2006-01-02T15:04:05Z")
change := blueprint.Change{
Commit: commit,
Message: commitMsg,
Timestamp: timestamp,
Blueprint: bp,
}
delete(s.Workspace, bp.Name)
if s.BlueprintsChanges[bp.Name] == nil {
s.BlueprintsChanges[bp.Name] = make(map[string]blueprint.Change)
}
s.BlueprintsChanges[bp.Name][commit] = change
s.Blueprints[bp.Name] = bp
return nil
})
}
func (s *Store) PushBlueprintToWorkspace(bp blueprint.Blueprint) {
s.change(func() error {
s.Workspace[bp.Name] = bp
return nil
})
}
func (s *Store) DeleteBlueprint(name string) {
s.change(func() error {
delete(s.Workspace, name)
delete(s.Blueprints, name)
return nil
})
}
func (s *Store) DeleteBlueprintFromWorkspace(name string) {
s.change(func() error {
delete(s.Workspace, name)
return nil
})
}
func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, composeType string) error {
targets := []*target.Target{
target.NewLocalTarget(target.NewLocalTargetOptions("/var/lib/osbuild-composer/outputs/" + composeID.String())),
}
d := distro.New("")
pipeline, err := d.Pipeline(bp, composeType)
if err != nil {
return err
}
s.change(func() error {
s.Composes[composeID] = Compose{
QueueStatus: "WAITING",
Blueprint: bp,
OutputType: composeType,
Targets: targets,
JobCreated: time.Now(),
}
return nil
})
s.pendingJobs <- Job{
ComposeID: composeID,
Pipeline: pipeline,
Targets: targets,
}
return nil
}
func (s *Store) PopCompose() Job {
job := <-s.pendingJobs
s.change(func() error {
compose, exists := s.Composes[job.ComposeID]
if !exists || compose.QueueStatus != "WAITING" {
panic("Invalid job in queue.")
}
compose.JobStarted = time.Now()
compose.QueueStatus = "RUNNING"
s.Composes[job.ComposeID] = compose
return nil
})
return job
}
func (s *Store) UpdateCompose(composeID uuid.UUID, status string) error {
return s.change(func() error {
compose, exists := s.Composes[composeID]
if !exists {
return &NotFoundError{"compose does not exist"}
}
if compose.QueueStatus == "WAITING" {
return &NotPendingError{"compose has not been popped"}
}
switch status {
case "RUNNING":
switch compose.QueueStatus {
case "RUNNING":
default:
return &NotRunningError{"compose was not running"}
}
case "FINISHED", "FAILED":
switch compose.QueueStatus {
case "RUNNING":
compose.JobFinished = time.Now()
default:
return &NotRunningError{"compose was not running"}
}
compose.QueueStatus = status
s.Composes[composeID] = compose
default:
return &InvalidRequestError{"invalid state transition"}
}
return nil
})
}
func (s *Store) GetImage(composeID uuid.UUID) (*Image, error) {
s.mu.RLock()
defer s.mu.RUnlock()
if compose, exists := s.Composes[composeID]; exists {
if compose.QueueStatus != "FINISHED" {
return nil, &InvalidRequestError{"compose was not finished"}
}
d := distro.New("")
name, mime, err := d.FilenameFromType(compose.OutputType)
if err != nil {
panic("invalid output type")
}
for _, t := range compose.Targets {
switch options := t.Options.(type) {
case *target.LocalTargetOptions:
file, err := os.Open(options.Location + "/" + name)
if err == nil {
fileStat, err := file.Stat()
if err != nil {
return nil, &NotFoundError{"image info could not be found"}
}
size := fileStat.Size()
return &Image{
File: file,
Name: name,
Mime: mime,
Size: size,
}, nil
}
}
}
return nil, &NotFoundError{"image could not be found"}
}
return nil, &NotFoundError{"compose could not be found"}
}
func (s *Store) PushSource(source SourceConfig) {
s.change(func() error {
s.Sources[source.Name] = source
return nil
})
}
func (s *Store) DeleteSource(name string) {
s.change(func() error {
delete(s.Sources, name)
return nil
})
}
func (s *Store) ListSources() []string {
s.mu.RLock()
defer s.mu.RUnlock()
names := make([]string, 0, len(s.Sources))
for name := range s.Sources {
names = append(names, name)
}
sort.Strings(names)
return names
}
func (s *Store) GetSource(name string) *SourceConfig {
s.mu.RLock()
defer s.mu.RUnlock()
source, ok := s.Sources[name]
if !ok {
return nil
}
return &source
}
func (s *Store) GetAllSources() map[string]SourceConfig {
s.mu.RLock()
defer s.mu.RUnlock()
sources := make(map[string]SourceConfig)
for k, v := range s.Sources {
sources[k] = v
}
return sources
}