debian-forge-composer/internal/jobqueue/fsjobqueue/fsjobqueue.go
Tom Gundersen c777a18df0 jobqueue: expose dependencies when querying status
The status of a job may depend on the status of its dependencies,
as we do not repeat, for instance, the failed state in each dependent
job.

Return also the list of dependencies so these can be queried too.
2020-11-11 18:16:42 +01:00

361 lines
8.8 KiB
Go

// Package fsjobqueue implements a filesystem-backed job queue. It implements
// the interfaces in package jobqueue.
//
// Jobs are stored in the file system, using the `jsondb` package. However,
// this package does not use the file system as a database, but keeps some
// state in memory. This means that access to a given directory must be
// exclusive to only one `fsJobQueue` object at a time. A single `fsJobQueue`
// can be safely accessed from multiple goroutines, though.
//
// Data is stored non-redundantly. Any data structure necessary for efficient
// access (e.g., dependants) is kept in memory.
package fsjobqueue
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"sync"
"time"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
"github.com/osbuild/osbuild-composer/internal/jsondb"
)
// fsJobQueue is a job queue whose jobs are persisted in a jsondb database
// directory, with some in-memory state (pending channels, dependants map)
// that New rebuilds from the stored jobs.
type fsJobQueue struct {
	// Protects all fields of this struct. In particular, it ensures
	// transactions on `db` are atomic. All public functions except
	// JobStatus hold it while they're running. Dequeue() releases it
	// briefly while waiting on pending channels.
	mu sync.Mutex

	// Persistent job storage; each job is written and read under the
	// string form of its id.
	db *jsondb.JSONDatabase

	// Maps job types to channels of job ids for that type. An id is sent
	// here once all of the job's dependencies have finished. The channels
	// are buffered; the capacity is chosen in New.
	pending map[string]chan uuid.UUID

	// Maps job ids to the jobs that depend on them, as long as any of those
	// dependants have not yet finished.
	dependants map[uuid.UUID][]uuid.UUID
}
// job is the on-disk job struct. It contains all necessary (but
// non-redundant) information about a job. Jobs are not held in memory by the
// job queue, but (de)serialized on each access.
//
// NOTE(review): `omitempty` has no effect on the time.Time fields below.
// encoding/json never considers a struct value empty, so zero times are
// still serialized.
type job struct {
	// Unique job id; also the key under which the job is stored.
	Id uuid.UUID `json:"id"`
	// Job type; selects the pending channel the job is sent to.
	Type string `json:"type"`
	// JSON-marshaled arguments passed to Enqueue.
	Args json.RawMessage `json:"args,omitempty"`
	// Ids of jobs that must finish before this job is enqueued.
	Dependencies []uuid.UUID `json:"dependencies"`
	// JSON-marshaled result passed to FinishJob.
	Result json.RawMessage `json:"result,omitempty"`
	// Set by Enqueue.
	QueuedAt time.Time `json:"queued_at,omitempty"`
	// Set by Dequeue; non-zero means the job was handed out to a worker.
	StartedAt time.Time `json:"started_at,omitempty"`
	// Set by FinishJob.
	FinishedAt time.Time `json:"finished_at,omitempty"`
	// Set by CancelJob.
	Canceled bool `json:"canceled,omitempty"`
}
// New creates a new fsJobQueue object for `dir`. This object must have
// exclusive access to `dir`. If `dir` contains jobs created from previous
// runs, they are loaded and rescheduled to run if necessary.
//
// Only job types listed in `acceptedJobTypes` can be enqueued. Each accepted
// type gets its own pending channel with a buffer of 100 job ids.
//
// NOTE(review): maybeEnqueue sends on the pending channel. If more than 100
// ready-to-run jobs of one type are loaded here, that send blocks. No
// consumer exists yet at that point, so New would hang — confirm this bound
// is acceptable.
func New(dir string, acceptedJobTypes []string) (*fsJobQueue, error) {
	q := &fsJobQueue{
		db:         jsondb.New(dir, 0600),
		pending:    make(map[string]chan uuid.UUID),
		dependants: make(map[uuid.UUID][]uuid.UUID),
	}

	for _, jt := range acceptedJobTypes {
		q.pending[jt] = make(chan uuid.UUID, 100)
	}

	// Look for jobs that are still pending and build the dependant map.
	ids, err := q.db.List()
	if err != nil {
		return nil, fmt.Errorf("error listing jobs: %w", err)
	}

	for _, id := range ids {
		// Named jobID (not uuid) so the uuid package is not shadowed.
		jobID, err := uuid.Parse(id)
		if err != nil {
			return nil, fmt.Errorf("invalid job '%s' in db: %w", id, err)
		}

		j, err := q.readJob(jobID)
		if err != nil {
			return nil, err
		}

		// updateDependants=true: this is the first time the in-memory
		// state sees this job, so record it under its unfinished deps.
		err = q.maybeEnqueue(j, true)
		if err != nil {
			return nil, err
		}
	}

	return q, nil
}
// Enqueue adds a new job of type `jobType`. Its arguments are `args`,
// marshaled to JSON. The job does not run before every job in
// `dependencies` has finished. Enqueue returns the id of the new job.
//
// It returns an error if this queue does not accept `jobType`, if `args`
// cannot be marshaled, or if reading or writing the database fails. It
// returns jobqueue.ErrNotExist if one of the dependencies does not exist.
func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if _, exists := q.pending[jobType]; !exists {
		return uuid.Nil, fmt.Errorf("this queue does not accept job type '%s'", jobType)
	}

	var j = job{
		Id:           uuid.New(),
		Type:         jobType,
		Dependencies: dependencies,
		QueuedAt:     time.Now(),
	}

	var err error
	j.Args, err = json.Marshal(args)
	if err != nil {
		return uuid.Nil, fmt.Errorf("error marshaling job arguments: %w", err)
	}

	// Verify dependencies early, so that the job doesn't get written
	// when one of them doesn't exist.
	for _, d := range j.Dependencies {
		exists, err := q.db.Read(d.String(), nil)
		if err != nil {
			return uuid.Nil, err
		}
		if !exists {
			return uuid.Nil, jobqueue.ErrNotExist
		}
	}

	// Write the job before updating in-memory state, so that the latter
	// doesn't become corrupt when writing fails.
	err = q.db.Write(j.Id.String(), j)
	if err != nil {
		return uuid.Nil, fmt.Errorf("cannot write job: %w", err)
	}

	// updateDependants=true: this is a brand-new job, so register it
	// under any dependencies that have not finished yet.
	err = q.maybeEnqueue(&j, true)
	if err != nil {
		return uuid.Nil, err
	}

	return j.Id, nil
}
// Dequeue waits until a job whose type is in `jobTypes` is ready to run, or
// until `ctx` is done. Canceled jobs are skipped. The chosen job is marked as
// started and persisted. Dequeue returns its id, dependencies, type and
// arguments.
//
// Job types this queue does not accept are ignored. If the context is
// already done on entry, or becomes done while waiting, its error is
// returned.
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Don't bother waiting if the caller has already given up.
	if ctxErr := ctx.Err(); ctxErr != nil {
		return uuid.Nil, nil, "", nil, ctxErr
	}

	// Gather the pending channels of the requested, accepted job types.
	var channels []chan uuid.UUID
	for _, jobType := range jobTypes {
		ch, accepted := q.pending[jobType]
		if !accepted {
			continue
		}
		channels = append(channels, ch)
	}

	// Keep receiving ids until one refers to a job that isn't canceled.
	var next *job
	for next == nil || next.Canceled {
		// Drop the lock while blocked so other goroutines (including other
		// Dequeue callers) can make progress.
		q.mu.Unlock()
		jobID, selErr := selectUUIDChannel(ctx, channels)
		q.mu.Lock()
		if selErr != nil {
			return uuid.Nil, nil, "", nil, selErr
		}

		var readErr error
		next, readErr = q.readJob(jobID)
		if readErr != nil {
			return uuid.Nil, nil, "", nil, readErr
		}
	}

	next.StartedAt = time.Now()
	if writeErr := q.db.Write(next.Id.String(), next); writeErr != nil {
		return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", next.Id, writeErr)
	}

	return next.Id, next.Dependencies, next.Type, next.Args, nil
}
// FinishJob marks the running job `id` as finished and stores `result`,
// marshaled to JSON, with it. It then re-evaluates every job waiting on `id`
// and enqueues those whose dependencies have now all finished.
//
// It returns jobqueue.ErrCanceled if the job was canceled. It returns
// jobqueue.ErrNotRunning if the job was never started or has already
// finished.
func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return err
	}

	switch {
	case j.Canceled:
		return jobqueue.ErrCanceled
	case j.StartedAt.IsZero(), !j.FinishedAt.IsZero():
		return jobqueue.ErrNotRunning
	}

	j.FinishedAt = time.Now()
	if j.Result, err = json.Marshal(result); err != nil {
		return fmt.Errorf("error marshaling result: %v", err)
	}

	// Persist first: maybeEnqueue reads each dependency back from the
	// database to decide whether a dependant is ready.
	if err = q.db.Write(id.String(), j); err != nil {
		return fmt.Errorf("error writing job %s: %v", id, err)
	}

	for _, dependantID := range q.dependants[id] {
		dependant, readErr := q.readJob(dependantID)
		if readErr != nil {
			return readErr
		}
		if enqErr := q.maybeEnqueue(dependant, false); enqErr != nil {
			return enqErr
		}
	}
	delete(q.dependants, id)

	return nil
}
// CancelJob marks job `id` as canceled and persists the change. A job that
// has already finished is left untouched, and no error is returned for it.
// Dequeue skips canceled jobs, and FinishJob refuses to finish them.
func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	switch {
	case err != nil:
		return err
	case !j.FinishedAt.IsZero():
		// Too late to cancel; treat as a no-op.
		return nil
	}

	j.Canceled = true
	if writeErr := q.db.Write(id.String(), j); writeErr != nil {
		return fmt.Errorf("error writing job %s: %v", id, writeErr)
	}
	return nil
}
// JobStatus reports the stored state of job `id`: its result, its queued,
// started and finished timestamps, whether it was canceled, and the ids of
// the jobs it depends on. The caller may need the dependencies' statuses to
// fully interpret this job's status.
//
// Unlike the other public methods, JobStatus does not acquire q.mu.
func (q *fsJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
	j, readErr := q.readJob(id)
	if readErr != nil {
		return nil, time.Time{}, time.Time{}, time.Time{}, false, nil, readErr
	}
	return j.Result, j.QueuedAt, j.StartedAt, j.FinishedAt, j.Canceled, j.Dependencies, nil
}
// readJob reads the job with `id`. This is a thin wrapper around `q.db.Read`.
// It returns the job directly, or an error if reading fails. If no job with
// `id` exists, the error is jobqueue.ErrNotExist.
func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
	var j job
	exists, err := q.db.Read(id.String(), &j)
	if err != nil {
		// %w keeps the underlying database error inspectable via errors.Is/As.
		return nil, fmt.Errorf("error reading job '%s': %w", id, err)
	}
	if !exists {
		// TODO(review): when called for a dependency or dependant id, a
		// missing job may indicate a corrupt database rather than a
		// caller error; consider a distinct error for that case.
		return nil, jobqueue.ErrNotExist
	}
	return &j, nil
}
// maybeEnqueue sends j's id on the pending channel for its type if j has not
// been started yet and all of its dependencies have finished.
//
// Suppose some dependency is still unfinished and updateDependants is true
// (i.e., when j is new to the in-memory state). Then j is recorded in
// q.dependants under each of its dependencies, so that FinishJob can
// re-evaluate it later. j is recorded at most once per dependency, even if
// j.Dependencies lists the same id twice. Otherwise FinishJob would send j
// on the pending channel twice, and two workers would run the same job.
//
// The channel send blocks when the pending channel's buffer is full.
func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
	if !j.StartedAt.IsZero() {
		return nil
	}

	depsFinished := true
	for _, depID := range j.Dependencies {
		dep, err := q.readJob(depID)
		if err != nil {
			return err
		}
		if dep.FinishedAt.IsZero() {
			depsFinished = false
			break
		}
	}

	if depsFinished {
		c, exists := q.pending[j.Type]
		if !exists {
			return fmt.Errorf("this queue doesn't accept job type '%s'", j.Type)
		}
		c <- j.Id
	} else if updateDependants {
		for _, depID := range j.Dependencies {
			if !containsUUID(q.dependants[depID], j.Id) {
				q.dependants[depID] = append(q.dependants[depID], j.Id)
			}
		}
	}
	return nil
}

// containsUUID reports whether ids contains id.
func containsUUID(ids []uuid.UUID, id uuid.UUID) bool {
	for _, x := range ids {
		if x == id {
			return true
		}
	}
	return false
}
// selectUUIDChannel blocks until a job id arrives on one of `chans` or `ctx`
// is done, and returns the received id. If the context finishes first, it
// returns ctx.Err(). If one of the channels turns out to be closed, it
// returns an error.
//
// reflect.Select() is used because a `select` statement can only name a
// fixed number of channels at compile time.
func selectUUIDChannel(ctx context.Context, chans []chan uuid.UUID) (uuid.UUID, error) {
	// Case index 0 is always the context's Done channel; index i+1
	// corresponds to chans[i].
	cases := make([]reflect.SelectCase, 0, len(chans)+1)
	cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())})
	for _, ch := range chans {
		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)})
	}

	idx, received, ok := reflect.Select(cases)
	switch {
	case ok:
		return received.Interface().(uuid.UUID), nil
	case idx == 0:
		return uuid.Nil, ctx.Err()
	default:
		return uuid.Nil, errors.New("channel was closed unexpectedly")
	}
}