debian-forge-composer/internal/jobqueue/fsjobqueue/fsjobqueue.go
Brian C. Lane 5cddc4223d dbjobqueue: Add AllRootJobIDs implementation
Related: RHEL-60120
2025-06-05 10:32:56 +02:00

773 lines
19 KiB
Go

// Package fsjobqueue implements a filesystem-backed job queue. It implements
// the interfaces in package jobqueue.
//
// Jobs are stored in the file system, using the `jsondb` package. However,
// this package does not use the file system as a database, but keeps some
// state in memory. This means that access to a given directory must be
// exclusive to only one `fsJobQueue` object at a time. A single `fsJobQueue`
// can be safely accessed from multiple goroutines, though.
//
// Data is stored non-redundantly. Any data structure necessary for efficient
// access (e.g., dependants) is kept in memory.
package fsjobqueue
import (
"container/list"
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
"github.com/osbuild/osbuild-composer/internal/jsondb"
)
// fsJobQueue is the filesystem-backed job queue; persistent job state lives
// in `db`, while scheduling state (pending list, listeners, tokens, workers)
// is kept in memory and rebuilt by New().
type fsJobQueue struct {
	// Protects all fields of this struct. In particular, it ensures
	// transactions on `db` are atomic. All public functions except
	// JobStatus hold it while they're running. Dequeue() releases it
	// briefly while waiting on pending channels.
	mu sync.Mutex
	// Persistent job storage, one JSON document per job id.
	db *jsondb.JSONDatabase
	// List of pending jobs, in enqueue order. Element values are the
	// jobs' uuid.UUIDs.
	pending *list.List
	// Set of goroutines waiting for new pending jobs. Each channel gets a
	// non-blocking signal whenever a job becomes pending.
	listeners map[chan struct{}]struct{}
	// Maps a job id to the ids of jobs that depend on it, for jobs that
	// have not yet finished. Entries are consumed (dependants possibly
	// enqueued) and removed when the job finishes.
	dependants map[uuid.UUID][]uuid.UUID
	// Currently running jobs. Workers are not handed job ids, but
	// independent tokens which serve as an indirection. This enables
	// race-free uploading of artifacts and makes restarting composer more
	// robust (workers from an old run cannot report results for jobs
	// composer thinks are not running).
	// This map maps these tokens to job ids. Artifacts are stored in
	// `$STATE_DIRECTORY/artifacts/tmp/$TOKEN` while the worker is running,
	// and renamed to `$STATE_DIRECTORY/artifacts/$JOB_ID` once the job is
	// reported as done.
	jobIdByToken map[uuid.UUID]uuid.UUID
	heartbeats      map[uuid.UUID]time.Time // token -> last heartbeat time
	workerIDByToken map[uuid.UUID]uuid.UUID // token -> workerID
	workers         map[uuid.UUID]worker    // workerID -> in-memory worker state
}
// worker is the in-memory record for a registered worker: where it runs
// (channel/arch), when it last checked in, and the job tokens it holds.
type worker struct {
	Channel   string    `json:"channel"`
	Arch      string    `json:"arch"`
	Heartbeat time.Time `json:"heartbeat"`
	// Tokens of jobs currently dequeued by this worker. DeleteWorker
	// refuses to remove a worker while this set is non-empty.
	Tokens map[uuid.UUID]struct{}
}
// On-disk job struct. Contains all necessary (but non-redundant) information
// about a job. These are not held in memory by the job queue, but
// (de)serialized on each access.
type job struct {
	Id uuid.UUID `json:"id"`
	// Token handed to the worker running this job; uuid.Nil while the job
	// is pending (and reset to uuid.Nil when the job is requeued).
	Token uuid.UUID       `json:"token"`
	Type  string          `json:"type"`
	Args  json.RawMessage `json:"args,omitempty"`
	// Ids of jobs that must finish before this one can be dequeued.
	Dependencies []uuid.UUID `json:"dependencies"`
	// Ids of jobs that depend on this one (maintained by Enqueue).
	Dependents []uuid.UUID     `json:"dependents"`
	Result     json.RawMessage `json:"result,omitempty"`
	Channel    string          `json:"channel"`
	// Lifecycle timestamps; a zero StartedAt/FinishedAt means the job has
	// not started/finished yet.
	QueuedAt  time.Time `json:"queued_at,omitempty"`
	StartedAt time.Time `json:"started_at,omitempty"`
	// ExpiresAt is never set in this file — presumably consumed by external
	// cleanup tooling; TODO confirm.
	FinishedAt time.Time `json:"finished_at,omitempty"`
	ExpiresAt  time.Time `json:"expires_at,omitempty"`
	// Number of times the job has been requeued via RequeueOrFinishJob.
	Retries  uint64 `json:"retries"`
	Canceled bool   `json:"canceled,omitempty"`
}
// New creates a new fsJobQueue object for `dir`. This object must have
// exclusive access to `dir`. If `dir` contains jobs created from previous
// runs, they are loaded and rescheduled to run if necessary; running jobs
// without a stored token (from older versions) are failed immediately.
func New(dir string) (*fsJobQueue, error) {
	q := &fsJobQueue{
		db:              jsondb.New(dir, 0600),
		pending:         list.New(),
		dependants:      make(map[uuid.UUID][]uuid.UUID),
		jobIdByToken:    make(map[uuid.UUID]uuid.UUID),
		heartbeats:      make(map[uuid.UUID]time.Time),
		listeners:       make(map[chan struct{}]struct{}),
		workers:         make(map[uuid.UUID]worker),
		workerIDByToken: make(map[uuid.UUID]uuid.UUID),
	}

	// Look for jobs that are still pending and build the dependant map.
	ids, err := q.db.List()
	if err != nil {
		return nil, fmt.Errorf("error listing jobs: %w", err)
	}

	for _, id := range ids {
		jobId, err := uuid.Parse(id)
		if err != nil {
			return nil, fmt.Errorf("invalid job '%s' in db: %w", id, err)
		}
		j, err := q.readJob(jobId)
		if err != nil {
			// Skip invalid jobs, leaving them in place for later examination
			continue
		}

		// If a job is running, and not cancelled, track the token
		if !j.StartedAt.IsZero() && j.FinishedAt.IsZero() && !j.Canceled {
			// Fail older running jobs which don't have a token stored.
			// RequeueOrFinishJob takes q.mu itself; no concurrent access
			// is possible inside New().
			if j.Token == uuid.Nil {
				_, err = q.RequeueOrFinishJob(j.Id, 0, nil)
				if err != nil {
					return nil, fmt.Errorf("finishing job '%s' without a token: %w", j.Id, err)
				}
			} else {
				q.jobIdByToken[j.Token] = j.Id
				q.heartbeats[j.Token] = time.Now()
			}
		}

		err = q.maybeEnqueue(j, true)
		if err != nil {
			return nil, err
		}
	}

	return q, nil
}
// Enqueue creates a new job of `jobType` on `channel` with the given
// arguments and dependencies, and returns its id. All dependencies must
// already exist (jobqueue.ErrNotExist otherwise); each is updated to list the
// new job in its Dependents. The job becomes pending immediately if all
// dependencies have finished.
func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	var j = job{
		Id:           uuid.New(),
		Token:        uuid.Nil,
		Type:         jobType,
		Dependencies: dependencies,
		QueuedAt:     time.Now(),
		Channel:      channel,
	}

	var err error
	j.Args, err = json.Marshal(args)
	if err != nil {
		return uuid.Nil, fmt.Errorf("error marshaling job arguments: %w", err)
	}

	// Verify dependencies early, so that the job doesn't get written
	// when one of them doesn't exist; register the new job as a
	// dependent of each one.
	for _, d := range j.Dependencies {
		var dep job
		exists, err := q.db.Read(d.String(), &dep)
		if err != nil {
			return uuid.Nil, err
		}
		if !exists {
			return uuid.Nil, jobqueue.ErrNotExist
		}
		dep.Dependents = append(dep.Dependents, j.Id)
		err = q.db.Write(d.String(), dep)
		if err != nil {
			return uuid.Nil, err
		}
	}

	// Write the job before updating in-memory state, so that the latter
	// doesn't become corrupt when writing fails.
	err = q.db.Write(j.Id.String(), j)
	if err != nil {
		// (fixed: the format string previously ended with a stray colon)
		return uuid.Nil, fmt.Errorf("cannot write job: %w", err)
	}

	err = q.maybeEnqueue(&j, true)
	if err != nil {
		return uuid.Nil, err
	}

	return j.Id, nil
}
// Dequeue hands out the next pending job matching one of jobTypes and
// channels, blocking until one becomes available or ctx is canceled. On
// success it returns the job id, a freshly generated token, and the job's
// dependencies, type and args. Cancellation of ctx yields
// jobqueue.ErrDequeueTimeout.
func (q *fsJobQueue) Dequeue(ctx context.Context, wID uuid.UUID, jobTypes, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Return early if the context is already canceled.
	if err := ctx.Err(); err != nil {
		return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
	}

	// Add a new listener so producers can wake us when a job becomes
	// pending. Buffered (capacity 1) so a signal sent while we are not yet
	// in the select is not lost.
	c := make(chan struct{}, 1)
	q.listeners[c] = struct{}{}
	defer delete(q.listeners, c)

	// Loop until finding a suitable job
	var j *job
	for {
		var found bool
		var err error
		j, found, err = q.dequeueSuitableJob(jobTypes, channels)
		if err != nil {
			return uuid.Nil, uuid.Nil, nil, "", nil, err
		}
		if found {
			break
		}
		// Unlock the mutex while polling channels, so that multiple goroutines
		// can wait at the same time.
		q.mu.Unlock()
		select {
		case <-c:
		case <-ctx.Done():
			// there's defer q.mu.Unlock(), so let's lock
			q.mu.Lock()
			return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
		}
		q.mu.Lock()
	}

	// Mark the job as started and hand out a fresh worker token.
	j.StartedAt = time.Now()
	j.Token = uuid.New()
	q.jobIdByToken[j.Token] = j.Id
	q.heartbeats[j.Token] = time.Now()

	// Tie the token to a registered worker; an unknown wID is silently
	// accepted, the job just isn't associated with a worker entry.
	if _, ok := q.workers[wID]; ok {
		q.workers[wID].Tokens[j.Token] = struct{}{}
		q.workerIDByToken[j.Token] = wID
	}

	// NOTE(review): the in-memory maps above are updated before this write;
	// if the write fails they retain a token for a job never persisted as
	// started — confirm this is acceptable (Enqueue writes before mutating
	// in-memory state for exactly this reason).
	err := q.db.Write(j.Id.String(), j)
	if err != nil {
		return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
	}

	return j.Id, j.Token, j.Dependencies, j.Type, j.Args, nil
}
// DequeueByID dequeues the specific pending job `id` for worker `wID`,
// without blocking. It returns jobqueue.ErrNotPending when the job has
// already started or still has unfinished dependencies. On success it returns
// a fresh token plus the job's dependencies, type and args. ctx is unused.
func (q *fsJobQueue) DequeueByID(ctx context.Context, id, wID uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return uuid.Nil, nil, "", nil, err
	}

	// A started job is no longer pending.
	if !j.StartedAt.IsZero() {
		return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
	}

	depsFinished, err := q.hasAllFinishedDependencies(j)
	if err != nil {
		return uuid.Nil, nil, "", nil, err
	}
	if !depsFinished {
		return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
	}

	// Take the job off the pending list and mark it as started.
	q.removePendingJob(id)
	j.StartedAt = time.Now()
	j.Token = uuid.New()
	q.jobIdByToken[j.Token] = j.Id
	q.heartbeats[j.Token] = time.Now()

	// Tie the token to a registered worker; an unknown wID is silently
	// accepted.
	if _, ok := q.workers[wID]; ok {
		q.workers[wID].Tokens[j.Token] = struct{}{}
		q.workerIDByToken[j.Token] = wID
	}

	err = q.db.Write(j.Id.String(), j)
	if err != nil {
		return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
	}

	return j.Token, j.Dependencies, j.Type, j.Args, nil
}
// RequeueOrFinishJob reports the result of a running job `id`. If the job has
// been retried fewer than `maxRetries` times it is put back on the pending
// list (returns true); otherwise it is finished with `result` and its waiting
// dependants are enqueued (returns false). Canceled jobs yield
// jobqueue.ErrCanceled; jobs that are not running yield jobqueue.ErrNotRunning.
func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return false, err
	}
	if j.Canceled {
		return false, jobqueue.ErrCanceled
	}
	if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
		return false, jobqueue.ErrNotRunning
	}

	// The job is no longer running: release its token from all in-memory maps.
	delete(q.jobIdByToken, j.Token)
	delete(q.heartbeats, j.Token)
	if wID, ok := q.workerIDByToken[j.Token]; ok {
		delete(q.workers[wID].Tokens, j.Token)
		delete(q.workerIDByToken, j.Token)
	}

	if j.Retries < maxRetries {
		// Requeue: reset the run state and count the retry.
		j.Token = uuid.Nil
		j.StartedAt = time.Time{}
		j.Retries += 1
		// Write the job before updating in-memory state, so that the latter
		// doesn't become corrupt when writing fails.
		err = q.db.Write(j.Id.String(), j)
		if err != nil {
			return false, fmt.Errorf("cannot write job: %w", err)
		}
		// add the job to the list of pending ones
		q.pending.PushBack(j.Id)
		// notify all listeners in a non-blocking way
		for c := range q.listeners {
			select {
			case c <- struct{}{}:
			default:
			}
		}
		return true, nil
	}

	// Out of retries: finish the job with the given result.
	j.FinishedAt = time.Now()
	j.Result, err = json.Marshal(result)
	if err != nil {
		return false, fmt.Errorf("error marshaling result: %w", err)
	}
	// Write before notifying dependants, because it will be read again.
	err = q.db.Write(id.String(), j)
	if err != nil {
		return false, fmt.Errorf("error writing job %s: %w", id, err)
	}
	// Dependants waiting on this job may now be ready to run.
	for _, depid := range q.dependants[id] {
		dep, err := q.readJob(depid)
		if err != nil {
			return false, err
		}
		err = q.maybeEnqueue(dep, false)
		if err != nil {
			return false, err
		}
	}
	delete(q.dependants, id)
	return false, nil
}
// CancelJob marks job `id` as canceled. Finished jobs cannot be canceled
// (jobqueue.ErrNotRunning). A pending job is removed from the pending list;
// a running job's worker learns of the cancellation when it reports the
// result (RequeueOrFinishJob returns ErrCanceled).
func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return err
	}
	if !j.FinishedAt.IsZero() {
		return jobqueue.ErrNotRunning
	}

	// if the cancelled job is pending, remove it from the list
	if j.StartedAt.IsZero() {
		q.removePendingJob(id)
	}

	j.Canceled = true
	// Stop watching the token's heartbeat.
	// NOTE(review): jobIdByToken and the worker's Tokens set are NOT cleaned
	// up here, and RequeueOrFinishJob returns early on canceled jobs before
	// its own token cleanup — confirm these entries are released elsewhere,
	// otherwise DeleteWorker may report ErrActiveJobs indefinitely.
	delete(q.heartbeats, j.Token)

	err = q.db.Write(id.String(), j)
	if err != nil {
		return fmt.Errorf("error writing job %s: %v", id, err)
	}

	return nil
}
// FailJob immediately finishes the not-yet-started job `id` with `result`,
// without it ever being dequeued by a worker. It returns jobqueue.ErrFinished
// for jobs that already finished and jobqueue.ErrRunning for running jobs.
func (q *fsJobQueue) FailJob(id uuid.UUID, result interface{}) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return err
	}
	if !j.FinishedAt.IsZero() {
		return jobqueue.ErrFinished
	}
	if !j.StartedAt.IsZero() {
		return jobqueue.ErrRunning
	}

	j.Result, err = json.Marshal(result)
	if err != nil {
		return err
	}

	// Remove the job from the pending list so it cannot be dequeued after
	// being failed (mirrors CancelJob; no-op if it wasn't pending yet).
	// Previously a failed job stayed on q.pending and dequeueSuitableJob,
	// which never checks FinishedAt, could hand it out again.
	q.removePendingJob(id)

	// Mark it started and finished; the throwaway token records that it went
	// through a (synthetic) run.
	j.StartedAt = time.Now()
	j.FinishedAt = time.Now()
	j.Token = uuid.New()

	// NOTE(review): dependants of this job are not enqueued here (only
	// RequeueOrFinishJob walks q.dependants) — confirm dependents of a
	// failed job are handled elsewhere.
	err = q.db.Write(id.String(), j)
	if err != nil {
		return fmt.Errorf("error writing job %s: %v", id, err)
	}
	return nil
}
// JobStatus returns metadata for job `id`: type, channel, result, lifecycle
// timestamps, cancellation flag, and its dependency/dependent ids. Unlike
// most public methods it does not take q.mu (see the struct comment).
func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
	var j *job
	if j, err = q.readJob(id); err != nil {
		return
	}
	return j.Type, j.Channel, j.Result, j.QueuedAt, j.StartedAt, j.FinishedAt,
		j.Canceled, j.Dependencies, j.Dependents, nil
}
// Job returns the type, arguments, dependencies and channel of job `id`.
func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) {
	j, readErr := q.readJob(id)
	if readErr != nil {
		err = readErr
		return
	}
	return j.Type, j.Args, j.Dependencies, j.Channel, nil
}
// IdFromToken resolves a worker token to its job id, returning
// jobqueue.ErrNotExist when the token is not tied to a running job.
func (q *fsJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if jobId, ok := q.jobIdByToken[token]; ok {
		return jobId, nil
	}
	return uuid.Nil, jobqueue.ErrNotExist
}
// Heartbeats returns the tokens of running jobs whose most recent heartbeat
// is older than `olderThan`.
func (q *fsJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) {
	q.mu.Lock()
	defer q.mu.Unlock()

	cutoff := time.Now().Add(-olderThan)
	for token, beat := range q.heartbeats {
		if beat.Before(cutoff) {
			tokens = append(tokens, token)
		}
	}
	return tokens
}
// RefreshHeartbeat records a fresh heartbeat time for `token`. The nil token
// is ignored.
func (q *fsJobQueue) RefreshHeartbeat(token uuid.UUID) {
	q.mu.Lock()
	defer q.mu.Unlock()

	if token == uuid.Nil {
		return
	}
	q.heartbeats[token] = time.Now()
}
// InsertWorker registers a new worker for the given channel and architecture
// and returns its freshly generated id.
func (q *fsJobQueue) InsertWorker(channel, arch string) (uuid.UUID, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	w := worker{
		Channel:   channel,
		Arch:      arch,
		Heartbeat: time.Now(),
		Tokens:    make(map[uuid.UUID]struct{}),
	}
	id := uuid.New()
	q.workers[id] = w
	return id, nil
}
// UpdateWorkerStatus refreshes the heartbeat of the worker with `wID`,
// returning jobqueue.ErrWorkerNotExist for unknown workers.
func (q *fsJobQueue) UpdateWorkerStatus(wID uuid.UUID) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	w, ok := q.workers[wID]
	if !ok {
		return jobqueue.ErrWorkerNotExist
	}
	// `worker` is stored by value in the map, so the updated copy must be
	// written back — previously the mutated copy was discarded and the
	// heartbeat refresh was silently lost.
	w.Heartbeat = time.Now()
	q.workers[wID] = w
	return nil
}
// Workers returns all registered workers whose heartbeat is older than
// `olderThan`.
func (q *fsJobQueue) Workers(olderThan time.Duration) ([]jobqueue.Worker, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	cutoff := time.Now().Add(-olderThan)
	stale := []jobqueue.Worker{}
	for id, w := range q.workers {
		if !w.Heartbeat.Before(cutoff) {
			continue
		}
		stale = append(stale, jobqueue.Worker{
			ID:      id,
			Channel: w.Channel,
			Arch:    w.Arch,
		})
	}
	return stale, nil
}
// DeleteWorker removes worker `wID` from the registry. It refuses with
// jobqueue.ErrActiveJobs while the worker still holds job tokens, and returns
// jobqueue.ErrWorkerNotExist for unknown ids.
func (q *fsJobQueue) DeleteWorker(wID uuid.UUID) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	w, ok := q.workers[wID]
	switch {
	case !ok:
		return jobqueue.ErrWorkerNotExist
	case len(w.Tokens) > 0:
		return jobqueue.ErrActiveJobs
	default:
		delete(q.workers, wID)
		return nil
	}
}
// readJob loads the job with `id` from the database. It is a thin wrapper
// around q.db.Read that returns the job directly, or jobqueue.ErrNotExist
// when no job with `id` is stored.
func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
	j := &job{}
	exists, err := q.db.Read(id.String(), j)
	if err != nil {
		return nil, fmt.Errorf("error reading job '%s': %v", id, err)
	}
	if !exists {
		// return corrupt database?
		return nil, jobqueue.ErrNotExist
	}
	return j, nil
}
// maybeEnqueue puts `j` on the pending list and wakes listeners if the job
// has not started and all of its dependencies are finished. When the job is
// not ready and updateDependants is true (i.e. for a brand-new job), it is
// recorded in q.dependants instead, to be re-examined when a dependency
// finishes.
//
// q.mu must be locked when this method is called. The only exception is
// New(), because no concurrent calls are possible there.
func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
	if !j.StartedAt.IsZero() {
		// Already running or finished; nothing to queue.
		return nil
	}

	ready, err := q.hasAllFinishedDependencies(j)
	if err != nil {
		return err
	}

	if !ready {
		if updateDependants {
			for _, dep := range j.Dependencies {
				q.dependants[dep] = append(q.dependants[dep], j.Id)
			}
		}
		return nil
	}

	// Ready to run: append to the pending list and signal every waiting
	// listener without blocking.
	q.pending.PushBack(j.Id)
	for c := range q.listeners {
		select {
		case c <- struct{}{}:
		default:
		}
	}
	return nil
}
// hasAllFinishedDependencies reports whether every dependency of `j` has
// finished. If one of the dependency jobs cannot be read, an error is
// returned.
func (q *fsJobQueue) hasAllFinishedDependencies(j *job) (bool, error) {
	for _, depId := range j.Dependencies {
		dep, err := q.readJob(depId)
		if err != nil {
			return false, err
		}
		if dep.FinishedAt.IsZero() {
			return false, nil
		}
	}
	return true, nil
}
// dequeueSuitableJob finds a suitable job in the list of pending jobs,
// removes it from there and returns it.
//
// The job must meet the following conditions:
//   - must be pending
//   - its dependencies must be finished
//   - must be of one of the types in jobTypes
//   - must belong to one of the channels in channels
//
// If no suitable job is found, false is returned. If an error occurs during
// the search, it is returned.
func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string, channels []string) (*job, bool, error) {
	for el := q.pending.Front(); el != nil; el = el.Next() {
		j, err := q.readJob(el.Value.(uuid.UUID))
		if err != nil {
			return nil, false, err
		}
		if !jobMatchesCriteria(j, jobTypes, channels) {
			continue
		}
		ready, err := q.hasAllFinishedDependencies(j)
		if err != nil {
			return nil, false, err
		}
		if !ready {
			continue
		}
		q.pending.Remove(el)
		return j, true, nil
	}
	return nil, false, nil
}
// removePendingJob deletes the job with the given id from the pending list.
// It is a no-op when the job is not in the list.
func (q *fsJobQueue) removePendingJob(id uuid.UUID) {
	for el := q.pending.Front(); el != nil; el = el.Next() {
		if el.Value.(uuid.UUID) != id {
			continue
		}
		q.pending.Remove(el)
		return
	}
}
// jobMatchesCriteria reports whether `j` satisfies both filters:
//   - its type is one of acceptedJobTypes
//   - its channel is one of acceptedChannels
func jobMatchesCriteria(j *job, acceptedJobTypes []string, acceptedChannels []string) bool {
	typeOk := false
	for _, t := range acceptedJobTypes {
		if t == j.Type {
			typeOk = true
			break
		}
	}
	if !typeOk {
		return false
	}
	for _, ch := range acceptedChannels {
		if ch == j.Channel {
			return true
		}
	}
	return false
}
// AllRootJobIDs returns a list of all the top level (root) job uuids.
// This only includes jobs without any Dependents set.
//
// NOTE(review): unlike most public methods this does not hold q.mu while
// listing/reading the database — confirm concurrent writes cannot skew the
// result (the struct comment says q.mu makes db transactions atomic).
func (q *fsJobQueue) AllRootJobIDs(_ context.Context) ([]uuid.UUID, error) {
	ids, err := q.db.List()
	if err != nil {
		return nil, err
	}

	jobIDs := []uuid.UUID{}
	for _, id := range ids {
		var j job
		exists, err := q.db.Read(id, &j)
		if err != nil {
			return jobIDs, err
		}
		// Skip missing entries and any job that something depends on.
		if !exists || len(j.Dependents) > 0 {
			continue
		}
		jobIDs = append(jobIDs, j.Id)
	}
	return jobIDs, nil
}
// DeleteJob will delete a job and all of its dependencies.
// If a dependency has multiple dependents it will only delete the parent job
// from the dependents list and then re-save the job instead of removing it.
//
// This assumes that the jobs have been created correctly, and that they have
// no dependency loops. Shared dependents are ok, but a job cannot have a
// dependency on any of its parents (this should never happen).
func (q *fsJobQueue) DeleteJob(_ context.Context, id uuid.UUID) error {
	// Start it off with an empty parent
	return q.deleteJob(uuid.UUID{}, id)
}
// deleteJob will delete jobs as far down the dependency tree as possible.
// Missing dependencies are ignored; it deletes as much as it can. A missing
// parent (the first call) will be returned as an error.
//
// `parent` is removed from this job's Dependents list; the job itself is only
// deleted once that list becomes empty, otherwise it is re-saved with the
// shortened list (a shared dependency survives until its last parent goes).
func (q *fsJobQueue) deleteJob(parent, id uuid.UUID) error {
	var j job
	_, err := q.db.Read(id.String(), &j)
	if err != nil {
		return err
	}

	// Delete the parent uuid from the Dependents list
	var deps []uuid.UUID
	for _, d := range j.Dependents {
		if d == parent {
			continue
		}
		deps = append(deps, d)
	}
	j.Dependents = deps

	// This job can only be deleted when the Dependents list is empty.
	// Otherwise it needs to be saved with the new Dependents list.
	if len(j.Dependents) > 0 {
		return q.db.Write(id.String(), j)
	}

	// Recursively delete the dependencies of this job. Errors are
	// deliberately ignored (best effort): delete as much as possible.
	for _, dj := range j.Dependencies {
		_ = q.deleteJob(id, dj)
	}

	return q.db.Delete(id.String())
}