jobqueue: return if a job was requeued or not

This commit is contained in:
Sanne Raymaekers 2024-11-05 14:24:00 +01:00
parent 64f479092d
commit 056b3c5ea6
5 changed files with 60 additions and 47 deletions

View file

@ -55,8 +55,9 @@ func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
res, err := json.Marshal(result)
require.NoError(t, err)
err = q.RequeueOrFinishJob(id, 0, res)
requeued, err := q.RequeueOrFinishJob(id, 0, res)
require.NoError(t, err)
require.False(t, requeued)
_, _, r, _, _, _, _, _, _, err := q.JobStatus(id)
require.NoError(t, err)

View file

@ -126,7 +126,7 @@ func New(dir string) (*fsJobQueue, error) {
if !j.StartedAt.IsZero() && j.FinishedAt.IsZero() && !j.Canceled {
// Fail older running jobs which don't have a token stored
if j.Token == uuid.Nil {
err = q.RequeueOrFinishJob(j.Id, 0, nil)
_, err = q.RequeueOrFinishJob(j.Id, 0, nil)
if err != nil {
return nil, fmt.Errorf("Error finishing job '%s' without a token: %v", j.Id, err)
}
@ -297,21 +297,21 @@ func (q *fsJobQueue) DequeueByID(ctx context.Context, id, wID uuid.UUID) (uuid.U
return j.Token, j.Dependencies, j.Type, j.Args, nil
}
func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error {
func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error) {
q.mu.Lock()
defer q.mu.Unlock()
j, err := q.readJob(id)
if err != nil {
return err
return false, err
}
if j.Canceled {
return jobqueue.ErrCanceled
return false, jobqueue.ErrCanceled
}
if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() {
return jobqueue.ErrNotRunning
return false, jobqueue.ErrNotRunning
}
delete(q.jobIdByToken, j.Token)
@ -326,26 +326,27 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
j.Result, err = json.Marshal(result)
if err != nil {
return fmt.Errorf("error marshaling result: %v", err)
return false, fmt.Errorf("error marshaling result: %v", err)
}
// Write before notifying dependants, because it will be read again.
err = q.db.Write(id.String(), j)
if err != nil {
return fmt.Errorf("error writing job %s: %v", id, err)
return false, fmt.Errorf("error writing job %s: %v", id, err)
}
for _, depid := range q.dependants[id] {
dep, err := q.readJob(depid)
if err != nil {
return err
return false, err
}
err = q.maybeEnqueue(dep, false)
if err != nil {
return err
return false, err
}
}
delete(q.dependants, id)
return false, nil
} else {
j.Token = uuid.Nil
j.StartedAt = time.Time{}
@ -355,7 +356,7 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
// doesn't become corrupt when writing fails.
err = q.db.Write(j.Id.String(), j)
if err != nil {
return fmt.Errorf("cannot write job: %v", err)
return false, fmt.Errorf("cannot write job: %v", err)
}
// add the job to the list of pending ones
@ -368,9 +369,8 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
default:
}
}
return true, nil
}
return nil
}
func (q *fsJobQueue) CancelJob(id uuid.UUID) error {

View file

@ -74,8 +74,9 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result
require.Equal(t, jobType, typ)
require.NotNil(t, args)
err = q.RequeueOrFinishJob(id, 0, result)
requeued, err := q.RequeueOrFinishJob(id, 0, result)
require.NoError(t, err)
require.False(t, requeued)
return id
}
@ -101,8 +102,9 @@ func testErrors(t *testing.T, q jobqueue.JobQueue) {
require.NoError(t, err)
require.Equal(t, id, idFromT)
err = q.RequeueOrFinishJob(id, 0, nil)
requeued, err := q.RequeueOrFinishJob(id, 0, nil)
require.NoError(t, err)
require.False(t, requeued)
// Make sure the token gets removed
id, err = q.IdFromToken(tok)
@ -376,7 +378,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, jobType, "clownfish")
require.True(t, canceled)
require.Nil(t, result)
err = q.RequeueOrFinishJob(id, 0, &testResult{})
_, err = q.RequeueOrFinishJob(id, 0, &testResult{})
require.Error(t, err)
// Cancel a running job, which should not dequeue the canceled job from above
@ -396,7 +398,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, jobType, "clownfish")
require.True(t, canceled)
require.Nil(t, result)
err = q.RequeueOrFinishJob(id, 0, &testResult{})
_, err = q.RequeueOrFinishJob(id, 0, &testResult{})
require.Error(t, err)
// Cancel a finished job, which is a no-op
@ -409,8 +411,9 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Empty(t, deps)
require.Equal(t, "clownfish", typ)
require.Equal(t, json.RawMessage("null"), args)
err = q.RequeueOrFinishJob(id, 0, &testResult{})
requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{})
require.NoError(t, err)
require.False(t, requeued)
err = q.CancelJob(id)
require.Error(t, err)
require.Equal(t, jobqueue.ErrNotRunning, err)
@ -424,13 +427,13 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
func testRequeue(t *testing.T, q jobqueue.JobQueue) {
// Requeue a non-existing job
err := q.RequeueOrFinishJob(uuid.New(), 1, nil)
_, err := q.RequeueOrFinishJob(uuid.New(), 1, nil)
require.Error(t, err)
// Requeue a pending job
id := pushTestJob(t, q, "clownfish", nil, nil, "")
require.NotEmpty(t, id)
err = q.RequeueOrFinishJob(id, 1, nil)
_, err = q.RequeueOrFinishJob(id, 1, nil)
require.Error(t, err)
// Requeue a running job
@ -441,8 +444,9 @@ func testRequeue(t *testing.T, q jobqueue.JobQueue) {
require.Empty(t, deps)
require.Equal(t, "clownfish", typ)
require.Equal(t, json.RawMessage("null"), args)
err = q.RequeueOrFinishJob(id, 1, nil)
requeued, err := q.RequeueOrFinishJob(id, 1, nil)
require.NoError(t, err)
require.True(t, requeued)
r, tok2, deps, typ, args, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""})
require.NoError(t, err)
require.Equal(t, id, r)
@ -456,11 +460,12 @@ func testRequeue(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, jobType, "clownfish")
require.False(t, canceled)
require.Nil(t, result)
err = q.RequeueOrFinishJob(id, 0, &testResult{})
requeued, err = q.RequeueOrFinishJob(id, 0, &testResult{})
require.NoError(t, err)
require.False(t, requeued)
// Requeue a finished job
err = q.RequeueOrFinishJob(id, 1, nil)
_, err = q.RequeueOrFinishJob(id, 1, nil)
require.Error(t, err)
}
@ -471,8 +476,9 @@ func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) {
_, _, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""})
require.NoError(t, err)
// Requeue once
err = q.RequeueOrFinishJob(id, 1, nil)
requeued, err := q.RequeueOrFinishJob(id, 1, nil)
require.NoError(t, err)
require.True(t, requeued)
// Start again
_, _, _, _, _, err = q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""})
require.NoError(t, err)
@ -481,8 +487,9 @@ func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) {
require.True(t, finished.IsZero())
require.Nil(t, result)
// Requeue a second time, this time finishing it
err = q.RequeueOrFinishJob(id, 1, &testResult{})
requeued, err = q.RequeueOrFinishJob(id, 1, &testResult{})
require.NoError(t, err)
require.False(t, requeued)
_, _, result, _, _, finished, _, _, _, err = q.JobStatus(id)
require.NoError(t, err)
require.False(t, finished.IsZero())
@ -514,8 +521,9 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) {
require.NoError(t, err)
require.Equal(t, id2, id)
err = q.RequeueOrFinishJob(id, 0, &testResult{})
requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{})
require.NoError(t, err)
require.False(t, requeued)
// No heartbeats for finished job
require.Empty(t, q.Heartbeats(time.Second*0))
@ -536,8 +544,9 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, "octopus", typ)
require.NotNil(t, args)
err = q.RequeueOrFinishJob(one, 0, nil)
requeued, err := q.RequeueOrFinishJob(one, 0, nil)
require.NoError(t, err)
require.False(t, requeued)
require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, nil))
})
@ -562,8 +571,9 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
_, _, _, _, err = q.DequeueByID(context.Background(), one, uuid.Nil)
require.Equal(t, jobqueue.ErrNotPending, err)
err = q.RequeueOrFinishJob(one, 0, nil)
requeued, err := q.RequeueOrFinishJob(one, 0, nil)
require.NoError(t, err)
require.False(t, requeued)
_, _, _, _, err = q.DequeueByID(context.Background(), one, uuid.Nil)
require.Equal(t, jobqueue.ErrNotPending, err)
@ -742,8 +752,9 @@ func testWorkers(t *testing.T, q jobqueue.JobQueue) {
err = q.UpdateWorkerStatus(uuid.New())
require.Equal(t, err, jobqueue.ErrWorkerNotExist)
err = q.RequeueOrFinishJob(one, 0, &testResult{})
requeued, err := q.RequeueOrFinishJob(one, 0, &testResult{})
require.NoError(t, err)
require.False(t, requeued)
err = q.DeleteWorker(w1)
require.NoError(t, err)

View file

@ -487,16 +487,16 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (u
return token, dependencies, jobType, args, nil
}
func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error {
func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return fmt.Errorf("error connecting to database: %w", err)
return false, fmt.Errorf("error connecting to database: %w", err)
}
defer conn.Release()
tx, err := conn.Begin(context.Background())
if err != nil {
return fmt.Errorf("error starting database transaction: %w", err)
return false, fmt.Errorf("error starting database transaction: %w", err)
}
defer func() {
err = tx.Rollback(context.Background())
@ -513,60 +513,61 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
canceled := false
err = tx.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &retries, &canceled)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
return false, jobqueue.ErrNotExist
}
if canceled {
return jobqueue.ErrCanceled
return false, jobqueue.ErrCanceled
}
if started == nil || finished != nil {
return jobqueue.ErrNotRunning
return false, jobqueue.ErrNotRunning
}
// Remove from heartbeats if token is null
tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id)
if err != nil {
return fmt.Errorf("error removing job %s from heartbeats: %w", id, err)
return false, fmt.Errorf("error removing job %s from heartbeats: %w", id, err)
}
if tag.RowsAffected() != 1 {
return jobqueue.ErrNotExist
return false, jobqueue.ErrNotExist
}
if retries >= maxRetries {
err = tx.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
return false, jobqueue.ErrNotExist
}
if err != nil {
return fmt.Errorf("error finishing job %s: %w", id, err)
return false, fmt.Errorf("error finishing job %s: %w", id, err)
}
} else {
tag, err = tx.Exec(context.Background(), sqlRequeue, id)
if err != nil {
return fmt.Errorf("error requeueing job %s: %w", id, err)
return false, fmt.Errorf("error requeueing job %s: %w", id, err)
}
if tag.RowsAffected() != 1 {
return jobqueue.ErrNotExist
return false, jobqueue.ErrNotExist
}
}
_, err = tx.Exec(context.Background(), sqlNotify)
if err != nil {
return fmt.Errorf("error notifying jobs channel: %w", err)
return false, fmt.Errorf("error notifying jobs channel: %w", err)
}
err = tx.Commit(context.Background())
if err != nil {
return fmt.Errorf("unable to commit database transaction: %w", err)
return false, fmt.Errorf("unable to commit database transaction: %w", err)
}
if retries >= maxRetries {
q.logger.Info("Finished job", "job_type", jobType, "job_id", id.String())
return false, nil
} else {
q.logger.Info("Requeued job", "job_type", jobType, "job_id", id.String())
return true, nil
}
return nil
}
func (q *DBJobQueue) CancelJob(id uuid.UUID) error {

View file

@ -51,10 +51,10 @@ type JobQueue interface {
// Tries to requeue a running job by its ID
//
// Returns the given job to the pending state. If the job has reached
// the maxRetries number of retries already, finish the job instead.
// If the job has reached the maxRetries number of retries already, finish the job instead.
// `result` must fit the associated job type and must be serializable to JSON.
RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error
// Fills in result, and returns if the job was requeued, or an error.
RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error)
// Cancel a job. Does nothing if the job has already finished.
CancelJob(id uuid.UUID) error