Add jobqueue package
Now that the "old" `jobqueue` package was renamed to `worker`, add a new package that contains an interface to an actual job queue. Also add two implementations: fsjobqueue, a job queue backed by the file system, and testjobqueue, which can be used as a mock implementation for testing. These packages are not yet used.
This commit is contained in:
parent
9968358c49
commit
dde3d459f5
5 changed files with 841 additions and 0 deletions
340
internal/jobqueue/fsjobqueue/fsjobqueue.go
Normal file
340
internal/jobqueue/fsjobqueue/fsjobqueue.go
Normal file
|
|
@ -0,0 +1,340 @@
|
|||
// Package fsjobqueue implements a filesystem-backed job queue. It implements
// the interfaces in package jobqueue.
//
// Jobs are stored in the file system, using the `jsondb` package. However,
// this package does not use the file system as a database, but keeps some
// state in memory. This means that access to a given directory must be
// exclusive to only one `fsJobQueue` object at a time. A single `fsJobQueue`
// can be safely accessed from multiple goroutines, though.
//
// Data is stored non-redundantly. Any data structures necessary for efficient
// access (e.g., dependants) are kept in memory.
|
||||
package fsjobqueue
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/osbuild/osbuild-composer/internal/jobqueue"
|
||||
"github.com/osbuild/osbuild-composer/internal/jsondb"
|
||||
)
|
||||
|
||||
// fsJobQueue is a filesystem-backed implementation of the interfaces in
// package jobqueue. Jobs themselves live on disk in `db`; only scheduling
// state (pending channels, dependant map) is kept in memory.
type fsJobQueue struct {
	db *jsondb.JSONDatabase

	// Maps job types to channels of job ids for that type. Only access
	// through pendingChannel() to ensure concurrent access is restricted
	// by the mutex.
	pending map[string]chan uuid.UUID
	pendingMutex sync.Mutex

	// Maps job ids to the jobs that depend on it, if any of those
	// dependants have not yet finished. Only access while holding the
	// associated mutex.
	dependants map[uuid.UUID][]uuid.UUID
	dependantsMutex sync.Mutex
}
|
||||
|
||||
// job is the on-disk representation of a single job. It contains all
// necessary (but non-redundant) information about a job. These are not held
// in memory by the job queue, but (de)serialized on each access.
type job struct {
	Id uuid.UUID `json:"id"`
	Type string `json:"type"`
	Args json.RawMessage `json:"args,omitempty"`
	Dependencies []uuid.UUID `json:"dependencies"`
	Result json.RawMessage `json:"result,omitempty"`

	// Lifecycle state and its transition timestamps (queued -> started ->
	// finished).
	Status jobqueue.JobStatus `json:"status"`
	QueuedAt time.Time `json:"queued-at,omitempty"`
	StartedAt time.Time `json:"started-at,omitempty"`
	FinishedAt time.Time `json:"finished-at,omitempty"`
}
|
||||
|
||||
// Create a new fsJobQueue object for `dir`. This object must have exclusive
|
||||
// access to `dir`. If `dir` contains jobs created from previous runs, they are
|
||||
// loaded and rescheduled to run if necessary.
|
||||
func New(dir string) (*fsJobQueue, error) {
|
||||
q := &fsJobQueue{
|
||||
db: jsondb.New(dir, 0600),
|
||||
pending: make(map[string]chan uuid.UUID),
|
||||
dependants: make(map[uuid.UUID][]uuid.UUID),
|
||||
}
|
||||
|
||||
// Look for jobs that are still pending and build the dependant map.
|
||||
ids, err := q.db.List()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error listing jobs: %v", err)
|
||||
}
|
||||
for _, id := range ids {
|
||||
uuid, err := uuid.Parse(id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid job '%s' in db: %v", id, err)
|
||||
}
|
||||
j, err := q.readJob(uuid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// We only enqueue jobs that were previously pending.
|
||||
if j.Status != jobqueue.JobPending {
|
||||
continue
|
||||
}
|
||||
// Initialize dependants for this job.
|
||||
q.dependantsMutex.Lock()
|
||||
defer q.dependantsMutex.Unlock()
|
||||
for _, dep := range j.Dependencies {
|
||||
q.dependants[dep] = append(q.dependants[dep], j.Id)
|
||||
}
|
||||
// Enqueue a job if all its dependencies (if there are any)
|
||||
// have finished, but the job itself hasn't run yet.
|
||||
n, err := q.countFinishedJobs(j.Dependencies)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if n == len(j.Dependencies) {
|
||||
q.pendingChannel(j.Type) <- j.Id
|
||||
}
|
||||
}
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
|
||||
var j = job{
|
||||
Id: uuid.New(),
|
||||
Type: jobType,
|
||||
Dependencies: uniqueUUIDList(dependencies),
|
||||
Status: jobqueue.JobPending,
|
||||
QueuedAt: time.Now(),
|
||||
}
|
||||
|
||||
var err error
|
||||
j.Args, err = json.Marshal(args)
|
||||
if err != nil {
|
||||
return uuid.Nil, fmt.Errorf("error marshaling job arguments: %v", err)
|
||||
}
|
||||
|
||||
// Verify dependencies and check how many of them are already finished.
|
||||
finished, err := q.countFinishedJobs(j.Dependencies)
|
||||
if err != nil {
|
||||
return uuid.Nil, err
|
||||
}
|
||||
|
||||
// Write the job before updating in-memory state, so that the latter
|
||||
// doesn't become corrupt when writing fails.
|
||||
err = q.db.Write(j.Id.String(), j)
|
||||
if err != nil {
|
||||
return uuid.Nil, fmt.Errorf("cannot write job: %v:", err)
|
||||
}
|
||||
|
||||
// If all dependencies have finished, or there are none, queue the job.
|
||||
// Otherwise, update dependants so that this check is done again when
|
||||
// FinishJob() is called for a dependency.
|
||||
if finished == len(j.Dependencies) {
|
||||
q.pendingChannel(j.Type) <- j.Id
|
||||
} else {
|
||||
q.dependantsMutex.Lock()
|
||||
defer q.dependantsMutex.Unlock()
|
||||
for _, id := range j.Dependencies {
|
||||
q.dependants[id] = append(q.dependants[id], j.Id)
|
||||
}
|
||||
}
|
||||
|
||||
return j.Id, nil
|
||||
}
|
||||
|
||||
// Dequeue blocks until a job of one of `jobTypes` becomes available (or
// `ctx` is canceled), unmarshals its arguments into `args`, marks it as
// running, and returns its id.
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interface{}) (uuid.UUID, error) {
	// Return early if the context is already canceled.
	if err := ctx.Err(); err != nil {
		return uuid.Nil, err
	}

	// Use reflect.Select(), because the `select` statement cannot operate
	// on an unknown amount of channels. Case 0 is always the context's
	// Done channel; the rest are the pending channels, one per job type.
	cases := []reflect.SelectCase{
		{
			Dir: reflect.SelectRecv,
			Chan: reflect.ValueOf(ctx.Done()),
		},
	}
	for _, jt := range jobTypes {
		cases = append(cases, reflect.SelectCase{
			Dir: reflect.SelectRecv,
			Chan: reflect.ValueOf(q.pendingChannel(jt)),
		})
	}

	chosen, value, recvOK := reflect.Select(cases)
	// Case 0 fires (with recvOK == false) when ctx.Done() is closed.
	if chosen == 0 && !recvOK {
		return uuid.Nil, ctx.Err()
	}
	// pending channels are never closed
	if !recvOK {
		panic("pending channel was closed unexpectedly")
	}
	id := value.Interface().(uuid.UUID)

	j, err := q.readJob(id)
	if err != nil {
		return uuid.Nil, err
	}

	err = json.Unmarshal(j.Args, args)
	if err != nil {
		return uuid.Nil, fmt.Errorf("error unmarshaling arguments for job '%s': %v", j.Id, err)
	}

	// Transition the job to running and persist that before handing it to
	// the caller.
	j.Status = jobqueue.JobRunning
	j.StartedAt = time.Now()

	err = q.db.Write(id.String(), j)
	if err != nil {
		return uuid.Nil, fmt.Errorf("error writing job %s: %v", id, err)
	}

	return j.Id, nil
}
|
||||
|
||||
// FinishJob marks the running job `id` as finished, storing the
// JSON-serializable `result` with it, and enqueues any dependants whose
// dependencies have now all finished. Returns jobqueue.ErrNotRunning if the
// job was not previously dequeued.
func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
	j, err := q.readJob(id)
	if err != nil {
		return err
	}

	if j.Status != jobqueue.JobRunning {
		return jobqueue.ErrNotRunning
	}

	j.Status = jobqueue.JobFinished
	j.FinishedAt = time.Now()

	j.Result, err = json.Marshal(result)
	if err != nil {
		return fmt.Errorf("error marshaling result: %v", err)
	}

	// Write before notifying dependants, because it will be read again.
	err = q.db.Write(id.String(), j)
	if err != nil {
		return fmt.Errorf("error writing job %s: %v", id, err)
	}

	// Re-check each dependant: if this job was its last unfinished
	// dependency, it becomes runnable and is pushed to its pending channel.
	q.dependantsMutex.Lock()
	defer q.dependantsMutex.Unlock()
	for _, depid := range q.dependants[id] {
		dep, err := q.readJob(depid)
		if err != nil {
			return err
		}
		n, err := q.countFinishedJobs(dep.Dependencies)
		if err != nil {
			return err
		}
		if n == len(dep.Dependencies) {
			q.pendingChannel(dep.Type) <- dep.Id
		}
	}
	// Nothing depends on this job anymore; drop its dependant bookkeeping.
	delete(q.dependants, id)

	return nil
}
|
||||
|
||||
func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (status jobqueue.JobStatus, queued, started, finished time.Time, err error) {
|
||||
var j *job
|
||||
|
||||
j, err = q.readJob(id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if j.Status == jobqueue.JobFinished {
|
||||
err = json.Unmarshal(j.Result, result)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
status = j.Status
|
||||
queued = j.QueuedAt
|
||||
started = j.StartedAt
|
||||
finished = j.FinishedAt
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Returns the number of finished jobs in `ids`.
|
||||
func (q *fsJobQueue) countFinishedJobs(ids []uuid.UUID) (int, error) {
|
||||
n := 0
|
||||
for _, id := range ids {
|
||||
j, err := q.readJob(id)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if j.Status == jobqueue.JobFinished {
|
||||
n += 1
|
||||
}
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Reads job with `id`. This is a thin wrapper around `q.db.Read`, which
|
||||
// returns the job directly, or and error if a job with `id` does not exist.
|
||||
func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
|
||||
var j job
|
||||
exists, err := q.db.Read(id.String(), &j)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error reading job '%s': %v", id, err)
|
||||
}
|
||||
if !exists {
|
||||
// return corrupt database?
|
||||
return nil, jobqueue.ErrNotExist
|
||||
}
|
||||
return &j, nil
|
||||
}
|
||||
|
||||
// Safe access to the pending channel for `jobType`. Channels are created on
|
||||
// demand.
|
||||
func (q *fsJobQueue) pendingChannel(jobType string) chan uuid.UUID {
|
||||
q.pendingMutex.Lock()
|
||||
defer q.pendingMutex.Unlock()
|
||||
|
||||
c, exists := q.pending[jobType]
|
||||
if !exists {
|
||||
c = make(chan uuid.UUID, 100)
|
||||
q.pending[jobType] = c
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// Sorts and removes duplicates from `ids`.
|
||||
func uniqueUUIDList(ids []uuid.UUID) []uuid.UUID {
|
||||
s := map[uuid.UUID]bool{}
|
||||
for _, id := range ids {
|
||||
s[id] = true
|
||||
}
|
||||
|
||||
l := []uuid.UUID{}
|
||||
for id := range s {
|
||||
l = append(l, id)
|
||||
}
|
||||
|
||||
sort.Slice(l, func(i, j int) bool {
|
||||
for b := 0; b < 16; b++ {
|
||||
if l[i][b] < l[j][b] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
return l
|
||||
}
|
||||
35
internal/jobqueue/fsjobqueue/fsjobqueue_private_test.go
Normal file
35
internal/jobqueue/fsjobqueue/fsjobqueue_private_test.go
Normal file
|
|
@ -0,0 +1,35 @@
|
|||
package fsjobqueue
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func uuidList(t *testing.T, strs ...string) []uuid.UUID {
|
||||
var err error
|
||||
ids := make([]uuid.UUID, len(strs))
|
||||
for i, s := range strs {
|
||||
ids[i], err = uuid.Parse(s)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
func TestUniqueUUIDList(t *testing.T) {
|
||||
l := uniqueUUIDList([]uuid.UUID{})
|
||||
require.Empty(t, l)
|
||||
|
||||
s := uuidList(t, "8ad6bbcd-55f9-4cd8-be45-d0370ff079d2", "a0ad7428-b813-4efb-a156-da2b524f4868", "36e5817c-f29d-4043-8d7d-95ffaa77ff88")
|
||||
l = uniqueUUIDList(s)
|
||||
require.ElementsMatch(t, s, l)
|
||||
|
||||
s = uuidList(t, "8ad6bbcd-55f9-4cd8-be45-d0370ff079d2", "8ad6bbcd-55f9-4cd8-be45-d0370ff079d2")
|
||||
l = uniqueUUIDList(s)
|
||||
require.ElementsMatch(t, uuidList(t, "8ad6bbcd-55f9-4cd8-be45-d0370ff079d2"), l)
|
||||
|
||||
s = uuidList(t, "8ad6bbcd-55f9-4cd8-be45-d0370ff079d2", "a0ad7428-b813-4efb-a156-da2b524f4868", "8ad6bbcd-55f9-4cd8-be45-d0370ff079d2")
|
||||
l = uniqueUUIDList(s)
|
||||
require.ElementsMatch(t, uuidList(t, "8ad6bbcd-55f9-4cd8-be45-d0370ff079d2", "a0ad7428-b813-4efb-a156-da2b524f4868"), l)
|
||||
}
|
||||
164
internal/jobqueue/fsjobqueue/fsjobqueue_test.go
Normal file
164
internal/jobqueue/fsjobqueue/fsjobqueue_test.go
Normal file
|
|
@ -0,0 +1,164 @@
|
|||
package fsjobqueue_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/jobqueue"
|
||||
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
|
||||
)
|
||||
|
||||
// testResult is an empty job result used by the tests below; the queue only
// requires that it be JSON-(de)serializable.
type testResult struct {
}
|
||||
|
||||
func cleanupTempDir(t *testing.T, dir string) {
|
||||
err := os.RemoveAll(dir)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func newTemporaryQueue(t *testing.T) (jobqueue.JobQueue, string) {
|
||||
dir, err := ioutil.TempDir("", "jobqueue-test-")
|
||||
require.NoError(t, err)
|
||||
|
||||
q, err := fsjobqueue.New(dir)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, q)
|
||||
|
||||
return q, dir
|
||||
}
|
||||
|
||||
func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID {
|
||||
id, err := q.Enqueue(jobType, args, dependencies)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, id)
|
||||
return id
|
||||
}
|
||||
|
||||
func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}) uuid.UUID {
|
||||
id, err := q.Dequeue(context.Background(), []string{jobType}, &json.RawMessage{})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, id)
|
||||
|
||||
err = q.FinishJob(id, result)
|
||||
require.NoError(t, err)
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func TestNonExistant(t *testing.T) {
|
||||
q, err := fsjobqueue.New("/non-existant-directory")
|
||||
require.Error(t, err)
|
||||
require.Nil(t, q)
|
||||
}
|
||||
|
||||
func TestErrors(t *testing.T) {
|
||||
q, dir := newTemporaryQueue(t)
|
||||
defer cleanupTempDir(t, dir)
|
||||
|
||||
// not serializable to JSON
|
||||
id, err := q.Enqueue("test", make(chan string), nil)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, uuid.Nil, id)
|
||||
|
||||
// invalid dependency
|
||||
id, err = q.Enqueue("test", "arg0", []uuid.UUID{uuid.New()})
|
||||
require.Error(t, err)
|
||||
require.Equal(t, uuid.Nil, id)
|
||||
}
|
||||
|
||||
func TestArgs(t *testing.T) {
|
||||
type argument struct {
|
||||
I int
|
||||
S string
|
||||
}
|
||||
|
||||
q, dir := newTemporaryQueue(t)
|
||||
defer cleanupTempDir(t, dir)
|
||||
|
||||
oneargs := argument{7, "🐠"}
|
||||
one := pushTestJob(t, q, "fish", oneargs, nil)
|
||||
|
||||
twoargs := argument{42, "🐙"}
|
||||
two := pushTestJob(t, q, "octopus", twoargs, nil)
|
||||
|
||||
var args argument
|
||||
id, err := q.Dequeue(context.Background(), []string{"octopus"}, &args)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, two, id)
|
||||
require.Equal(t, twoargs, args)
|
||||
|
||||
id, err = q.Dequeue(context.Background(), []string{"fish"}, &args)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, one, id)
|
||||
require.Equal(t, oneargs, args)
|
||||
}
|
||||
|
||||
func TestJobTypes(t *testing.T) {
|
||||
q, dir := newTemporaryQueue(t)
|
||||
defer cleanupTempDir(t, dir)
|
||||
|
||||
one := pushTestJob(t, q, "octopus", nil, nil)
|
||||
two := pushTestJob(t, q, "clownfish", nil, nil)
|
||||
|
||||
require.Equal(t, two, finishNextTestJob(t, q, "clownfish", testResult{}))
|
||||
require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancel()
|
||||
id, err := q.Dequeue(ctx, []string{"zebra"}, nil)
|
||||
require.Equal(t, err, context.Canceled)
|
||||
require.Equal(t, uuid.Nil, id)
|
||||
}
|
||||
|
||||
// TestDependencies verifies that a job with dependencies only becomes
// runnable once all its dependencies have finished, both when the
// dependencies finish before and after the dependant is enqueued.
func TestDependencies(t *testing.T) {
	q, dir := newTemporaryQueue(t)
	defer cleanupTempDir(t, dir)

	t.Run("done-before-pushing-dependant", func(t *testing.T) {
		one := pushTestJob(t, q, "test", nil, nil)
		two := pushTestJob(t, q, "test", nil, nil)

		// Finish both dependencies first; dequeue order is unspecified.
		r := []uuid.UUID{}
		r = append(r, finishNextTestJob(t, q, "test", testResult{}))
		r = append(r, finishNextTestJob(t, q, "test", testResult{}))
		require.ElementsMatch(t, []uuid.UUID{one, two}, r)

		// The dependant is enqueued after both dependencies finished; it
		// should be immediately runnable but still report pending.
		j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
		status, _, _, _, err := q.JobStatus(j, nil)
		require.NoError(t, err)
		require.Equal(t, jobqueue.JobPending, status)

		require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))

		status, _, _, _, err = q.JobStatus(j, &testResult{})
		require.NoError(t, err)
		require.Equal(t, jobqueue.JobFinished, status)
	})

	t.Run("done-after-pushing-dependant", func(t *testing.T) {
		one := pushTestJob(t, q, "test", nil, nil)
		two := pushTestJob(t, q, "test", nil, nil)

		// The dependant is enqueued while both dependencies are still
		// unfinished, so it must stay pending.
		j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
		status, _, _, _, err := q.JobStatus(j, nil)
		require.NoError(t, err)
		require.Equal(t, jobqueue.JobPending, status)

		// Finishing both dependencies makes the dependant runnable.
		r := []uuid.UUID{}
		r = append(r, finishNextTestJob(t, q, "test", testResult{}))
		r = append(r, finishNextTestJob(t, q, "test", testResult{}))
		require.ElementsMatch(t, []uuid.UUID{one, two}, r)

		require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}))

		status, _, _, _, err = q.JobStatus(j, &testResult{})
		require.NoError(t, err)
		require.Equal(t, jobqueue.JobFinished, status)
	})
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue