worker/server: requeue unresponsive jobs

If a job is unresponsive, the worker has most likely crashed or been shut
down, and the in-progress job has been lost.

Instead of failing these jobs, requeue them up to two times. Once a job is lost
a third time it fails. This avoids infinite loops.

This is implemented by extending FinishJob to RequeueOrFinishJob. It takes a
max number of requeues as an argument, and if that is 0, it has the same
behavior as FinishJob used to have.

If the maximum number of requeues has not yet been reached, then the running
job is returned to pending state to be picked up again.
This commit is contained in:
Tom Gundersen 2022-03-18 21:39:32 +00:00 committed by Sanne Raymaekers
parent d02f666a4b
commit 626530818d
8 changed files with 216 additions and 61 deletions

View file

@ -55,6 +55,11 @@ const (
)
RETURNING token, type, args, queued_at, started_at`
sqlRequeue = `
UPDATE jobs
SET started_at = NULL, token = NULL, retries = retries + 1
WHERE id = $1 AND started_at IS NOT NULL AND finished_at IS NULL`
sqlInsertDependency = `INSERT INTO job_dependencies VALUES ($1, $2)`
sqlQueryDependencies = `
SELECT dependency_id
@ -66,7 +71,7 @@ const (
WHERE dependency_id = $1`
sqlQueryJob = `
SELECT type, args, channel, started_at, finished_at, canceled
SELECT type, args, channel, started_at, finished_at, retries, canceled
FROM jobs
WHERE id = $1`
sqlQueryJobStatus = `
@ -396,7 +401,7 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
return token, dependencies, jobType, args, nil
}
func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return fmt.Errorf("error connecting to database: %v", err)
@ -410,46 +415,57 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
defer func() {
err = tx.Rollback(context.Background())
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
q.logger.Error(err, "Error rolling back finish job transaction", "job_id", id.String())
q.logger.Error(err, "Error rolling back retry job transaction", "job_id", id.String())
}
}()
// Use double pointers for timestamps because they might be NULL, which would result in *time.Time == nil
var started, finished *time.Time
var jobType string
var started, finished *time.Time
var retries uint64
canceled := false
err = tx.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &canceled)
err = tx.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &retries, &canceled)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
}
if canceled {
return jobqueue.ErrCanceled
}
if finished != nil {
if started == nil || finished != nil {
return jobqueue.ErrNotRunning
}
// Remove from heartbeats
tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id)
if err != nil {
return fmt.Errorf("error finishing job %s: %v", id, err)
return fmt.Errorf("error removing job %s from heartbeats: %v", id, err)
}
if tag.RowsAffected() != 1 {
return jobqueue.ErrNotExist
}
err = tx.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
if retries >= maxRetries {
err = tx.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
}
if err != nil {
return fmt.Errorf("error finishing job %s: %v", id, err)
}
} else {
tag, err = tx.Exec(context.Background(), sqlRequeue, id)
if err != nil {
return fmt.Errorf("error requeueing job %s: %v", id, err)
}
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
}
if err != nil {
return fmt.Errorf("error finishing job %s: %v", id, err)
if tag.RowsAffected() != 1 {
return jobqueue.ErrNotExist
}
}
_, err = conn.Exec(context.Background(), sqlNotify)
_, err = tx.Exec(context.Background(), sqlNotify)
if err != nil {
return fmt.Errorf("error notifying jobs channel: %v", err)
}
@ -459,8 +475,11 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return fmt.Errorf("unable to commit database transaction: %v", err)
}
q.logger.Info("Finished job", "job_type", jobType, "job_id", id.String())
if retries >= maxRetries {
q.logger.Info("Finished job", "job_type", jobType, "job_id", id.String())
} else {
q.logger.Info("Requeued job", "job_type", jobType, "job_id", id.String())
}
return nil
}
@ -530,7 +549,7 @@ func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de
}
defer conn.Release()
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, &channel, nil, nil, nil)
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, &channel, nil, nil, nil, nil)
if err == pgx.ErrNoRows {
err = jobqueue.ErrNotExist
return

View file

@ -0,0 +1,16 @@
-- Add the retries column (incremented each time a running job is requeued).
-- New and existing rows start at 0.
ALTER TABLE jobs
ADD COLUMN retries BIGINT DEFAULT 0;
-- We added a column, thus we have to recreate the view.
-- NOTE(review): presumably needed because the view's SELECT * is expanded when
-- the view is created, so it would not pick up the new column otherwise — confirm.
--
-- ready_jobs lists the jobs that can be dequeued: not yet started, not
-- canceled, and with no dependency that is still unfinished, oldest first.
CREATE OR REPLACE VIEW ready_jobs AS
SELECT *
FROM jobs
WHERE started_at IS NULL
AND canceled = FALSE
AND id NOT IN (
-- job_ids that still have at least one dependency without a finished_at
SELECT job_id
FROM job_dependencies JOIN jobs ON dependency_id = id
WHERE finished_at IS NULL
)
ORDER BY queued_at ASC

View file

@ -49,9 +49,12 @@ type JobQueue interface {
// can be unmarshaled to the type given in Enqueue().
DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
// Mark the job with `id` as finished. `result` must fit the associated
// job type and must be serializable to JSON.
FinishJob(id uuid.UUID, result interface{}) error
// Tries to requeue a running job by its ID
//
// Returns the given job to the pending state. If the job has reached
// the maxRetries number of retries already, finish the job instead.
// `result` must fit the associated job type and must be serializable to JSON.
RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error
// Cancel a job. Does nothing if the job has already finished.
CancelJob(id uuid.UUID) error
@ -95,6 +98,6 @@ var (
ErrNotExist = errors.New("job does not exist")
ErrNotPending = errors.New("job is not pending")
ErrNotRunning = errors.New("job is not running")
ErrCanceled = errors.New("job ws canceled")
ErrCanceled = errors.New("job was canceled")
ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled")
)