From 68ade23fcc7d9535f2bd3ebd13911edc28f15263 Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Sat, 5 Oct 2019 21:29:55 +0200 Subject: [PATCH] store: make state (de)serialization internal This hides the state hanlding in the store package. As before, it can be disabled by passing `nil` instead of the path to the state file. Signed-off-by: Tom Gundersen --- cmd/osbuild-composer/main.go | 57 ++---------------------------- internal/jobqueue/api_test.go | 2 +- internal/store/store.go | 65 +++++++++++++++++++++++++++++++---- internal/weldr/api_test.go | 6 ++-- 4 files changed, 65 insertions(+), 65 deletions(-) diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index 2f47d2236..419be5db1 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -2,10 +2,8 @@ package main import ( "flag" - "io/ioutil" "log" "os" - "path/filepath" "osbuild-composer/internal/jobqueue" "osbuild-composer/internal/rpmmd" @@ -15,13 +13,13 @@ import ( "github.com/coreos/go-systemd/activation" ) -const StateFile = "/var/lib/osbuild-composer/state.json" - func main() { var verbose bool flag.BoolVar(&verbose, "v", false, "Print access log") flag.Parse() + stateFile := "/var/lib/osbuild-composer/state.json" + listeners, err := activation.Listeners() if err != nil { panic(err) @@ -55,60 +53,11 @@ func main() { panic(err) } - state, err := ioutil.ReadFile(StateFile) - if err != nil && !os.IsNotExist(err) { - log.Fatalf("cannot read state: %v", err) - } - - stateChannel := make(chan []byte, 10) - - store := store.New(state, stateChannel) + store := store.New(&stateFile) jobAPI := jobqueue.New(logger, store) weldrAPI := weldr.New(repo, packages, logger, store) - go func() { - for { - err := writeFileAtomically(StateFile, <-stateChannel, 0755) - if err != nil { - log.Fatalf("cannot write state: %v", err) - } - } - }() go jobAPI.Serve(jobListener) weldrAPI.Serve(weldrListener) } - -func writeFileAtomically(filename string, data []byte, mode os.FileMode) error { - dir, name := filepath.Dir(filename), filepath.Base(filename) - - tmpfile, err := ioutil.TempFile(dir, name+"-*.tmp") - if err != nil { - return err - } - - _, err = tmpfile.Write(data) - if err != nil { - os.Remove(tmpfile.Name()) - return err - } - - err = tmpfile.Chmod(mode) - if err != nil { - return err - } - - err = tmpfile.Close() - if err != nil { - os.Remove(tmpfile.Name()) - return err - } - - err = os.Rename(tmpfile.Name(), filename) - if err != nil { - os.Remove(tmpfile.Name()) - return err - } - - return nil -} diff --git a/internal/jobqueue/api_test.go b/internal/jobqueue/api_test.go index 254ac9fd2..42e45dfd7 100644 --- a/internal/jobqueue/api_test.go +++ b/internal/jobqueue/api_test.go @@ -88,7 +88,7 @@ func TestBasic(t *testing.T) { //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"FINISHED"}`, http.StatusNotAllowed, ``}, } - store := store.New(nil, nil) + store := store.New(nil) api := jobqueue.New(nil, store) for _, c := range cases { id, _ := uuid.Parse("ffffffff-ffff-ffff-ffff-ffffffffffff") diff --git a/internal/store/store.go b/internal/store/store.go index 4c7debbd7..0074820d7 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -5,11 +5,13 @@ package store import ( "encoding/json" "errors" + "io/ioutil" "log" "os" "osbuild-composer/internal/blueprint" "osbuild-composer/internal/job" "osbuild-composer/internal/target" + "path/filepath" "sort" "sync" "time" @@ -26,7 +28,7 @@ type Store struct { mu sync.RWMutex // protects all fields pendingJobs chan job.Job - stateChannel chan<- []byte + stateChannel chan []byte } // A Compose represent the task of building one image. It contains all the information @@ -48,14 +50,30 @@ type Image struct { Mime string } -func New(initialState []byte, stateChannel chan<- []byte) *Store { +func New(stateFile *string) *Store { var s Store - if initialState != nil { - err := json.Unmarshal(initialState, &s) - if err != nil { - log.Fatalf("invalid initial state: %v", err) + if stateFile != nil { + state, err := ioutil.ReadFile(*stateFile) + if state != nil { + err := json.Unmarshal(state, &s) + if err != nil { + log.Fatalf("invalid initial state: %v", err) + } + } else if !os.IsNotExist(err) { + log.Fatalf("cannot read state: %v", err) } + + s.stateChannel = make(chan []byte, 128) + + go func() { + for { + err := writeFileAtomically(*stateFile, <-s.stateChannel, 0755) + if err != nil { + log.Fatalf("cannot write state: %v", err) + } + } + }() } if s.Blueprints == nil { @@ -68,12 +86,45 @@ func New(initialState []byte, stateChannel chan<- []byte) *Store { // TODO: push waiting/running composes to workers again s.Composes = make(map[uuid.UUID]Compose) } - s.stateChannel = stateChannel s.pendingJobs = make(chan job.Job, 200) return &s } +func writeFileAtomically(filename string, data []byte, mode os.FileMode) error { + dir, name := filepath.Dir(filename), filepath.Base(filename) + + tmpfile, err := ioutil.TempFile(dir, name+"-*.tmp") + if err != nil { + return err + } + + _, err = tmpfile.Write(data) + if err != nil { + os.Remove(tmpfile.Name()) + return err + } + + err = tmpfile.Chmod(mode) + if err != nil { + return err + } + + err = tmpfile.Close() + if err != nil { + os.Remove(tmpfile.Name()) + return err + } + + err = os.Rename(tmpfile.Name(), filename) + if err != nil { + os.Remove(tmpfile.Name()) + return err + } + + return nil +} + func (s *Store) change(f func()) { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index bfd88932d..70717803e 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -153,13 +153,13 @@ func TestBasic(t *testing.T) { } for _, c := range cases { - api := weldr.New(repo, packages, nil, store.New(nil, nil)) + api := weldr.New(repo, packages, nil, store.New(nil)) testRoute(t, api, "GET", c.Path, ``, c.ExpectedStatus, c.ExpectedJSON) } } func TestBlueprints(t *testing.T) { - api := weldr.New(repo, packages, nil, store.New(nil, nil)) + api := weldr.New(repo, packages, nil, store.New(nil)) testRoute(t, api, "POST", "/api/v0/blueprints/new", `{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0.0.0"}`, @@ -182,7 +182,7 @@ func TestBlueprints(t *testing.T) { } func TestCompose(t *testing.T) { - api := weldr.New(repo, packages, nil, store.New(nil, nil)) + api := weldr.New(repo, packages, nil, store.New(nil)) testRoute(t, api, "POST", "/api/v0/blueprints/new", `{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0.0.0"}`,