store: make the Store a package of its own

This encapsulates all the state of osbuild, no longer just the state specific
to the weldr API. Make it a first class type, preparing for direct access from
both the weldr and jobqueue APIs.

Signed-off-by: Tom Gundersen <teg@jklm.no>
This commit is contained in:
Tom Gundersen 2019-10-05 17:07:47 +02:00 committed by Lars Karlitski
parent 2e979c8b82
commit 4fcb5d66fa
2 changed files with 70 additions and 62 deletions

View file

@ -1,4 +1,6 @@
package weldr
// Package store contains primitives for representing and changing the
// osbuild-composer state.
package store
import (
"encoding/json"
@ -15,10 +17,12 @@ import (
"github.com/google/uuid"
)
type store struct {
// A Store contains all the persistent state of osbuild-composer, and is serialized
// on every change, and deserialized on start.
type Store struct {
Blueprints map[string]blueprint.Blueprint `json:"blueprints"`
Workspace map[string]blueprint.Blueprint `json:"workspace"`
Composes map[uuid.UUID]compose `json:"composes"`
Composes map[uuid.UUID]Compose `json:"composes"`
mu sync.RWMutex // protects all fields
pendingJobs chan<- job.Job
@ -26,7 +30,9 @@ type store struct {
stateChannel chan<- []byte
}
type compose struct {
// A Compose represent the task of building one image. It contains all the information
// necessary to generate the inputs for the job, as well as the job's state.
type Compose struct {
QueueStatus string `json:"queue_status"`
Blueprint *blueprint.Blueprint `json:"blueprint"`
OutputType string `json:"output-type"`
@ -36,8 +42,15 @@ type compose struct {
JobFinished time.Time `json:"job_finished"`
}
func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job, jobUpdates <-chan job.Status) *store {
var s store
// An Image represents the image resulting from a compose.
type Image struct {
File *os.File
Name string
Mime string
}
func New(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job, jobUpdates <-chan job.Status) *Store {
var s Store
if initialState != nil {
err := json.Unmarshal(initialState, &s)
@ -54,7 +67,7 @@ func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<
}
if s.Composes == nil {
// TODO: push waiting/running composes to workers again
s.Composes = make(map[uuid.UUID]compose)
s.Composes = make(map[uuid.UUID]Compose)
}
s.stateChannel = stateChannel
s.pendingJobs = pendingJobs
@ -87,7 +100,7 @@ func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<
return &s
}
func (s *store) change(f func()) {
func (s *Store) change(f func()) {
s.mu.Lock()
defer s.mu.Unlock()
@ -104,7 +117,7 @@ func (s *store) change(f func()) {
}
}
func (s *store) listBlueprints() []string {
func (s *Store) ListBlueprints() []string {
s.mu.RLock()
defer s.mu.RUnlock()
@ -117,7 +130,7 @@ func (s *store) listBlueprints() []string {
return names
}
type composeEntry struct {
type ComposeEntry struct {
ID uuid.UUID `json:"id"`
Blueprint string `json:"blueprint"`
QueueStatus string `json:"queue_status"`
@ -126,21 +139,21 @@ type composeEntry struct {
JobFinished float64 `json:"job_finished,omitempty"`
}
func (s *store) listQueue(uuids []uuid.UUID) []*composeEntry {
func (s *Store) ListQueue(uuids []uuid.UUID) []*ComposeEntry {
s.mu.RLock()
defer s.mu.RUnlock()
newCompose := func(id uuid.UUID, compose compose) *composeEntry {
newCompose := func(id uuid.UUID, compose Compose) *ComposeEntry {
switch compose.QueueStatus {
case "WAITING":
return &composeEntry{
return &ComposeEntry{
ID: id,
Blueprint: compose.Blueprint.Name,
QueueStatus: compose.QueueStatus,
JobCreated: float64(compose.JobCreated.UnixNano()) / 1000000000,
}
case "RUNNING":
return &composeEntry{
return &ComposeEntry{
ID: id,
Blueprint: compose.Blueprint.Name,
QueueStatus: compose.QueueStatus,
@ -148,7 +161,7 @@ func (s *store) listQueue(uuids []uuid.UUID) []*composeEntry {
JobStarted: float64(compose.JobStarted.UnixNano()) / 1000000000,
}
case "FAILED", "FINISHED":
return &composeEntry{
return &ComposeEntry{
ID: id,
Blueprint: compose.Blueprint.Name,
QueueStatus: compose.QueueStatus,
@ -161,14 +174,14 @@ func (s *store) listQueue(uuids []uuid.UUID) []*composeEntry {
}
}
var composes []*composeEntry
var composes []*ComposeEntry
if uuids == nil {
composes = make([]*composeEntry, 0, len(s.Composes))
composes = make([]*ComposeEntry, 0, len(s.Composes))
for id, compose := range s.Composes {
composes = append(composes, newCompose(id, compose))
}
} else {
composes = make([]*composeEntry, 0, len(uuids))
composes = make([]*ComposeEntry, 0, len(uuids))
for _, id := range uuids {
if compose, exists := s.Composes[id]; exists {
composes = append(composes, newCompose(id, compose))
@ -179,7 +192,7 @@ func (s *store) listQueue(uuids []uuid.UUID) []*composeEntry {
return composes
}
func (s *store) getBlueprint(name string, bp *blueprint.Blueprint, changed *bool) bool {
func (s *Store) GetBlueprint(name string, bp *blueprint.Blueprint, changed *bool) bool {
s.mu.RLock()
defer s.mu.RUnlock()
@ -214,7 +227,7 @@ func (s *store) getBlueprint(name string, bp *blueprint.Blueprint, changed *bool
return true
}
func (s *store) getBlueprintCommitted(name string, bp *blueprint.Blueprint) bool {
func (s *Store) GetBlueprintCommitted(name string, bp *blueprint.Blueprint) bool {
s.mu.RLock()
defer s.mu.RUnlock()
@ -241,38 +254,38 @@ func (s *store) getBlueprintCommitted(name string, bp *blueprint.Blueprint) bool
return true
}
func (s *store) pushBlueprint(bp blueprint.Blueprint) {
func (s *Store) PushBlueprint(bp blueprint.Blueprint) {
s.change(func() {
delete(s.Workspace, bp.Name)
s.Blueprints[bp.Name] = bp
})
}
func (s *store) pushBlueprintToWorkspace(bp blueprint.Blueprint) {
func (s *Store) PushBlueprintToWorkspace(bp blueprint.Blueprint) {
s.change(func() {
s.Workspace[bp.Name] = bp
})
}
func (s *store) deleteBlueprint(name string) {
func (s *Store) DeleteBlueprint(name string) {
s.change(func() {
delete(s.Workspace, name)
delete(s.Blueprints, name)
})
}
func (s *store) deleteBlueprintFromWorkspace(name string) {
func (s *Store) DeleteBlueprintFromWorkspace(name string) {
s.change(func() {
delete(s.Workspace, name)
})
}
func (s *store) addCompose(composeID uuid.UUID, bp *blueprint.Blueprint, composeType string) {
func (s *Store) AddCompose(composeID uuid.UUID, bp *blueprint.Blueprint, composeType string) {
targets := []*target.Target{
target.NewLocalTarget(target.NewLocalTargetOptions("/var/lib/osbuild-composer/outputs/" + composeID.String())),
}
s.change(func() {
s.Composes[composeID] = compose{
s.Composes[composeID] = Compose{
QueueStatus: "WAITING",
Blueprint: bp,
OutputType: composeType,
@ -287,13 +300,7 @@ func (s *store) addCompose(composeID uuid.UUID, bp *blueprint.Blueprint, compose
}
}
type image struct {
file *os.File
name string
mime string
}
func (s *store) getImage(composeID uuid.UUID) (*image, error) {
func (s *Store) GetImage(composeID uuid.UUID) (*Image, error) {
s.mu.RLock()
defer s.mu.RUnlock()
@ -307,10 +314,10 @@ func (s *store) getImage(composeID uuid.UUID) (*image, error) {
case *target.LocalTargetOptions:
file, err := os.Open(options.Location + "/" + name)
if err == nil {
return &image{
file: file,
name: name,
mime: mime,
return &Image{
File: file,
Name: name,
Mime: mime,
}, nil
}
}

View file

@ -16,10 +16,11 @@ import (
"osbuild-composer/internal/blueprint"
"osbuild-composer/internal/job"
"osbuild-composer/internal/rpmmd"
"osbuild-composer/internal/store"
)
type API struct {
store *store
store *store.Store
repo rpmmd.RepoConfig
packages rpmmd.PackageList
@ -32,7 +33,7 @@ func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger,
// This needs to be shared with the worker API so that they can communicate with each other
// builds := make(chan queue.Build, 200)
api := &API{
store: newStore(initialState, stateChannel, jobs, jobStatus),
store: store.New(initialState, stateChannel, jobs, jobStatus),
repo: repo,
packages: packages,
logger: logger,
@ -40,7 +41,7 @@ func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger,
// sample blueprint on first run
if initialState == nil {
api.store.pushBlueprint(blueprint.Blueprint{
api.store.PushBlueprint(blueprint.Blueprint{
Name: "example",
Description: "An Example",
Version: "1",
@ -385,7 +386,7 @@ func (api *API) blueprintsListHandler(writer http.ResponseWriter, request *http.
return
}
names := api.store.listBlueprints()
names := api.store.ListBlueprints()
total := uint(len(names))
offset = min(offset, total)
limit = min(limit, total-offset)
@ -420,7 +421,7 @@ func (api *API) blueprintsInfoHandler(writer http.ResponseWriter, request *http.
for _, name := range names {
var blueprint blueprint.Blueprint
var changed bool
if !api.store.getBlueprint(name, &blueprint, &changed) {
if !api.store.GetBlueprint(name, &blueprint, &changed) {
statusResponseError(writer, http.StatusNotFound)
return
}
@ -454,7 +455,7 @@ func (api *API) blueprintsDepsolveHandler(writer http.ResponseWriter, request *h
blueprints := []entry{}
for _, name := range names {
var blueprint blueprint.Blueprint
if !api.store.getBlueprint(name, &blueprint, nil) {
if !api.store.GetBlueprint(name, &blueprint, nil) {
statusResponseError(writer, http.StatusNotFound)
return
}
@ -513,7 +514,7 @@ func (api *API) blueprintsDiffHandler(writer http.ResponseWriter, request *http.
// Fetch old and new blueprint details from store and return error if not found
var oldBlueprint, newBlueprint blueprint.Blueprint
if !api.store.getBlueprintCommitted(name, &oldBlueprint) || !api.store.getBlueprint(name, &newBlueprint, nil) {
if !api.store.GetBlueprintCommitted(name, &oldBlueprint) || !api.store.GetBlueprint(name, &newBlueprint, nil) {
statusResponseError(writer, http.StatusNotFound)
return
}
@ -563,7 +564,7 @@ func (api *API) blueprintsNewHandler(writer http.ResponseWriter, request *http.R
return
}
api.store.pushBlueprint(blueprint)
api.store.PushBlueprint(blueprint)
statusResponseOK(writer)
}
@ -582,18 +583,18 @@ func (api *API) blueprintsWorkspaceHandler(writer http.ResponseWriter, request *
return
}
api.store.pushBlueprintToWorkspace(blueprint)
api.store.PushBlueprintToWorkspace(blueprint)
statusResponseOK(writer)
}
func (api *API) blueprintDeleteHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
api.store.deleteBlueprint(params.ByName("blueprint"))
api.store.DeleteBlueprint(params.ByName("blueprint"))
statusResponseOK(writer)
}
func (api *API) blueprintDeleteWorkspaceHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
api.store.deleteBlueprintFromWorkspace(params.ByName("blueprint"))
api.store.DeleteBlueprintFromWorkspace(params.ByName("blueprint"))
statusResponseOK(writer)
}
@ -631,10 +632,10 @@ func (api *API) composeHandler(writer http.ResponseWriter, httpRequest *http.Req
bp := blueprint.Blueprint{}
changed := false
found := api.store.getBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed?
found := api.store.GetBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed?
if found {
api.store.addCompose(reply.BuildID, &bp, cr.ComposeType)
api.store.AddCompose(reply.BuildID, &bp, cr.ComposeType)
} else {
statusResponseError(writer, http.StatusBadRequest, "blueprint does not exist")
return
@ -662,14 +663,14 @@ func (api *API) composeTypesHandler(writer http.ResponseWriter, request *http.Re
func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
var reply struct {
New []*composeEntry `json:"new"`
Run []*composeEntry `json:"run"`
New []*store.ComposeEntry `json:"new"`
Run []*store.ComposeEntry `json:"run"`
}
reply.New = make([]*composeEntry, 0)
reply.Run = make([]*composeEntry, 0)
reply.New = make([]*store.ComposeEntry, 0)
reply.Run = make([]*store.ComposeEntry, 0)
for _, entry := range api.store.listQueue(nil) {
for _, entry := range api.store.ListQueue(nil) {
switch entry.QueueStatus {
case "WAITING":
reply.New = append(reply.New, entry)
@ -683,7 +684,7 @@ func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Re
func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
var reply struct {
UUIDs []*composeEntry `json:"uuids"`
UUIDs []*store.ComposeEntry `json:"uuids"`
}
uuidStrings := strings.Split(params.ByName("uuids"), ",")
@ -696,7 +697,7 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R
}
uuids = append(uuids, id)
}
reply.UUIDs = api.store.listQueue(uuids)
reply.UUIDs = api.store.ListQueue(uuids)
json.NewEncoder(writer).Encode(reply)
}
@ -709,23 +710,23 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re
return
}
image, err := api.store.getImage(id)
image, err := api.store.GetImage(id)
if err != nil {
statusResponseError(writer, http.StatusNotFound, "image for compose not found")
return
}
stat, err := image.file.Stat()
stat, err := image.File.Stat()
if err != nil {
statusResponseError(writer, http.StatusInternalServerError)
return
}
writer.Header().Set("Content-Disposition", "attachment; filename="+id.String()+"-"+image.name)
writer.Header().Set("Content-Type", image.mime)
writer.Header().Set("Content-Disposition", "attachment; filename="+id.String()+"-"+image.Name)
writer.Header().Set("Content-Type", image.Mime)
writer.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
io.Copy(writer, image.file)
io.Copy(writer, image.File)
}
func (api *API) composeFinishedHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {