From 056b3c5ea6319602beb0cb1a9165603f3567ff1f Mon Sep 17 00:00:00 2001 From: Sanne Raymaekers Date: Tue, 5 Nov 2024 14:24:00 +0100 Subject: [PATCH] jobqueue: return if a job was requeued or not --- cmd/osbuild-service-maintenance/db_test.go | 3 +- internal/jobqueue/fsjobqueue/fsjobqueue.go | 24 +++++------ .../jobqueue/jobqueuetest/jobqueuetest.go | 43 ++++++++++++------- pkg/jobqueue/dbjobqueue/dbjobqueue.go | 31 ++++++------- pkg/jobqueue/jobqueue.go | 6 +-- 5 files changed, 60 insertions(+), 47 deletions(-) diff --git a/cmd/osbuild-service-maintenance/db_test.go b/cmd/osbuild-service-maintenance/db_test.go index e81ebed2b..7a0eb2588 100644 --- a/cmd/osbuild-service-maintenance/db_test.go +++ b/cmd/osbuild-service-maintenance/db_test.go @@ -55,8 +55,9 @@ func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { res, err := json.Marshal(result) require.NoError(t, err) - err = q.RequeueOrFinishJob(id, 0, res) + requeued, err := q.RequeueOrFinishJob(id, 0, res) require.NoError(t, err) + require.False(t, requeued) _, _, r, _, _, _, _, _, _, err := q.JobStatus(id) require.NoError(t, err) diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 938031903..c8f9e4b38 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -126,7 +126,7 @@ func New(dir string) (*fsJobQueue, error) { if !j.StartedAt.IsZero() && j.FinishedAt.IsZero() && !j.Canceled { // Fail older running jobs which don't have a token stored if j.Token == uuid.Nil { - err = q.RequeueOrFinishJob(j.Id, 0, nil) + _, err = q.RequeueOrFinishJob(j.Id, 0, nil) if err != nil { return nil, fmt.Errorf("Error finishing job '%s' without a token: %v", j.Id, err) } @@ -297,21 +297,21 @@ func (q *fsJobQueue) DequeueByID(ctx context.Context, id, wID uuid.UUID) (uuid.U return j.Token, j.Dependencies, j.Type, j.Args, nil } -func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) 
error { +func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error) { q.mu.Lock() defer q.mu.Unlock() j, err := q.readJob(id) if err != nil { - return err + return false, err } if j.Canceled { - return jobqueue.ErrCanceled + return false, jobqueue.ErrCanceled } if j.StartedAt.IsZero() || !j.FinishedAt.IsZero() { - return jobqueue.ErrNotRunning + return false, jobqueue.ErrNotRunning } delete(q.jobIdByToken, j.Token) @@ -326,26 +326,27 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result j.Result, err = json.Marshal(result) if err != nil { - return fmt.Errorf("error marshaling result: %v", err) + return false, fmt.Errorf("error marshaling result: %v", err) } // Write before notifying dependants, because it will be read again. err = q.db.Write(id.String(), j) if err != nil { - return fmt.Errorf("error writing job %s: %v", id, err) + return false, fmt.Errorf("error writing job %s: %v", id, err) } for _, depid := range q.dependants[id] { dep, err := q.readJob(depid) if err != nil { - return err + return false, err } err = q.maybeEnqueue(dep, false) if err != nil { - return err + return false, err } } delete(q.dependants, id) + return false, nil } else { j.Token = uuid.Nil j.StartedAt = time.Time{} @@ -355,7 +356,7 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result // doesn't become corrupt when writing fails. 
err = q.db.Write(j.Id.String(), j) if err != nil { - return fmt.Errorf("cannot write job: %v", err) + return false, fmt.Errorf("cannot write job: %v", err) } // add the job to the list of pending ones @@ -368,9 +369,8 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result default: } } + return true, nil } - - return nil } func (q *fsJobQueue) CancelJob(id uuid.UUID) error { diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index 11cdd2433..bfd431cd5 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -74,8 +74,9 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result require.Equal(t, jobType, typ) require.NotNil(t, args) - err = q.RequeueOrFinishJob(id, 0, result) + requeued, err := q.RequeueOrFinishJob(id, 0, result) require.NoError(t, err) + require.False(t, requeued) return id } @@ -101,8 +102,9 @@ func testErrors(t *testing.T, q jobqueue.JobQueue) { require.NoError(t, err) require.Equal(t, id, idFromT) - err = q.RequeueOrFinishJob(id, 0, nil) + requeued, err := q.RequeueOrFinishJob(id, 0, nil) require.NoError(t, err) + require.False(t, requeued) // Make sure the token gets removed id, err = q.IdFromToken(tok) @@ -376,7 +378,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, jobType, "clownfish") require.True(t, canceled) require.Nil(t, result) - err = q.RequeueOrFinishJob(id, 0, &testResult{}) + _, err = q.RequeueOrFinishJob(id, 0, &testResult{}) require.Error(t, err) // Cancel a running job, which should not dequeue the canceled job from above @@ -396,7 +398,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, jobType, "clownfish") require.True(t, canceled) require.Nil(t, result) - err = q.RequeueOrFinishJob(id, 0, &testResult{}) + _, err = q.RequeueOrFinishJob(id, 0, &testResult{}) require.Error(t, err) // Cancel a finished job, which is 
a no-op @@ -409,8 +411,9 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Empty(t, deps) require.Equal(t, "clownfish", typ) require.Equal(t, json.RawMessage("null"), args) - err = q.RequeueOrFinishJob(id, 0, &testResult{}) + requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{}) require.NoError(t, err) + require.False(t, requeued) err = q.CancelJob(id) require.Error(t, err) require.Equal(t, jobqueue.ErrNotRunning, err) @@ -424,13 +427,13 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { func testRequeue(t *testing.T, q jobqueue.JobQueue) { // Requeue a non-existing job - err := q.RequeueOrFinishJob(uuid.New(), 1, nil) + _, err := q.RequeueOrFinishJob(uuid.New(), 1, nil) require.Error(t, err) // Requeue a pending job id := pushTestJob(t, q, "clownfish", nil, nil, "") require.NotEmpty(t, id) - err = q.RequeueOrFinishJob(id, 1, nil) + _, err = q.RequeueOrFinishJob(id, 1, nil) require.Error(t, err) // Requeue a running job @@ -441,8 +444,9 @@ func testRequeue(t *testing.T, q jobqueue.JobQueue) { require.Empty(t, deps) require.Equal(t, "clownfish", typ) require.Equal(t, json.RawMessage("null"), args) - err = q.RequeueOrFinishJob(id, 1, nil) + requeued, err := q.RequeueOrFinishJob(id, 1, nil) require.NoError(t, err) + require.True(t, requeued) r, tok2, deps, typ, args, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) @@ -456,11 +460,12 @@ func testRequeue(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, jobType, "clownfish") require.False(t, canceled) require.Nil(t, result) - err = q.RequeueOrFinishJob(id, 0, &testResult{}) + requeued, err = q.RequeueOrFinishJob(id, 0, &testResult{}) require.NoError(t, err) + require.False(t, requeued) // Requeue a finished job - err = q.RequeueOrFinishJob(id, 1, nil) + _, err = q.RequeueOrFinishJob(id, 1, nil) require.Error(t, err) } @@ -471,8 +476,9 @@ func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) { _, _, _, _, 
_, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) // Requeue once - err = q.RequeueOrFinishJob(id, 1, nil) + requeued, err := q.RequeueOrFinishJob(id, 1, nil) require.NoError(t, err) + require.True(t, requeued) // Start again _, _, _, _, _, err = q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) @@ -481,8 +487,9 @@ func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) { require.True(t, finished.IsZero()) require.Nil(t, result) // Requeue a second time, this time finishing it - err = q.RequeueOrFinishJob(id, 1, &testResult{}) + requeued, err = q.RequeueOrFinishJob(id, 1, &testResult{}) require.NoError(t, err) + require.False(t, requeued) _, _, result, _, _, finished, _, _, _, err = q.JobStatus(id) require.NoError(t, err) require.False(t, finished.IsZero()) @@ -514,8 +521,9 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) { require.NoError(t, err) require.Equal(t, id2, id) - err = q.RequeueOrFinishJob(id, 0, &testResult{}) + requeued, err := q.RequeueOrFinishJob(id, 0, &testResult{}) require.NoError(t, err) + require.False(t, requeued) // No heartbeats for finished job require.Empty(t, q.Heartbeats(time.Second*0)) @@ -536,8 +544,9 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, "octopus", typ) require.NotNil(t, args) - err = q.RequeueOrFinishJob(one, 0, nil) + requeued, err := q.RequeueOrFinishJob(one, 0, nil) require.NoError(t, err) + require.False(t, requeued) require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, nil)) }) @@ -562,8 +571,9 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { _, _, _, _, err = q.DequeueByID(context.Background(), one, uuid.Nil) require.Equal(t, jobqueue.ErrNotPending, err) - err = q.RequeueOrFinishJob(one, 0, nil) + requeued, err := q.RequeueOrFinishJob(one, 0, nil) require.NoError(t, err) + require.False(t, requeued) _, _, _, _, err = 
q.DequeueByID(context.Background(), one, uuid.Nil) require.Equal(t, jobqueue.ErrNotPending, err) @@ -742,8 +752,9 @@ func testWorkers(t *testing.T, q jobqueue.JobQueue) { err = q.UpdateWorkerStatus(uuid.New()) require.Equal(t, err, jobqueue.ErrWorkerNotExist) - err = q.RequeueOrFinishJob(one, 0, &testResult{}) + requeued, err := q.RequeueOrFinishJob(one, 0, &testResult{}) require.NoError(t, err) + require.False(t, requeued) err = q.DeleteWorker(w1) require.NoError(t, err) diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 81135e66b..35e5531e9 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -487,16 +487,16 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (u return token, dependencies, jobType, args, nil } -func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error { +func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { - return fmt.Errorf("error connecting to database: %w", err) + return false, fmt.Errorf("error connecting to database: %w", err) } defer conn.Release() tx, err := conn.Begin(context.Background()) if err != nil { - return fmt.Errorf("error starting database transaction: %w", err) + return false, fmt.Errorf("error starting database transaction: %w", err) } defer func() { err = tx.Rollback(context.Background()) @@ -513,60 +513,61 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result canceled := false err = tx.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &retries, &canceled) if err == pgx.ErrNoRows { - return jobqueue.ErrNotExist + return false, jobqueue.ErrNotExist } if canceled { - return jobqueue.ErrCanceled + return false, jobqueue.ErrCanceled } if started == nil || finished != nil { - 
return jobqueue.ErrNotRunning + return false, jobqueue.ErrNotRunning } // Remove from heartbeats if token is null tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id) if err != nil { - return fmt.Errorf("error removing job %s from heartbeats: %w", id, err) + return false, fmt.Errorf("error removing job %s from heartbeats: %w", id, err) } if tag.RowsAffected() != 1 { - return jobqueue.ErrNotExist + return false, jobqueue.ErrNotExist } if retries >= maxRetries { err = tx.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished) if err == pgx.ErrNoRows { - return jobqueue.ErrNotExist + return false, jobqueue.ErrNotExist } if err != nil { - return fmt.Errorf("error finishing job %s: %w", id, err) + return false, fmt.Errorf("error finishing job %s: %w", id, err) } } else { tag, err = tx.Exec(context.Background(), sqlRequeue, id) if err != nil { - return fmt.Errorf("error requeueing job %s: %w", id, err) + return false, fmt.Errorf("error requeueing job %s: %w", id, err) } if tag.RowsAffected() != 1 { - return jobqueue.ErrNotExist + return false, jobqueue.ErrNotExist } } _, err = tx.Exec(context.Background(), sqlNotify) if err != nil { - return fmt.Errorf("error notifying jobs channel: %w", err) + return false, fmt.Errorf("error notifying jobs channel: %w", err) } err = tx.Commit(context.Background()) if err != nil { - return fmt.Errorf("unable to commit database transaction: %w", err) + return false, fmt.Errorf("unable to commit database transaction: %w", err) } if retries >= maxRetries { q.logger.Info("Finished job", "job_type", jobType, "job_id", id.String()) + return false, nil } else { q.logger.Info("Requeued job", "job_type", jobType, "job_id", id.String()) + return true, nil } - return nil } func (q *DBJobQueue) CancelJob(id uuid.UUID) error { diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go index cb88ee39a..f92a00730 100644 --- a/pkg/jobqueue/jobqueue.go +++ b/pkg/jobqueue/jobqueue.go @@ -51,10 +51,10 @@ type JobQueue 
interface { // Tries to requeue a running job by its ID // - // Returns the given job to the pending state. If the job has reached - // the maxRetries number of retries already, finish the job instead. + // If the job has reached the maxRetries number of retries already, finish the job instead. // `result` must fit the associated job type and must be serializable to JSON. - RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error + // Returns true if the job was requeued, false if it was finished instead (storing `result`), or an error. + RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) (bool, error) // Cancel a job. Does nothing if the job has already finished. CancelJob(id uuid.UUID) error