fsjobqueue: factor common functionality into maybeEnqueue()

This commit is contained in:
Lars Karlitski 2020-05-10 17:58:33 +02:00 committed by Tom Gundersen
parent 7592e38d3d
commit 3240f11647

View file

@ -84,25 +84,10 @@ func New(dir string) (*fsJobQueue, error) {
if err != nil {
return nil, err
}
// We only enqueue jobs that were previously pending.
if j.StartedAt.IsZero() {
continue
}
// Enqueue the job again if all dependencies have finished, or
// there are none. Otherwise, update dependants so that this
// check is done again when FinishJob() is called for a
// dependency.
n, err := q.countFinishedJobs(j.Dependencies)
err = q.maybeEnqueue(j, true)
if err != nil {
return nil, err
}
if n == len(j.Dependencies) {
q.pendingChannel(j.Type) <- j.Id
} else {
for _, dep := range j.Dependencies {
q.dependants[dep] = append(q.dependants[dep], j.Id)
}
}
}
return q, nil
@ -125,10 +110,16 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
return uuid.Nil, fmt.Errorf("error marshaling job arguments: %v", err)
}
// Verify dependencies and check how many of them are already finished.
finished, err := q.countFinishedJobs(j.Dependencies)
if err != nil {
return uuid.Nil, err
// Verify dependendencies early, so that the job doesn't get written
// when one of them doesn't exist.
for _, d := range j.Dependencies {
exists, err := q.db.Read(d.String(), nil)
if err != nil {
return uuid.Nil, err
}
if !exists {
return uuid.Nil, jobqueue.ErrNotExist
}
}
// Write the job before updating in-memory state, so that the latter
@ -138,15 +129,9 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
return uuid.Nil, fmt.Errorf("cannot write job: %v:", err)
}
// If all dependencies have finished, or there are none, queue the job.
// Otherwise, update dependants so that this check is done again when
// FinishJob() is called for a dependency.
if finished == len(j.Dependencies) {
q.pendingChannel(j.Type) <- j.Id
} else {
for _, id := range j.Dependencies {
q.dependants[id] = append(q.dependants[id], j.Id)
}
err = q.maybeEnqueue(&j, true)
if err != nil {
return uuid.Nil, err
}
return j.Id, nil
@ -224,13 +209,10 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
if err != nil {
return err
}
n, err := q.countFinishedJobs(dep.Dependencies)
err = q.maybeEnqueue(dep, false)
if err != nil {
return err
}
if n == len(dep.Dependencies) {
q.pendingChannel(dep.Type) <- dep.Id
}
}
delete(q.dependants, id)
@ -260,22 +242,6 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID, result interface{}) (queued, starte
return
}
// Returns the number of finished jobs in `ids`.
func (q *fsJobQueue) countFinishedJobs(ids []uuid.UUID) (int, error) {
n := 0
for _, id := range ids {
j, err := q.readJob(id)
if err != nil {
return 0, err
}
if !j.FinishedAt.IsZero() {
n += 1
}
}
return n, nil
}
// Reads job with `id`. This is a thin wrapper around `q.db.Read`, which
// returns the job directly, or and error if a job with `id` does not exist.
func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
@ -291,6 +257,37 @@ func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
return &j, nil
}
// Enqueue `job` if it is pending and all its dependencies have finished.
// Update `q.dependants` if the job was not queued and updateDependants is true
// (i.e., when this is a new job).
func (q *fsJobQueue) maybeEnqueue(j *job, updateDependants bool) error {
if !j.StartedAt.IsZero() {
return nil
}
depsFinished := true
for _, id := range j.Dependencies {
j, err := q.readJob(id)
if err != nil {
return err
}
if j.FinishedAt.IsZero() {
depsFinished = false
break
}
}
if depsFinished {
q.pendingChannel(j.Type) <- j.Id
} else if updateDependants {
for _, id := range j.Dependencies {
q.dependants[id] = append(q.dependants[id], j.Id)
}
}
return nil
}
// Safe access to the pending channel for `jobType`. Channels are created on
// demand.
func (q *fsJobQueue) pendingChannel(jobType string) chan uuid.UUID {