fsjobqueue: refactor to allow dequeuing by multiple criteria

Previous implementation of fsjobqueue is amazing but it has its drawbacks:
- dequeueing can be done only based on a job type
- it's limited to 100 jobs per a job type

As we soon want to be able to dequeue also by another criteria (job channel),
we need to refactor the queue.

The new implementation is more naive but also more flexible. It basically
works like the dbjobqueue - dequeueing goroutines listen for newly added
jobs. When that happens, a signal is sent to all of them and they all inspect
all pending jobs and dequeue ones that match their needs. Ones that don't find
a suitable job, are waiting for the next signal.

This is certainly slower implementation as every time a new job is added into
the queue, all dequeueing goroutines will have to iterate over all
pending jobs. I think that's fine because fsjobqueue is not recommended
to use for composer instances with heavy load.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2022-02-15 21:19:52 +01:00 committed by Ondřej Budai
parent b34571c1ec
commit 9c80a17ee5

View file

@ -12,11 +12,10 @@
package fsjobqueue
import (
"container/list"
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"sync"
"time"
@ -35,8 +34,11 @@ type fsJobQueue struct {
db *jsondb.JSONDatabase
// Maps job types to channels of job ids for that type.
pending map[string]chan uuid.UUID
// List of pending job
pending *list.List
// Set of goroutines waiting for new pending jobs
listeners map[chan struct{}]struct{}
// Maps job ids to the jobs that depend on it, if any of those
// dependants have not yet finished.
@ -73,20 +75,17 @@ type job struct {
Canceled bool `json:"canceled,omitempty"`
}
// The size of channels used in fsJobQueue for queueing jobs.
// Note that each job type has its own queue.
const channelSize = 100
// Create a new fsJobQueue object for `dir`. This object must have exclusive
// access to `dir`. If `dir` contains jobs created from previous runs, they are
// loaded and rescheduled to run if necessary.
func New(dir string) (*fsJobQueue, error) {
q := &fsJobQueue{
db: jsondb.New(dir, 0600),
pending: make(map[string]chan uuid.UUID),
pending: list.New(),
dependants: make(map[uuid.UUID][]uuid.UUID),
jobIdByToken: make(map[uuid.UUID]uuid.UUID),
heartbeats: make(map[uuid.UUID]time.Time),
listeners: make(map[chan struct{}]struct{}),
}
// Look for jobs that are still pending and build the dependant map.
@ -182,43 +181,35 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
}
// Filter q.pending by the `jobTypes`. Ignore those job types that this
// queue doesn't accept.
chans := []chan uuid.UUID{}
for _, jt := range jobTypes {
c, exists := q.pending[jt]
if !exists {
c = make(chan uuid.UUID, channelSize)
q.pending[jt] = c
}
chans = append(chans, c)
}
// Add a new listener
c := make(chan struct{}, 1)
q.listeners[c] = struct{}{}
defer delete(q.listeners, c)
// Loop until finding a non-canceled job and pending.
// Loop until finding a suitable job
var j *job
for {
var found bool
var err error
j, found, err = q.dequeueSuitableJob(jobTypes)
if err != nil {
return uuid.Nil, uuid.Nil, nil, "", nil, err
}
if found {
break
}
// Unlock the mutex while polling channels, so that multiple goroutines
// can wait at the same time.
q.mu.Unlock()
id, err := selectUUIDChannel(ctx, chans)
select {
case <-c:
case <-ctx.Done():
// there's defer q.mu.Unlock(), so let's lock
q.mu.Lock()
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
}
q.mu.Lock()
if err != nil {
if errors.As(err, &context.Canceled) || errors.As(err, &context.DeadlineExceeded) {
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
}
return uuid.Nil, uuid.Nil, nil, "", nil, err
}
j, err = q.readJob(id)
if err != nil {
return uuid.Nil, uuid.Nil, nil, "", nil, err
}
// jobs must be non-cancelled and pending
if !j.Canceled && j.StartedAt.IsZero() {
break
}
}
j.StartedAt = time.Now()
@ -256,6 +247,8 @@ func (q *fsJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
}
q.removePendingJob(id)
j.StartedAt = time.Now()
j.Token = uuid.New()
@ -331,6 +324,11 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
return jobqueue.ErrNotRunning
}
// if the cancelled job is pending, remove it from the list
if j.StartedAt.IsZero() {
q.removePendingJob(id)
}
j.Canceled = true
delete(q.heartbeats, j.Token)
@ -436,12 +434,16 @@ func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
}
if depsFinished {
c, exists := q.pending[j.Type]
if !exists {
c = make(chan uuid.UUID, channelSize)
q.pending[j.Type] = c
// add the job to the list of pending ones
q.pending.PushBack(j.Id)
// notify all listeners in a non-blocking way
for c := range q.listeners {
select {
case c <- struct{}{}:
default:
}
}
c <- j.Id
} else if updateDependants {
for _, id := range j.Dependencies {
q.dependants[id] = append(q.dependants[id], j.Id)
@ -468,33 +470,69 @@ func (q *fsJobQueue) hasAllFinishedDependencies(j *job) (bool, error) {
return true, nil
}
// Select on a list of `chan uuid.UUID`s. Returns an error if one of the
// channels is closed.
// dequeueSuitableJob finds a suitable job in the list of pending jobs, removes it from there and returns it
//
// Uses reflect.Select(), because the `select` statement cannot operate on an
// unknown amount of channels.
func selectUUIDChannel(ctx context.Context, chans []chan uuid.UUID) (uuid.UUID, error) {
cases := []reflect.SelectCase{
{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
},
}
for _, c := range chans {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
// The job must meet the following conditions:
// - must be pending
// - its dependencies must be finished
// - must be of one of the type from jobTypes
//
// If a suitable job is not found, false is returned.
// If an error occurs during the search, it's returned.
func (q *fsJobQueue) dequeueSuitableJob(jobTypes []string) (*job, bool, error) {
el := q.pending.Front()
for el != nil {
id := el.Value.(uuid.UUID)
j, err := q.readJob(id)
if err != nil {
return nil, false, err
}
if !jobMatchesCriteria(j, jobTypes) {
el = el.Next()
continue
}
ready, err := q.hasAllFinishedDependencies(j)
if err != nil {
return nil, false, err
}
if ready {
q.pending.Remove(el)
return j, true, nil
}
el = el.Next()
}
chosen, value, recvOK := reflect.Select(cases)
if !recvOK {
if chosen == 0 {
return uuid.Nil, ctx.Err()
} else {
return uuid.Nil, errors.New("channel was closed unexpectedly")
return nil, false, nil
}
// removePendingJob removes a job with given ID from the list of pending jobs
//
// If the job isn't in the list, this is no-op.
func (q *fsJobQueue) removePendingJob(id uuid.UUID) {
el := q.pending.Front()
for el != nil {
if el.Value.(uuid.UUID) == id {
q.pending.Remove(el)
return
}
el = el.Next()
}
}
// jobMatchesCriteria returns true if it matches criteria defined in parameters
//
// Criteria:
// - the job's type is one of the acceptedJobTypes
func jobMatchesCriteria(j *job, acceptedJobTypes []string) bool {
for _, jt := range acceptedJobTypes {
if jt == j.Type {
return true
}
}
return value.Interface().(uuid.UUID), nil
return false
}