jsondb: introduce a simple JSON database
weldr's store is quite complex and handled serialization itself. Move that part out into a separate package `jsondb`. This package is more generic than the store needs: it can write an arbitrary amount of JSON documents while the store only needs one: state.json. This is in preparation for future work, which introduces a queue package that builds on top of `jsondb`.
This commit is contained in:
parent
6bf31423a2
commit
17f4281648
4 changed files with 273 additions and 66 deletions
106
internal/jsondb/db.go
Normal file
106
internal/jsondb/db.go
Normal file
|
|
@ -0,0 +1,106 @@
|
|||
// Package jsondb implements a simple database of JSON documents, backed by the
|
||||
// file system.
|
||||
//
|
||||
// It supports two operations: Read() and Write(). Their signatures mirror
|
||||
// those of json.Unmarshal() and json.Marshal():
|
||||
//
|
||||
// err := db.Write("my-string", "octopus")
|
||||
//
|
||||
// var v string
|
||||
// exists, err := db.Read("my-string", &v)
|
||||
//
|
||||
// The JSON documents are stored in a directory, in the form name.json (name as
|
||||
// passed to Read() and Write()). Thus, names may only contain characters that
|
||||
// may appear in filenames.
|
||||
|
||||
package jsondb
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
type JSONDatabase struct {
|
||||
dir string
|
||||
perm os.FileMode
|
||||
}
|
||||
|
||||
// Create a new JSONDatabase in `dir`. Each document that is saved to it will
|
||||
// have a file mode of `perm`.
|
||||
func New(dir string, perm os.FileMode) *JSONDatabase {
|
||||
return &JSONDatabase{dir, perm}
|
||||
}
|
||||
|
||||
// Reads the value at `name`. `document` must deserializable from JSON. Returns
|
||||
// false if a document with `name` does not exist.
|
||||
func (db *JSONDatabase) Read(name string, document interface{}) (bool, error) {
|
||||
f, err := os.Open(path.Join(db.dir, name+".json"))
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return false, nil
|
||||
}
|
||||
return false, fmt.Errorf("error accessing db file %s: %w", name, err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
err = json.NewDecoder(f).Decode(&document)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error reading db file %s: %w", name, err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Writes `document` to `name`, overwriting a previous document if it exists.
|
||||
// `document` must be serializable to JSON.
|
||||
func (db *JSONDatabase) Write(name string, document interface{}) error {
|
||||
return writeFileAtomically(db.dir, name+".json", db.perm, func(f *os.File) error {
|
||||
return json.NewEncoder(f).Encode(document)
|
||||
})
|
||||
}
|
||||
|
||||
// writeFileAtomically writes data to `filename` in `directory` atomically, by
|
||||
// first creating a temporary file in `directory` and only moving it when
|
||||
// writing succeeded. `writer` gets passed the open file handle to write to and
|
||||
// does not need to take care of closing it.
|
||||
func writeFileAtomically(dir, filename string, mode os.FileMode, writer func(f *os.File) error) error {
|
||||
tmpfile, err := ioutil.TempFile(dir, filename+"-*.tmp")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove `tmpfile` in each error case. We cannot use `defer` here,
|
||||
// because `tmpfile` shouldn't be removed when everything works: it
|
||||
// will be renamed to `filename`. Ignore errors from `os.Remove()`,
|
||||
// because the error relating to `tempfile` is more relevant.
|
||||
|
||||
err = tmpfile.Chmod(mode)
|
||||
if err != nil {
|
||||
_ = os.Remove(tmpfile.Name())
|
||||
return fmt.Errorf("error setting permissions on %s: %w", tmpfile.Name(), err)
|
||||
}
|
||||
|
||||
err = writer(tmpfile)
|
||||
if err != nil {
|
||||
_ = os.Remove(tmpfile.Name())
|
||||
return fmt.Errorf("error writing to %s: %w", tmpfile.Name(), err)
|
||||
}
|
||||
|
||||
err = tmpfile.Close()
|
||||
if err != nil {
|
||||
_ = os.Remove(tmpfile.Name())
|
||||
return fmt.Errorf("error closing %s: %w", tmpfile.Name(), err)
|
||||
}
|
||||
|
||||
err = os.Rename(tmpfile.Name(), path.Join(dir, filename))
|
||||
if err != nil {
|
||||
_ = os.Remove(tmpfile.Name())
|
||||
return fmt.Errorf("error moving %s to %s: %w", filepath.Base(tmpfile.Name()), filename, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
64
internal/jsondb/db_private_test.go
Normal file
64
internal/jsondb/db_private_test.go
Normal file
|
|
@ -0,0 +1,64 @@
|
|||
package jsondb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestWriteFileAtomically(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "jsondb-test-")
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
err := os.RemoveAll(dir)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
octopus := []byte("🐙\n")
|
||||
|
||||
// use an uncommon mode to check it's set correctly
|
||||
perm := os.FileMode(0750)
|
||||
|
||||
err = writeFileAtomically(dir, "octopus", perm, func(f *os.File) error {
|
||||
_, err := f.Write(octopus)
|
||||
return err
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// ensure that there are no stray temporary files
|
||||
infos, err := ioutil.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(infos))
|
||||
require.Equal(t, "octopus", infos[0].Name())
|
||||
require.Equal(t, perm, infos[0].Mode())
|
||||
|
||||
filename := path.Join(dir, "octopus")
|
||||
contents, err := ioutil.ReadFile(filename)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, octopus, contents)
|
||||
|
||||
err = os.Remove(filename)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("error", func(t *testing.T) {
|
||||
err = writeFileAtomically(dir, "no-octopus", 0750, func(f *os.File) error {
|
||||
return errors.New("something went wrong")
|
||||
})
|
||||
require.Error(t, err)
|
||||
|
||||
_, err := os.Stat(path.Join(dir, "no-octopus"))
|
||||
require.Error(t, err)
|
||||
|
||||
// ensure there are no stray temporary files
|
||||
infos, err := ioutil.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 0, len(infos))
|
||||
})
|
||||
}
|
||||
89
internal/jsondb/db_test.go
Normal file
89
internal/jsondb/db_test.go
Normal file
|
|
@ -0,0 +1,89 @@
|
|||
package jsondb_test
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/jsondb"
|
||||
)
|
||||
|
||||
type document struct {
|
||||
Animal string `json:"animal"`
|
||||
CanSwim bool `json:"can-swim"`
|
||||
}
|
||||
|
||||
// If the passed directory is not readable (writable), we should notice on the
|
||||
// first read (write).
|
||||
func TestDegenerate(t *testing.T) {
|
||||
db := jsondb.New("/non-existant-directory", 0755)
|
||||
|
||||
var d document
|
||||
exist, err := db.Read("one", &d)
|
||||
assert.False(t, exist)
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = db.Write("one", &d)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestCorrupt(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "jsondb-test-")
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
err := os.RemoveAll(dir)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
err = ioutil.WriteFile(path.Join(dir, "one.json"), []byte("{"), 0755)
|
||||
require.NoError(t, err)
|
||||
|
||||
db := jsondb.New(dir, 0755)
|
||||
|
||||
var d document
|
||||
_, err = db.Read("one", &d)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestMultiple(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "jsondb-test-")
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
err := os.RemoveAll(dir)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
perm := os.FileMode(0600)
|
||||
documents := map[string]document{
|
||||
"one": document{"octopus", true},
|
||||
"two": document{"zebra", false},
|
||||
"three": document{"clownfish", true},
|
||||
}
|
||||
|
||||
db := jsondb.New(dir, perm)
|
||||
|
||||
for name, doc := range documents {
|
||||
err = db.Write(name, doc)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
infos, err := ioutil.ReadDir(dir)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, len(infos), len(documents))
|
||||
for _, info := range infos {
|
||||
require.Equal(t, perm, info.Mode())
|
||||
}
|
||||
|
||||
for name, doc := range documents {
|
||||
var d document
|
||||
exist, err := db.Read(name, &d)
|
||||
require.NoError(t, err)
|
||||
require.True(t, exist)
|
||||
require.Equalf(t, doc, d, "error retrieving document '%s'", name)
|
||||
}
|
||||
}
|
||||
|
|
@ -14,12 +14,12 @@ import (
|
|||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/compose"
|
||||
"github.com/osbuild/osbuild-composer/internal/jsondb"
|
||||
"github.com/osbuild/osbuild-composer/internal/osbuild"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/blueprint"
|
||||
|
|
@ -32,6 +32,9 @@ import (
|
|||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// The name under which to save the store to the underlying jsondb
|
||||
const StoreDBName = "state"
|
||||
|
||||
// A Store contains all the persistent state of osbuild-composer, and is serialized
|
||||
// on every change, and deserialized on start.
|
||||
type Store struct {
|
||||
|
|
@ -42,10 +45,10 @@ type Store struct {
|
|||
BlueprintsChanges map[string]map[string]blueprint.Change `json:"changes"`
|
||||
BlueprintsCommits map[string][]string `json:"commits"`
|
||||
|
||||
mu sync.RWMutex // protects all fields
|
||||
pendingJobs chan Job
|
||||
stateChannel chan []byte
|
||||
stateDir *string
|
||||
mu sync.RWMutex // protects all fields
|
||||
pendingJobs chan Job
|
||||
stateDir *string
|
||||
db *jsondb.JSONDatabase
|
||||
}
|
||||
|
||||
// A Job contains the information about a compose a worker needs to process it.
|
||||
|
|
@ -114,28 +117,11 @@ func New(stateDir *string) *Store {
|
|||
log.Fatalf("cannot create output directory")
|
||||
}
|
||||
|
||||
stateFile := *stateDir + "/state.json"
|
||||
state, err := ioutil.ReadFile(stateFile)
|
||||
|
||||
if err == nil {
|
||||
err := json.Unmarshal(state, &s)
|
||||
if err != nil {
|
||||
log.Fatalf("invalid initial state: %v", err)
|
||||
}
|
||||
} else if !os.IsNotExist(err) {
|
||||
s.db = jsondb.New(*stateDir, 0600)
|
||||
_, err = s.db.Read(StoreDBName, &s)
|
||||
if err != nil {
|
||||
log.Fatalf("cannot read state: %v", err)
|
||||
}
|
||||
|
||||
s.stateChannel = make(chan []byte, 128)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
err := writeFileAtomically(stateFile, <-s.stateChannel, 0600)
|
||||
if err != nil {
|
||||
log.Fatalf("cannot write state: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
s.pendingJobs = make(chan Job, 200)
|
||||
|
|
@ -225,43 +211,6 @@ func New(stateDir *string) *Store {
|
|||
return &s
|
||||
}
|
||||
|
||||
func writeFileAtomically(filename string, data []byte, mode os.FileMode) error {
|
||||
dir, name := filepath.Dir(filename), filepath.Base(filename)
|
||||
|
||||
tmpfile, err := ioutil.TempFile(dir, name+"-*.tmp")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = tmpfile.Write(data)
|
||||
if err != nil {
|
||||
// FIXME: handle or comment this possible error
|
||||
_ = os.Remove(tmpfile.Name())
|
||||
return err
|
||||
}
|
||||
|
||||
err = tmpfile.Chmod(mode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = tmpfile.Close()
|
||||
if err != nil {
|
||||
// FIXME: handle or comment this possible error
|
||||
_ = os.Remove(tmpfile.Name())
|
||||
return err
|
||||
}
|
||||
|
||||
err = os.Rename(tmpfile.Name(), filename)
|
||||
if err != nil {
|
||||
// FIXME: handle or comment this possible error
|
||||
_ = os.Remove(tmpfile.Name())
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func randomSHA1String() (string, error) {
|
||||
hash := sha1.New()
|
||||
data := make([]byte, 20)
|
||||
|
|
@ -284,14 +233,13 @@ func (s *Store) change(f func() error) error {
|
|||
|
||||
result := f()
|
||||
|
||||
if s.stateChannel != nil {
|
||||
serialized, err := json.Marshal(s)
|
||||
if s.stateDir != nil {
|
||||
err := s.db.Write(StoreDBName, s)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
s.stateChannel <- serialized
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue