debian-forge-composer/internal/jobqueue/fsjobqueue/fsjobqueue.go
Chloe Kaubisch 873798514b prometheus: add tenant label
Include a tenant label for all prometheus metrics. Modify the
jobstatus function in the worker accordingly to return the channel
so it can be passed to prometheus.
2022-06-07 16:35:03 +02:00

// Package fsjobqueue implements a filesystem-backed job queue. It implements
// the interfaces in package jobqueue.
//
// Jobs are stored in the file system, using the `jsondb` package. However,
// this package does not use the file system as a database, but keeps some
// state in memory. This means that access to a given directory must be
// exclusive to only one `fsJobQueue` object at a time. A single `fsJobQueue`
// can be safely accessed from multiple goroutines, though.
//
// Data is stored non-redundantly. Any data structures necessary for efficient
// access (e.g., dependants) are kept in memory.
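//
// A minimal usage sketch (the directory, job type, channel, and variables are
// hypothetical; error handling is elided):
//
//	q, _ := fsjobqueue.New("/var/lib/example/jobs")
//	id, _ := q.Enqueue("example-job", jobArgs, nil, "org-1")
//
//	// in a worker goroutine:
//	id, token, deps, jobType, rawArgs, _ := q.Dequeue(ctx, []string{"example-job"}, []string{"org-1"})
//	_ = q.FinishJob(id, jobResult)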
package fsjobqueue

import (
	"container/list"
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"

	"github.com/osbuild/osbuild-composer/internal/jobqueue"
	"github.com/osbuild/osbuild-composer/internal/jsondb"
)

type fsJobQueue struct {
	// Protects all fields of this struct. In particular, it ensures
	// transactions on `db` are atomic. All public functions except
	// JobStatus and Job hold it while they're running. Dequeue() releases
	// it briefly while waiting on pending channels.
	mu sync.Mutex

	db *jsondb.JSONDatabase

	// List of pending jobs
	pending *list.List

	// Set of goroutines waiting for new pending jobs
	listeners map[chan struct{}]struct{}

	// Maps each job id to the ids of the jobs that depend on it, as long
	// as the job itself has not yet finished.
	dependants map[uuid.UUID][]uuid.UUID

	// Currently running jobs. Workers are not handed job ids, but
	// independent tokens which serve as an indirection. This enables
	// race-free uploading of artifacts and makes restarting composer more
	// robust (workers from an old run cannot report results for jobs
	// composer thinks are not running).
	// This map maps these tokens to job ids. Artifacts are stored in
	// `$STATE_DIRECTORY/artifacts/tmp/$TOKEN` while the worker is running,
	// and renamed to `$STATE_DIRECTORY/artifacts/$JOB_ID` once the job is
	// reported as done.
	jobIdByToken map[uuid.UUID]uuid.UUID

	heartbeats map[uuid.UUID]time.Time // token -> heartbeat
}

// On-disk job struct. Contains all necessary (but non-redundant) information
// about a job. These are not held in memory by the job queue, but
// (de)serialized on each access.
type job struct {
	Id           uuid.UUID       `json:"id"`
	Token        uuid.UUID       `json:"token"`
	Type         string          `json:"type"`
	Args         json.RawMessage `json:"args,omitempty"`
	Dependencies []uuid.UUID     `json:"dependencies"`
	Result       json.RawMessage `json:"result,omitempty"`
	Channel      string          `json:"channel"`

	QueuedAt   time.Time `json:"queued_at,omitempty"`
	StartedAt  time.Time `json:"started_at,omitempty"`
	FinishedAt time.Time `json:"finished_at,omitempty"`

	Canceled bool `json:"canceled,omitempty"`
}

// Create a new fsJobQueue object for `dir`. This object must have exclusive
// access to `dir`. If `dir` contains jobs created from previous runs, they are
// loaded and rescheduled to run if necessary.
func New(dir string) (*fsJobQueue, error) {
	q := &fsJobQueue{
		db:           jsondb.New(dir, 0600),
		pending:      list.New(),
		dependants:   make(map[uuid.UUID][]uuid.UUID),
		jobIdByToken: make(map[uuid.UUID]uuid.UUID),
		heartbeats:   make(map[uuid.UUID]time.Time),
		listeners:    make(map[chan struct{}]struct{}),
	}

	// Look for jobs that are still pending and build the dependants map.
	ids, err := q.db.List()
	if err != nil {
		return nil, fmt.Errorf("error listing jobs: %v", err)
	}
	for _, id := range ids {
		jobId, err := uuid.Parse(id)
		if err != nil {
			return nil, fmt.Errorf("invalid job '%s' in db: %v", id, err)
		}
		j, err := q.readJob(jobId)
		if err != nil {
			return nil, err
		}

		// If a job is running and not canceled, track the token.
		if !j.StartedAt.IsZero() && j.FinishedAt.IsZero() && !j.Canceled {
			// Fail older running jobs which don't have a token stored.
			if j.Token == uuid.Nil {
				err = q.FinishJob(j.Id, nil)
				if err != nil {
					return nil, fmt.Errorf("error finishing job '%s' without a token: %v", j.Id, err)
				}
			} else {
				q.jobIdByToken[j.Token] = j.Id
				q.heartbeats[j.Token] = time.Now()
			}
		}

		err = q.maybeEnqueue(j, true)
		if err != nil {
			return nil, err
		}
	}

	return q, nil
}
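
// Enqueue adds a new job of type `jobType` to the queue, marshaling `args` to
// JSON. The job becomes runnable once all `dependencies` have finished; it
// returns jobqueue.ErrNotExist if any of them does not exist. `channel` tags
// the job so that Dequeue() can filter on it. Returns the new job's id.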
func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	var j = job{
		Id:           uuid.New(),
		Token:        uuid.Nil,
		Type:         jobType,
		Dependencies: dependencies,
		QueuedAt:     time.Now(),
		Channel:      channel,
	}

	var err error
	j.Args, err = json.Marshal(args)
	if err != nil {
		return uuid.Nil, fmt.Errorf("error marshaling job arguments: %v", err)
	}

	// Verify dependencies early, so that the job doesn't get written
	// when one of them doesn't exist.
	for _, d := range j.Dependencies {
		exists, err := q.db.Read(d.String(), nil)
		if err != nil {
			return uuid.Nil, err
		}
		if !exists {
			return uuid.Nil, jobqueue.ErrNotExist
		}
	}

	// Write the job before updating in-memory state, so that the latter
	// doesn't become corrupt when writing fails.
	err = q.db.Write(j.Id.String(), j)
	if err != nil {
		return uuid.Nil, fmt.Errorf("cannot write job: %v", err)
	}

	err = q.maybeEnqueue(&j, true)
	if err != nil {
		return uuid.Nil, err
	}

	return j.Id, nil
}
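
// Dequeue blocks until a pending job matching one of `jobTypes` and one of
// `channels`, with all of its dependencies finished, becomes available, or
// until `ctx` is done, in which case it returns jobqueue.ErrDequeueTimeout.
// The job is marked as started and handed out together with a fresh worker
// token.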
func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Return early if the context is already canceled.
	if err := ctx.Err(); err != nil {
		return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
	}

	// Add a new listener
	c := make(chan struct{}, 1)
	q.listeners[c] = struct{}{}
	defer delete(q.listeners, c)

	// Loop until finding a suitable job
	var j *job
	for {
		var found bool
		var err error
		j, found, err = q.dequeueSuitableJob(jobTypes, channels)
		if err != nil {
			return uuid.Nil, uuid.Nil, nil, "", nil, err
		}
		if found {
			break
		}

		// Unlock the mutex while polling channels, so that multiple
		// goroutines can wait at the same time.
		q.mu.Unlock()
		select {
		case <-c:
		case <-ctx.Done():
			// Re-lock before returning: the deferred Unlock()
			// expects the mutex to be held.
			q.mu.Lock()
			return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
		}
		q.mu.Lock()
	}

	j.StartedAt = time.Now()
	j.Token = uuid.New()
	q.jobIdByToken[j.Token] = j.Id
	q.heartbeats[j.Token] = time.Now()

	err := q.db.Write(j.Id.String(), j)
	if err != nil {
		return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
	}

	return j.Id, j.Token, j.Dependencies, j.Type, j.Args, nil
}
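
// DequeueByID dequeues the specific pending job `id`. It returns
// jobqueue.ErrNotPending if the job has already been started or if any of its
// dependencies has not finished yet. On success, the job is marked as started
// and a fresh worker token is returned along with the job's dependencies,
// type, and arguments.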
func (q *fsJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return uuid.Nil, nil, "", nil, err
	}

	if !j.StartedAt.IsZero() {
		return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
	}

	depsFinished, err := q.hasAllFinishedDependencies(j)
	if err != nil {
		return uuid.Nil, nil, "", nil, err
	}
	if !depsFinished {
		return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
	}

	q.removePendingJob(id)
	j.StartedAt = time.Now()
	j.Token = uuid.New()
	q.jobIdByToken[j.Token] = j.Id
	q.heartbeats[j.Token] = time.Now()

	err = q.db.Write(j.Id.String(), j)
	if err != nil {
		return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
	}

	return j.Token, j.Dependencies, j.Type, j.Args, nil
}
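
// FinishJob marks the running job `id` as finished, storing `result` as JSON.
// It returns jobqueue.ErrCanceled if the job was canceled, and
// jobqueue.ErrNotRunning if it was never started or has already finished.
// Dependants whose dependencies are now all finished are moved to the pending
// list.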
func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return err
	}

	if j.Canceled {
		return jobqueue.ErrCanceled
	}

	if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
		return jobqueue.ErrNotRunning
	}

	j.FinishedAt = time.Now()

	j.Result, err = json.Marshal(result)
	if err != nil {
		return fmt.Errorf("error marshaling result: %v", err)
	}

	delete(q.heartbeats, j.Token)
	delete(q.jobIdByToken, j.Token)

	// Write before notifying dependants, because it will be read again.
	err = q.db.Write(id.String(), j)
	if err != nil {
		return fmt.Errorf("error writing job %s: %v", id, err)
	}

	for _, depid := range q.dependants[id] {
		dep, err := q.readJob(depid)
		if err != nil {
			return err
		}
		err = q.maybeEnqueue(dep, false)
		if err != nil {
			return err
		}
	}
	delete(q.dependants, id)

	return nil
}
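
// CancelJob marks the job `id` as canceled. Pending jobs are removed from the
// pending list; running jobs keep running, but any subsequent FinishJob() for
// them fails with jobqueue.ErrCanceled. Returns jobqueue.ErrNotRunning if the
// job has already finished.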
func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return err
	}

	if !j.FinishedAt.IsZero() {
		return jobqueue.ErrNotRunning
	}

	// If the canceled job is pending, remove it from the list.
	if j.StartedAt.IsZero() {
		q.removePendingJob(id)
	}

	j.Canceled = true

	delete(q.heartbeats, j.Token)

	err = q.db.Write(id.String(), j)
	if err != nil {
		return fmt.Errorf("error writing job %s: %v", id, err)
	}

	return nil
}
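
// JobStatus returns the metadata of job `id`: its type, channel, raw result,
// the queued/started/finished timestamps, whether it was canceled, and its
// dependencies. It does not hold q.mu; it only reads the on-disk record.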
func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
	j, err := q.readJob(id)
	if err != nil {
		return
	}

	jobType = j.Type
	channel = j.Channel
	result = j.Result
	queued = j.QueuedAt
	started = j.StartedAt
	finished = j.FinishedAt
	canceled = j.Canceled
	deps = j.Dependencies

	return
}
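
// Job returns the static definition of job `id`: its type, raw arguments,
// dependencies, and channel. Like JobStatus, it does not hold q.mu; it only
// reads the on-disk record.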
func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) {
	j, err := q.readJob(id)
	if err != nil {
		return
	}

	jobType = j.Type
	args = j.Args
	dependencies = j.Dependencies
	channel = j.Channel

	return
}
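
// IdFromToken resolves a worker token to the id of the running job it was
// handed out for. It returns jobqueue.ErrNotExist if the token is unknown,
// e.g. because the job has already finished.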
func (q *fsJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	id, ok := q.jobIdByToken[token]
	if !ok {
		return uuid.Nil, jobqueue.ErrNotExist
	}
	return id, nil
}

// Heartbeats returns the tokens of all running jobs whose most recent
// heartbeat is older than `olderThan`.
func (q *fsJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) {
	q.mu.Lock()
	defer q.mu.Unlock()

	now := time.Now()
	for token, hb := range q.heartbeats {
		if now.Sub(hb) > olderThan {
			tokens = append(tokens, token)
		}
	}
	return
}
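
// RefreshHeartbeat records a heartbeat for `token` now. Passing uuid.Nil is a
// no-op.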
func (q *fsJobQueue) RefreshHeartbeat(token uuid.UUID) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if token != uuid.Nil {
		q.heartbeats[token] = time.Now()
	}
}

// readJob reads the job with `id`. This is a thin wrapper around `q.db.Read`,
// which returns the job directly, or an error if a job with `id` does not
// exist.
func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
	var j job
	exists, err := q.db.Read(id.String(), &j)
	if err != nil {
		return nil, fmt.Errorf("error reading job '%s': %v", id, err)
	}
	if !exists {
		// return corrupt database?
		return nil, jobqueue.ErrNotExist
	}
	return &j, nil
}

// Enqueue `job` if it is pending and all its dependencies have finished.
// Update `q.dependants` if the job was not queued and updateDependants is
// true (i.e., when this is a new job).
//
// `q.mu` must be locked when this method is called. The only exception is
// `New()`, because no concurrent calls are possible there.
func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
	if !j.StartedAt.IsZero() {
		return nil
	}

	depsFinished, err := q.hasAllFinishedDependencies(j)
	if err != nil {
		return err
	}

	if depsFinished {
		// add the job to the list of pending ones
		q.pending.PushBack(j.Id)

		// notify all listeners in a non-blocking way
		for c := range q.listeners {
			select {
			case c <- struct{}{}:
			default:
			}
		}
	} else if updateDependants {
		for _, id := range j.Dependencies {
			q.dependants[id] = append(q.dependants[id], j.Id)
		}
	}

	return nil
}

// hasAllFinishedDependencies returns true if all dependencies of `j`
// are finished. Otherwise, false is returned. If one of the jobs cannot
// be read, an error is returned.
func (q *fsJobQueue) hasAllFinishedDependencies(j *job) (bool, error) {
	for _, id := range j.Dependencies {
		dep, err := q.readJob(id)
		if err != nil {
			return false, err
		}
		if dep.FinishedAt.IsZero() {
			return false, nil
		}
	}
	return true, nil
}

// dequeueSuitableJob finds a suitable job in the list of pending jobs,
// removes it from there, and returns it.
//
// The job must meet the following conditions:
// - it must be pending
// - its dependencies must be finished
// - its type must be one of jobTypes
// - its channel must be one of channels
//
// If no suitable job is found, false is returned.
// If an error occurs during the search, it's returned.
func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string, channels []string) (*job, bool, error) {
	el := q.pending.Front()
	for el != nil {
		id := el.Value.(uuid.UUID)
		j, err := q.readJob(id)
		if err != nil {
			return nil, false, err
		}

		if !jobMatchesCriteria(j, jobTypes, channels) {
			el = el.Next()
			continue
		}

		ready, err := q.hasAllFinishedDependencies(j)
		if err != nil {
			return nil, false, err
		}
		if ready {
			q.pending.Remove(el)
			return j, true, nil
		}

		el = el.Next()
	}

	return nil, false, nil
}

// removePendingJob removes the job with the given ID from the list of pending
// jobs.
//
// If the job isn't in the list, this is a no-op.
func (q *fsJobQueue) removePendingJob(id uuid.UUID) {
	el := q.pending.Front()
	for el != nil {
		if el.Value.(uuid.UUID) == id {
			q.pending.Remove(el)
			return
		}
		el = el.Next()
	}
}

// jobMatchesCriteria returns true if the job matches the criteria defined by
// the parameters.
//
// Criteria:
// - the job's type is one of acceptedJobTypes
// - the job's channel is one of acceptedChannels
func jobMatchesCriteria(j *job, acceptedJobTypes []string, acceptedChannels []string) bool {
	contains := func(slice []string, str string) bool {
		for _, item := range slice {
			if str == item {
				return true
			}
		}
		return false
	}

	return contains(acceptedJobTypes, j.Type) && contains(acceptedChannels, j.Channel)
}