// Package fsjobqueue implements a filesystem-backed job queue. It implements
// the interfaces in package jobqueue.
//
// Jobs are stored in the file system, using the `jsondb` package. However,
// this package does not use the file system as a database, but keeps some
// state in memory. This means that access to a given directory must be
// exclusive to only one `fsJobQueue` object at a time. A single `fsJobQueue`
// can be safely accessed from multiple goroutines, though.
//
// Data is stored non-redundantly. Any data structures necessary for efficient
// access (e.g., dependants) are kept in memory.
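//
// A minimal usage sketch (illustrative only; error handling, context and
// worker setup, and the actual job payloads are omitted):
//
//	q, _ := New("/path/to/state/jobs")
//	id, _ := q.Enqueue("my-job-type", myArgs, nil, "")
//	jobId, token, _, jobType, args, _ := q.Dequeue(ctx, workerID, []string{"my-job-type"}, []string{""})
//	// ... run the job, reporting progress via q.RefreshHeartbeat(token) ...
//	requeued, _ := q.RequeueOrFinishJob(jobId, 0, myResult)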
package fsjobqueue

import (
	"container/list"
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"

	"github.com/osbuild/osbuild-composer/pkg/jobqueue"

	"github.com/osbuild/osbuild-composer/internal/jsondb"
)

type fsJobQueue struct {
	// Protects all fields of this struct. In particular, it ensures
	// transactions on `db` are atomic. All public functions except
	// JobStatus hold it while they're running. Dequeue() releases it
	// briefly while waiting on pending channels.
	mu sync.Mutex

	db *jsondb.JSONDatabase

	// List of pending jobs
	pending *list.List

	// Set of goroutines waiting for new pending jobs
	listeners map[chan struct{}]struct{}

	// Maps job ids to the jobs that depend on it, if any of those
	// dependants have not yet finished.
	dependants map[uuid.UUID][]uuid.UUID

	// Currently running jobs. Workers are not handed job ids, but
	// independent tokens which serve as an indirection. This enables
	// race-free uploading of artifacts and makes restarting composer more
	// robust (workers from an old run cannot report results for jobs
	// composer thinks are not running).
	// This map maps these tokens to job ids. Artifacts are stored in
	// `$STATE_DIRECTORY/artifacts/tmp/$TOKEN` while the worker is running,
	// and renamed to `$STATE_DIRECTORY/artifacts/$JOB_ID` once the job is
	// reported as done.
	jobIdByToken map[uuid.UUID]uuid.UUID
	heartbeats   map[uuid.UUID]time.Time // token -> heartbeat

	workerIDByToken map[uuid.UUID]uuid.UUID // token -> workerID
	workers         map[uuid.UUID]worker
}

type worker struct {
	Channel   string    `json:"channel"`
	Arch      string    `json:"arch"`
	Heartbeat time.Time `json:"heartbeat"`
	Tokens    map[uuid.UUID]struct{}
}

// On-disk job struct. Contains all necessary (but non-redundant) information
// about a job. These are not held in memory by the job queue, but
// (de)serialized on each access.
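//
// The timestamp fields track a job's lifecycle (a rough sketch):
//
//	Enqueue()                 QueuedAt set; StartedAt/FinishedAt zero (pending)
//	Dequeue()/DequeueByID()   StartedAt set, Token assigned (running)
//	RequeueOrFinishJob()      either FinishedAt set (finished), or StartedAt and
//	                          Token reset and Retries incremented (pending again)
//	CancelJob()               Canceled set
//	FailJob()                 StartedAt, FinishedAt and Result set (failed without running)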
type job struct {
	Id           uuid.UUID       `json:"id"`
	Token        uuid.UUID       `json:"token"`
	Type         string          `json:"type"`
	Args         json.RawMessage `json:"args,omitempty"`
	Dependencies []uuid.UUID     `json:"dependencies"`
	Dependents   []uuid.UUID     `json:"dependents"`
	Result       json.RawMessage `json:"result,omitempty"`
	Channel      string          `json:"channel"`

	QueuedAt   time.Time `json:"queued_at,omitempty"`
	StartedAt  time.Time `json:"started_at,omitempty"`
	FinishedAt time.Time `json:"finished_at,omitempty"`
	ExpiresAt  time.Time `json:"expires_at,omitempty"`

	Retries  uint64 `json:"retries"`
	Canceled bool   `json:"canceled,omitempty"`
}

// New creates a new fsJobQueue object for `dir`. This object must have
// exclusive access to `dir`. If `dir` contains jobs created from previous
// runs, they are loaded and rescheduled to run if necessary.
func New(dir string) (*fsJobQueue, error) {
	q := &fsJobQueue{
		db:              jsondb.New(dir, 0600),
		pending:         list.New(),
		dependants:      make(map[uuid.UUID][]uuid.UUID),
		jobIdByToken:    make(map[uuid.UUID]uuid.UUID),
		heartbeats:      make(map[uuid.UUID]time.Time),
		listeners:       make(map[chan struct{}]struct{}),
		workers:         make(map[uuid.UUID]worker),
		workerIDByToken: make(map[uuid.UUID]uuid.UUID),
	}

	// Look for jobs that are still pending and build the dependant map.
	ids, err := q.db.List()
	if err != nil {
		return nil, fmt.Errorf("error listing jobs: %v", err)
	}

	for _, id := range ids {
		jobId, err := uuid.Parse(id)
		if err != nil {
			return nil, fmt.Errorf("invalid job '%s' in db: %v", id, err)
		}
		j, err := q.readJob(jobId)
		if err != nil {
			// Skip invalid jobs, leaving them in place for later examination
			continue
		}

		// If a job is running, and not cancelled, track the token
		if !j.StartedAt.IsZero() && j.FinishedAt.IsZero() && !j.Canceled {
			// Fail older running jobs which don't have a token stored
			if j.Token == uuid.Nil {
				_, err = q.RequeueOrFinishJob(j.Id, 0, nil)
				if err != nil {
					return nil, fmt.Errorf("Error finishing job '%s' without a token: %v", j.Id, err)
				}
			} else {
				q.jobIdByToken[j.Token] = j.Id
				q.heartbeats[j.Token] = time.Now()
			}
		}

		err = q.maybeEnqueue(j, true)
		if err != nil {
			return nil, err
		}
	}

	return q, nil
}

func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	var j = job{
		Id:           uuid.New(),
		Token:        uuid.Nil,
		Type:         jobType,
		Dependencies: dependencies,
		QueuedAt:     time.Now(),
		Channel:      channel,
	}

	var err error
	j.Args, err = json.Marshal(args)
	if err != nil {
		return uuid.Nil, fmt.Errorf("error marshaling job arguments: %v", err)
	}

	// Verify dependencies early, so that the job doesn't get written
	// when one of them doesn't exist.
	for _, d := range j.Dependencies {
		var dep job
		exists, err := q.db.Read(d.String(), &dep)
		if err != nil {
			return uuid.Nil, err
		}
		if !exists {
			return uuid.Nil, jobqueue.ErrNotExist
		}

		dep.Dependents = append(dep.Dependents, j.Id)
		err = q.db.Write(d.String(), dep)
		if err != nil {
			return uuid.Nil, err
		}
	}

	// Write the job before updating in-memory state, so that the latter
	// doesn't become corrupt when writing fails.
	err = q.db.Write(j.Id.String(), j)
	if err != nil {
		return uuid.Nil, fmt.Errorf("cannot write job: %v", err)
	}

	err = q.maybeEnqueue(&j, true)
	if err != nil {
		return uuid.Nil, err
	}

	return j.Id, nil
}

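// Dequeue picks up a pending job whose type is in jobTypes and whose channel
// is in channels, blocking until one becomes available or ctx is cancelled
// (in which case jobqueue.ErrDequeueTimeout is returned). On success it marks
// the job as started, associates it with the worker `wID` if that worker is
// known, and returns the job id, a fresh worker token, the job's dependencies,
// its type, and its arguments.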
func (q *fsJobQueue) Dequeue(ctx context.Context, wID uuid.UUID, jobTypes, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Return early if the context is already canceled.
	if err := ctx.Err(); err != nil {
		return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
	}

	// Add a new listener
	c := make(chan struct{}, 1)
	q.listeners[c] = struct{}{}
	defer delete(q.listeners, c)

	// Loop until a suitable job is found
	var j *job
	for {
		var found bool
		var err error
		j, found, err = q.dequeueSuitableJob(jobTypes, channels)
		if err != nil {
			return uuid.Nil, uuid.Nil, nil, "", nil, err
		}
		if found {
			break
		}

		// Unlock the mutex while waiting on the channels, so that multiple
		// goroutines can wait at the same time.
		q.mu.Unlock()
		select {
		case <-c:
		case <-ctx.Done():
			// Re-acquire the lock; the deferred q.mu.Unlock() will release it.
			q.mu.Lock()
			return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
		}
		q.mu.Lock()
	}

	j.StartedAt = time.Now()

	j.Token = uuid.New()
	q.jobIdByToken[j.Token] = j.Id
	q.heartbeats[j.Token] = time.Now()
	if _, ok := q.workers[wID]; ok {
		q.workers[wID].Tokens[j.Token] = struct{}{}
		q.workerIDByToken[j.Token] = wID
	}

	err := q.db.Write(j.Id.String(), j)
	if err != nil {
		return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
	}

	return j.Id, j.Token, j.Dependencies, j.Type, j.Args, nil
}

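// DequeueByID is like Dequeue, but picks up one specific pending job by id
// instead of the next suitable one. It does not block: if the job has already
// been started, or its dependencies have not all finished, it returns
// jobqueue.ErrNotPending.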
func (q *fsJobQueue) DequeueByID(ctx context.Context, id, wID uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return uuid.Nil, nil, "", nil, err
	}

	if !j.StartedAt.IsZero() {
		return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
	}

	depsFinished, err := q.hasAllFinishedDependencies(j)
	if err != nil {
		return uuid.Nil, nil, "", nil, err
	}
	if !depsFinished {
		return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
	}

	q.removePendingJob(id)

	j.StartedAt = time.Now()

	j.Token = uuid.New()
	q.jobIdByToken[j.Token] = j.Id
	q.heartbeats[j.Token] = time.Now()
	if _, ok := q.workers[wID]; ok {
		q.workers[wID].Tokens[j.Token] = struct{}{}
		q.workerIDByToken[j.Token] = wID
	}

	err = q.db.Write(j.Id.String(), j)
	if err != nil {
		return uuid.Nil, nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err)
	}

	return j.Token, j.Dependencies, j.Type, j.Args, nil
}

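// RequeueOrFinishJob removes a running job from the queue's bookkeeping and
// either requeues it or marks it as finished. If the job has already been
// retried `maxRetries` times, it is finished with `result` and any dependants
// whose dependencies are now all finished are moved to the pending list;
// otherwise the job is reset and requeued. The boolean return value reports
// whether the job was requeued.
//
// A hypothetical call from a result handler (names are illustrative):
//
//	requeued, err := q.RequeueOrFinishJob(jobId, 0, &myResult) // maxRetries == 0: always finish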
func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return false, err
	}

	if j.Canceled {
		return false, jobqueue.ErrCanceled
	}

	if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
		return false, jobqueue.ErrNotRunning
	}

	delete(q.jobIdByToken, j.Token)
	delete(q.heartbeats, j.Token)
	if wID, ok := q.workerIDByToken[j.Token]; ok {
		delete(q.workers[wID].Tokens, j.Token)
		delete(q.workerIDByToken, j.Token)
	}

	if j.Retries >= maxRetries {
		j.FinishedAt = time.Now()

		j.Result, err = json.Marshal(result)
		if err != nil {
			return false, fmt.Errorf("error marshaling result: %v", err)
		}

		// Write before notifying dependants, because it will be read again.
		err = q.db.Write(id.String(), j)
		if err != nil {
			return false, fmt.Errorf("error writing job %s: %v", id, err)
		}

		for _, depid := range q.dependants[id] {
			dep, err := q.readJob(depid)
			if err != nil {
				return false, err
			}
			err = q.maybeEnqueue(dep, false)
			if err != nil {
				return false, err
			}
		}
		delete(q.dependants, id)
		return false, nil
	} else {
		j.Token = uuid.Nil
		j.StartedAt = time.Time{}
		j.Retries += 1

		// Write the job before updating in-memory state, so that the latter
		// doesn't become corrupt when writing fails.
		err = q.db.Write(j.Id.String(), j)
		if err != nil {
			return false, fmt.Errorf("cannot write job: %v", err)
		}

		// add the job to the list of pending ones
		q.pending.PushBack(j.Id)

		// notify all listeners in a non-blocking way
		for c := range q.listeners {
			select {
			case c <- struct{}{}:
			default:
			}
		}
		return true, nil
	}
}

func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return err
	}

	if !j.FinishedAt.IsZero() {
		return jobqueue.ErrNotRunning
	}

	// if the cancelled job is pending, remove it from the list
	if j.StartedAt.IsZero() {
		q.removePendingJob(id)
	}

	j.Canceled = true

	delete(q.heartbeats, j.Token)

	err = q.db.Write(id.String(), j)
	if err != nil {
		return fmt.Errorf("error writing job %s: %v", id, err)
	}

	return nil
}

func (q *fsJobQueue) FailJob(id uuid.UUID, result interface{}) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	j, err := q.readJob(id)
	if err != nil {
		return err
	}

	if !j.FinishedAt.IsZero() {
		return jobqueue.ErrFinished
	}

	if !j.StartedAt.IsZero() {
		return jobqueue.ErrRunning
	}

	j.Result, err = json.Marshal(result)
	if err != nil {
		return err
	}

	j.StartedAt = time.Now()
	j.FinishedAt = time.Now()
	j.Token = uuid.New()

	err = q.db.Write(id.String(), j)
	if err != nil {
		return fmt.Errorf("error writing job %s: %v", id, err)
	}

	return nil
}

func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
	j, err := q.readJob(id)
	if err != nil {
		return
	}

	jobType = j.Type
	channel = j.Channel
	result = j.Result
	queued = j.QueuedAt
	started = j.StartedAt
	finished = j.FinishedAt
	canceled = j.Canceled
	deps = j.Dependencies
	dependents = j.Dependents

	return
}

func (q *fsJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) {
	j, err := q.readJob(id)
	if err != nil {
		return
	}

	jobType = j.Type
	args = j.Args
	dependencies = j.Dependencies
	channel = j.Channel

	return
}

func (q *fsJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	id, ok := q.jobIdByToken[token]
	if !ok {
		return uuid.Nil, jobqueue.ErrNotExist
	}
	return id, nil
}

// Heartbeats returns the tokens of all running jobs whose most recent
// heartbeat is older than `olderThan`.
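//
// A hypothetical watchdog sketch built on top of this (assuming stale jobs
// should simply be failed; `maxRetries` and `timeoutResult` are illustrative):
//
//	for _, token := range q.Heartbeats(2 * time.Minute) {
//		if id, err := q.IdFromToken(token); err == nil {
//			_, _ = q.RequeueOrFinishJob(id, maxRetries, &timeoutResult)
//		}
//	}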
func (q *fsJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) {
	q.mu.Lock()
	defer q.mu.Unlock()
	now := time.Now()
	for token, hb := range q.heartbeats {
		if now.Sub(hb) > olderThan {
			tokens = append(tokens, token)
		}
	}
	return
}

func (q *fsJobQueue) RefreshHeartbeat(token uuid.UUID) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if token != uuid.Nil {
		q.heartbeats[token] = time.Now()
	}
}

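// InsertWorker registers a new worker for the given channel and architecture
// and returns its id. The worker's heartbeat is initialized to the current
// time.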
func (q *fsJobQueue) InsertWorker(channel, arch string) (uuid.UUID, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	wID := uuid.New()
	q.workers[wID] = worker{
		Channel:   channel,
		Arch:      arch,
		Heartbeat: time.Now(),
		Tokens:    make(map[uuid.UUID]struct{}),
	}
	return wID, nil
}

func (q *fsJobQueue) UpdateWorkerStatus(wID uuid.UUID) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	worker, ok := q.workers[wID]
	if !ok {
		return jobqueue.ErrWorkerNotExist
	}

	// `worker` is a copy of the map value, so the refreshed heartbeat has to
	// be written back into the map.
	worker.Heartbeat = time.Now()
	q.workers[wID] = worker
	return nil
}

func (q *fsJobQueue) Workers(olderThan time.Duration) ([]jobqueue.Worker, error) {
	q.mu.Lock()
	defer q.mu.Unlock()

	now := time.Now()
	workers := []jobqueue.Worker{}
	for wID, w := range q.workers {
		if now.Sub(w.Heartbeat) > olderThan {
			workers = append(workers, jobqueue.Worker{
				ID:      wID,
				Channel: w.Channel,
				Arch:    w.Arch,
			})
		}
	}
	return workers, nil
}

func (q *fsJobQueue) DeleteWorker(wID uuid.UUID) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	worker, ok := q.workers[wID]
	if !ok {
		return jobqueue.ErrWorkerNotExist
	}

	if len(worker.Tokens) != 0 {
		return jobqueue.ErrActiveJobs
	}
	delete(q.workers, wID)
	return nil
}

// Reads the job with `id`. This is a thin wrapper around `q.db.Read`, which
// returns the job directly, or an error if a job with `id` does not exist.
func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
	var j job
	exists, err := q.db.Read(id.String(), &j)
	if err != nil {
		return nil, fmt.Errorf("error reading job '%s': %v", id, err)
	}
	if !exists {
		// return corrupt database?
		return nil, jobqueue.ErrNotExist
	}
	return &j, nil
}

// Enqueue `job` if it is pending and all its dependencies have finished.
// Update `q.dependants` if the job was not queued and updateDependants is true
// (i.e., when this is a new job).
// `q.mu` must be locked when this method is called. The only exception is
// `New()` because no concurrent calls are possible there.
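//
// For example (ids are illustrative): if job B is enqueued with a dependency
// on a still-unfinished job A, B is not put on the pending list; instead B's
// id is appended to q.dependants[A]. Once A finishes, RequeueOrFinishJob calls
// maybeEnqueue for B again (with updateDependants == false), which then moves
// B onto the pending list and wakes up any waiting Dequeue() calls.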
func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
	if !j.StartedAt.IsZero() {
		return nil
	}

	depsFinished, err := q.hasAllFinishedDependencies(j)
	if err != nil {
		return err
	}

	if depsFinished {
		// add the job to the list of pending ones
		q.pending.PushBack(j.Id)

		// notify all listeners in a non-blocking way
		for c := range q.listeners {
			select {
			case c <- struct{}{}:
			default:
			}
		}
	} else if updateDependants {
		for _, id := range j.Dependencies {
			q.dependants[id] = append(q.dependants[id], j.Id)
		}
	}

	return nil
}

// hasAllFinishedDependencies returns true if all dependencies of `j`
// are finished. Otherwise, false is returned. If one of the jobs cannot
// be read, an error is returned.
func (q *fsJobQueue) hasAllFinishedDependencies(j *job) (bool, error) {
	for _, id := range j.Dependencies {
		dep, err := q.readJob(id)
		if err != nil {
			return false, err
		}
		if dep.FinishedAt.IsZero() {
			return false, nil
		}
	}

	return true, nil
}

// dequeueSuitableJob finds a suitable job in the list of pending jobs, removes it from there and returns it
//
// The job must meet the following conditions:
// - it must be pending
// - its dependencies must be finished
// - its type must be one of jobTypes
// - its channel must be one of channels
//
// If a suitable job is not found, false is returned.
// If an error occurs during the search, it's returned.
func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string, channels []string) (*job, bool, error) {
	el := q.pending.Front()
	for el != nil {
		id := el.Value.(uuid.UUID)

		j, err := q.readJob(id)
		if err != nil {
			return nil, false, err
		}

		if !jobMatchesCriteria(j, jobTypes, channels) {
			el = el.Next()
			continue
		}

		ready, err := q.hasAllFinishedDependencies(j)
		if err != nil {
			return nil, false, err
		}
		if ready {
			q.pending.Remove(el)
			return j, true, nil
		}
		el = el.Next()
	}

	return nil, false, nil
}

// removePendingJob removes the job with the given ID from the list of pending jobs
//
// If the job isn't in the list, this is a no-op.
func (q *fsJobQueue) removePendingJob(id uuid.UUID) {
	el := q.pending.Front()
	for el != nil {
		if el.Value.(uuid.UUID) == id {
			q.pending.Remove(el)
			return
		}

		el = el.Next()
	}
}

// jobMatchesCriteria returns true if the job matches the criteria given by the parameters
//
// Criteria:
// - the job's type is one of the acceptedJobTypes
// - the job's channel is one of the acceptedChannels
func jobMatchesCriteria(j *job, acceptedJobTypes []string, acceptedChannels []string) bool {
	contains := func(slice []string, str string) bool {
		for _, item := range slice {
			if str == item {
				return true
			}
		}

		return false
	}

	return contains(acceptedJobTypes, j.Type) && contains(acceptedChannels, j.Channel)
}

// AllRootJobIDs returns a list of all top-level (root) job uuids.
// This only includes jobs without any Dependents set.
func (q *fsJobQueue) AllRootJobIDs(_ context.Context) ([]uuid.UUID, error) {
	ids, err := q.db.List()
	if err != nil {
		return nil, err
	}

	jobIDs := []uuid.UUID{}
	for _, id := range ids {
		var j job
		exists, err := q.db.Read(id, &j)
		if err != nil {
			return jobIDs, err
		}
		if !exists || len(j.Dependents) > 0 {
			continue
		}
		jobIDs = append(jobIDs, j.Id)
	}

	return jobIDs, nil
}

// DeleteJob will delete a job and all of its dependencies.
// If a dependency has multiple dependents, only the parent job is removed from
// that dependency's Dependents list and the dependency is re-saved instead of
// being deleted.
//
// This assumes that the jobs have been created correctly, and that they have
// no dependency loops. Shared dependencies are ok, but a job cannot have a
// dependency on any of its parents (this should never happen).
func (q *fsJobQueue) DeleteJob(_ context.Context, id uuid.UUID) error {
	// Start it off with an empty parent
	return q.deleteJob(uuid.UUID{}, id)
}

// deleteJob deletes jobs as far down the dependency tree as possible.
// Missing dependencies are ignored; it deletes as much as it can.
// A missing parent (the first call) will be returned as an error.
func (q *fsJobQueue) deleteJob(parent, id uuid.UUID) error {
	var j job
	_, err := q.db.Read(id.String(), &j)
	if err != nil {
		return err
	}

	// Delete the parent uuid from the Dependents list
	var deps []uuid.UUID
	for _, d := range j.Dependents {
		if d == parent {
			continue
		}
		deps = append(deps, d)
	}
	j.Dependents = deps

	// This job can only be deleted when the Dependents list is empty
	// Otherwise it needs to be saved with the new Dependents list
	if len(j.Dependents) > 0 {
		return q.db.Write(id.String(), j)
	}
	// Recursively delete the dependencies of this job
	for _, dj := range j.Dependencies {
		_ = q.deleteJob(id, dj)
	}

	return q.db.Delete(id.String())
}
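
// An illustrative example of the shared-dependency case handled above (ids
// are made up): if jobs R and S both depend on A, then A's Dependents list is
// {R, S}. DeleteJob(R) removes R itself, but for A it only strips R from A's
// Dependents list and rewrites A; A is actually deleted only once S, its last
// remaining dependent, is deleted as well.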