diff --git a/cmd/osbuild-service-maintenance/db_test.go b/cmd/osbuild-service-maintenance/db_test.go
index e1861111c..391895b24 100644
--- a/cmd/osbuild-service-maintenance/db_test.go
+++ b/cmd/osbuild-service-maintenance/db_test.go
@@ -55,7 +55,7 @@ func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
 	res, err := json.Marshal(result)
 	require.NoError(t, err)
 
-	err = q.FinishJob(id, res)
+	err = q.RequeueOrFinishJob(id, 0, res)
 	require.NoError(t, err)
 
 	_, _, r, _, _, _, _, _, _, err := q.JobStatus(id)
diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go
index da187b8c6..44d512c2d 100644
--- a/internal/jobqueue/fsjobqueue/fsjobqueue.go
+++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go
@@ -75,7 +75,8 @@ type job struct {
 	FinishedAt time.Time `json:"finished_at,omitempty"`
 	ExpiresAt  time.Time `json:"expires_at,omitempty"`
 
-	Canceled bool `json:"canceled,omitempty"`
+	Retries  uint64 `json:"retries"`
+	Canceled bool   `json:"canceled,omitempty"`
 }
 
 // Create a new fsJobQueue object for `dir`. This object must have exclusive
@@ -111,7 +112,7 @@ func New(dir string) (*fsJobQueue, error) {
 		if !j.StartedAt.IsZero() && j.FinishedAt.IsZero() && !j.Canceled {
 			// Fail older running jobs which don't have a token stored
 			if j.Token == uuid.Nil {
-				err = q.FinishJob(j.Id, nil)
+				err = q.RequeueOrFinishJob(j.Id, 0, nil)
 				if err != nil {
 					return nil, fmt.Errorf("Error finishing job '%s' without a token: %v", j.Id, err)
 				}
@@ -274,7 +275,7 @@ func (q *fsJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
 	return j.Token, j.Dependencies, j.Type, j.Args, nil
 }
 
-func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
+func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error {
 	q.mu.Lock()
 	defer q.mu.Unlock()
 
@@ -291,33 +292,57 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
 		return jobqueue.ErrNotRunning
 	}
 
-	j.FinishedAt = time.Now()
-
-	j.Result, err = json.Marshal(result)
-	if err != nil {
-		return fmt.Errorf("error marshaling result: %v", err)
-	}
-
-	delete(q.heartbeats, j.Token)
 	delete(q.jobIdByToken, j.Token)
+	delete(q.heartbeats, j.Token)
 
-	// Write before notifying dependants, because it will be read again.
-	err = q.db.Write(id.String(), j)
-	if err != nil {
-		return fmt.Errorf("error writing job %s: %v", id, err)
-	}
+	if j.Retries >= maxRetries {
+		j.FinishedAt = time.Now()
 
-	for _, depid := range q.dependants[id] {
-		dep, err := q.readJob(depid)
+		j.Result, err = json.Marshal(result)
 		if err != nil {
-			return err
+			return fmt.Errorf("error marshaling result: %v", err)
 		}
-		err = q.maybeEnqueue(dep, false)
+
+		// Write before notifying dependants, because it will be read again.
+		err = q.db.Write(id.String(), j)
 		if err != nil {
-			return err
+			return fmt.Errorf("error writing job %s: %v", id, err)
+		}
+
+		for _, depid := range q.dependants[id] {
+			dep, err := q.readJob(depid)
+			if err != nil {
+				return err
+			}
+			err = q.maybeEnqueue(dep, false)
+			if err != nil {
+				return err
+			}
+		}
+		delete(q.dependants, id)
+	} else {
+		j.Token = uuid.Nil
+		j.StartedAt = time.Time{}
+		j.Retries += 1
+
+		// Write the job before updating in-memory state, so that the latter
+		// doesn't become corrupt when writing fails.
+		err = q.db.Write(j.Id.String(), j)
+		if err != nil {
+			return fmt.Errorf("cannot write job: %v", err)
+		}
+
+		// add the job to the list of pending ones
+		q.pending.PushBack(j.Id)
+
+		// notify all listeners in a non-blocking way
+		for c := range q.listeners {
+			select {
+			case c <- struct{}{}:
+			default:
+			}
 		}
 	}
-	delete(q.dependants, id)
 
 	return nil
 }
diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go
index 8fa6d775d..fa2d688e8 100644
--- a/internal/jobqueue/jobqueuetest/jobqueuetest.go
+++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go
@@ -33,6 +33,8 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) {
 		t.Run("errors", wrap(testErrors))
 		t.Run("args", wrap(testArgs))
 		t.Run("cancel", wrap(testCancel))
+		t.Run("requeue", wrap(testRequeue))
+		t.Run("requeue-limit", wrap(testRequeueLimit))
 		t.Run("job-types", wrap(testJobTypes))
 		t.Run("dependencies", wrap(testDependencies))
 		t.Run("multiple-workers", wrap(testMultipleWorkers))
@@ -61,7 +63,7 @@ func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result
 	require.Equal(t, jobType, typ)
 	require.NotNil(t, args)
 
-	err = q.FinishJob(id, result)
+	err = q.RequeueOrFinishJob(id, 0, result)
 	require.NoError(t, err)
 
 	return id
@@ -88,7 +90,7 @@ func testErrors(t *testing.T, q jobqueue.JobQueue) {
 	require.NoError(t, err)
 	require.Equal(t, id, idFromT)
 
-	err = q.FinishJob(id, nil)
+	err = q.RequeueOrFinishJob(id, 0, nil)
 	require.NoError(t, err)
 
 	// Make sure the token gets removed
@@ -363,7 +365,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
 	require.Equal(t, jobType, "clownfish")
 	require.True(t, canceled)
 	require.Nil(t, result)
-	err = q.FinishJob(id, &testResult{})
+	err = q.RequeueOrFinishJob(id, 0, &testResult{})
 	require.Error(t, err)
 
 	// Cancel a running job, which should not dequeue the canceled job from above
@@ -383,7 +385,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
 	require.Equal(t, jobType, "clownfish")
 	require.True(t, canceled)
 	require.Nil(t, result)
-	err = q.FinishJob(id, &testResult{})
+	err = q.RequeueOrFinishJob(id, 0, &testResult{})
 	require.Error(t, err)
 
 	// Cancel a finished job, which is a no-op
@@ -396,7 +398,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
 	require.Empty(t, deps)
 	require.Equal(t, "clownfish", typ)
 	require.Equal(t, json.RawMessage("null"), args)
-	err = q.FinishJob(id, &testResult{})
+	err = q.RequeueOrFinishJob(id, 0, &testResult{})
 	require.NoError(t, err)
 	err = q.CancelJob(id)
 	require.Error(t, err)
@@ -409,6 +411,73 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
 	require.NoError(t, err)
 }
 
+func testRequeue(t *testing.T, q jobqueue.JobQueue) {
+	// Requeue a non-existing job
+	err := q.RequeueOrFinishJob(uuid.New(), 1, nil)
+	require.Error(t, err)
+
+	// Requeue a pending job
+	id := pushTestJob(t, q, "clownfish", nil, nil, "")
+	require.NotEmpty(t, id)
+	err = q.RequeueOrFinishJob(id, 1, nil)
+	require.Error(t, err)
+
+	// Requeue a running job
+	r, tok1, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
+	require.NoError(t, err)
+	require.Equal(t, id, r)
+	require.NotEmpty(t, tok1)
+	require.Empty(t, deps)
+	require.Equal(t, "clownfish", typ)
+	require.Equal(t, json.RawMessage("null"), args)
+	err = q.RequeueOrFinishJob(id, 1, nil)
+	require.NoError(t, err)
+	r, tok2, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
+	require.NoError(t, err)
+	require.Equal(t, id, r)
+	require.NotEmpty(t, tok2)
+	require.NotEqual(t, tok1, tok2)
+	require.Empty(t, deps)
+	require.Equal(t, "clownfish", typ)
+	require.Equal(t, json.RawMessage("null"), args)
+	jobType, _, result, _, _, _, canceled, _, _, err := q.JobStatus(id)
+	require.NoError(t, err)
+	require.Equal(t, jobType, "clownfish")
+	require.False(t, canceled)
+	require.Nil(t, result)
+	err = q.RequeueOrFinishJob(id, 0, &testResult{})
+	require.NoError(t, err)
+
+	// Requeue a finished job
+	err = q.RequeueOrFinishJob(id, 1, nil)
+	require.Error(t, err)
+}
+
+func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) {
+	// Start a job
+	id := pushTestJob(t, q, "clownfish", nil, nil, "")
+	require.NotEmpty(t, id)
+	_, _, _, _, _, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
+	require.NoError(t, err)
+	// Requeue once
+	err = q.RequeueOrFinishJob(id, 1, nil)
+	require.NoError(t, err)
+	// Start again
+	_, _, _, _, _, err = q.Dequeue(context.Background(), []string{"clownfish"}, []string{""})
+	require.NoError(t, err)
+	_, _, result, _, _, finished, _, _, _, err := q.JobStatus(id)
+	require.NoError(t, err)
+	require.True(t, finished.IsZero())
+	require.Nil(t, result)
+	// Requeue a second time, this time finishing it
+	err = q.RequeueOrFinishJob(id, 1, &testResult{})
+	require.NoError(t, err)
+	_, _, result, _, _, finished, _, _, _, err = q.JobStatus(id)
+	require.NoError(t, err)
+	require.False(t, finished.IsZero())
+	require.NotNil(t, result)
+}
+
 func testHeartbeats(t *testing.T, q jobqueue.JobQueue) {
 	id := pushTestJob(t, q, "octopus", nil, nil, "")
 	// No heartbeats for queued job
@@ -434,7 +503,7 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) {
 	require.NoError(t, err)
 	require.Equal(t, id2, id)
 
-	err = q.FinishJob(id, &testResult{})
+	err = q.RequeueOrFinishJob(id, 0, &testResult{})
 	require.NoError(t, err)
 
 	// No heartbeats for finished job
@@ -456,7 +525,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
 	require.Equal(t, "octopus", typ)
 	require.NotNil(t, args)
 
-	err = q.FinishJob(one, nil)
+	err = q.RequeueOrFinishJob(one, 0, nil)
 	require.NoError(t, err)
 
 	require.Equal(t, two, finishNextTestJob(t, q, "octopus", testResult{}, nil))
@@ -482,7 +551,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) {
 	_, _, _, _, err = q.DequeueByID(context.Background(), one)
 	require.Equal(t, jobqueue.ErrNotPending, err)
 
-	err = q.FinishJob(one, nil)
+	err = q.RequeueOrFinishJob(one, 0, nil)
 	require.NoError(t, err)
 
 	_, _, _, _, err = q.DequeueByID(context.Background(), one)
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 6e170edb6..d0b9604b6 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -103,6 +103,8 @@ func (s *Server) Handler() http.Handler {
 	return e
 }
 
+const maxHeartbeatRetries = 2
+
 // This function should be started as a goroutine
 // Every 30 seconds it goes through all running jobs, removing any unresponsive ones.
 // It fails jobs which fail to check if they cancelled for more than 2 minutes.
@@ -110,11 +112,10 @@ func (s *Server) WatchHeartbeats() {
 	//nolint:staticcheck // avoid SA1015, this is an endless function
 	for range time.Tick(time.Second * 30) {
 		for _, token := range s.jobs.Heartbeats(time.Second * 120) {
-			id, _ := s.jobs.IdFromToken(token)
-			logrus.Infof("Removing unresponsive job: %s\n", id)
-
 			missingHeartbeatResult := JobResult{
-				JobError: clienterrors.WorkerClientError(clienterrors.ErrorJobMissingHeartbeat, "Worker running this job stopped responding.", nil),
+				JobError: clienterrors.WorkerClientError(clienterrors.ErrorJobMissingHeartbeat,
+					fmt.Sprintf("Workers running this job stopped responding more than %d times.", maxHeartbeatRetries),
+					nil),
 			}
 
 			resJson, err := json.Marshal(missingHeartbeatResult)
@@ -122,9 +123,9 @@ func (s *Server) WatchHeartbeats() {
 				logrus.Panicf("Cannot marshal the heartbeat error: %v", err)
 			}
 
-			err = s.FinishJob(token, resJson)
+			err = s.RequeueOrFinishJob(token, maxHeartbeatRetries, resJson)
 			if err != nil {
-				logrus.Errorf("Error finishing unresponsive job: %v", err)
+				logrus.Errorf("Error requeueing or finishing unresponsive job: %v", err)
 			}
 		}
 	}
@@ -618,6 +619,10 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string,
 }
 
 func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error {
+	return s.RequeueOrFinishJob(token, 0, result)
+}
+
+func (s *Server) RequeueOrFinishJob(token uuid.UUID, maxRetries uint64, result json.RawMessage) error {
 	jobId, err := s.jobs.IdFromToken(token)
 	if err != nil {
 		switch err {
@@ -628,7 +633,7 @@ func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error {
 		}
 	}
 
-	err = s.jobs.FinishJob(jobId, result)
+	err = s.jobs.RequeueOrFinishJob(jobId, maxRetries, result)
 	if err != nil {
 		switch err {
 		case jobqueue.ErrNotRunning:
diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go
index 92968f1e0..9ec482982 100644
--- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go
+++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go
@@ -55,6 +55,11 @@ const (
 	)
 	RETURNING token, type, args, queued_at, started_at`
 
+	sqlRequeue = `
+		UPDATE jobs
+		SET started_at = NULL, token = NULL, retries = retries + 1
+		WHERE id = $1 AND started_at IS NOT NULL AND finished_at IS NULL`
+
 	sqlInsertDependency  = `INSERT INTO job_dependencies VALUES ($1, $2)`
 	sqlQueryDependencies = `
 		SELECT dependency_id
@@ -66,7 +71,7 @@ const (
 		WHERE dependency_id = $1`
 
 	sqlQueryJob = `
-		SELECT type, args, channel, started_at, finished_at, canceled
+		SELECT type, args, channel, started_at, finished_at, retries, canceled
 		FROM jobs
 		WHERE id = $1`
 	sqlQueryJobStatus = `
@@ -396,7 +401,7 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
 	return token, dependencies, jobType, args, nil
 }
 
-func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
+func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error {
 	conn, err := q.pool.Acquire(context.Background())
 	if err != nil {
 		return fmt.Errorf("error connecting to database: %v", err)
 	}
@@ -410,46 +415,57 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
 	defer func() {
 		err = tx.Rollback(context.Background())
 		if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
-			q.logger.Error(err, "Error rolling back finish job transaction", "job_id", id.String())
+			q.logger.Error(err, "Error rolling back retry job transaction", "job_id", id.String())
 		}
 	}()
 
 	// Use double pointers for timestamps because they might be NULL, which would result in *time.Time == nil
-	var started, finished *time.Time
 	var jobType string
+	var started, finished *time.Time
+	var retries uint64
 	canceled := false
-	err = tx.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &canceled)
+	err = tx.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &retries, &canceled)
 	if err == pgx.ErrNoRows {
 		return jobqueue.ErrNotExist
 	}
 	if canceled {
 		return jobqueue.ErrCanceled
 	}
-	if finished != nil {
+	if started == nil || finished != nil {
 		return jobqueue.ErrNotRunning
 	}
 
 	// Remove from heartbeats
 	tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id)
 	if err != nil {
-		return fmt.Errorf("error finishing job %s: %v", id, err)
+		return fmt.Errorf("error removing job %s from heartbeats: %v", id, err)
 	}
 
 	if tag.RowsAffected() != 1 {
 		return jobqueue.ErrNotExist
 	}
 
-	err = tx.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
+	if retries >= maxRetries {
+		err = tx.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
+		if err == pgx.ErrNoRows {
+			return jobqueue.ErrNotExist
+		}
+		if err != nil {
+			return fmt.Errorf("error finishing job %s: %v", id, err)
+		}
+	} else {
+		tag, err = tx.Exec(context.Background(), sqlRequeue, id)
+		if err != nil {
+			return fmt.Errorf("error requeueing job %s: %v", id, err)
+		}
 
-	if err == pgx.ErrNoRows {
-		return jobqueue.ErrNotExist
-	}
-	if err != nil {
-		return fmt.Errorf("error finishing job %s: %v", id, err)
+		if tag.RowsAffected() != 1 {
+			return jobqueue.ErrNotExist
+		}
 	}
 
-	_, err = conn.Exec(context.Background(), sqlNotify)
+	_, err = tx.Exec(context.Background(), sqlNotify)
 	if err != nil {
 		return fmt.Errorf("error notifying jobs channel: %v", err)
 	}
@@ -459,8 +475,11 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
 		return fmt.Errorf("unable to commit database transaction: %v", err)
 	}
 
-	q.logger.Info("Finished job", "job_type", jobType, "job_id", id.String())
-
+	if retries >= maxRetries {
+		q.logger.Info("Finished job", "job_type", jobType, "job_id", id.String())
+	} else {
+		q.logger.Info("Requeued job", "job_type", jobType, "job_id", id.String())
+	}
 	return nil
 }
 
@@ -530,7 +549,7 @@ func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de
 	}
 	defer conn.Release()
 
-	err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, &channel, nil, nil, nil)
+	err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, &channel, nil, nil, nil, nil)
 	if err == pgx.ErrNoRows {
 		err = jobqueue.ErrNotExist
 		return
diff --git a/pkg/jobqueue/dbjobqueue/schemas/006_retry_count.sql b/pkg/jobqueue/dbjobqueue/schemas/006_retry_count.sql
new file mode 100644
index 000000000..e6020bcc0
--- /dev/null
+++ b/pkg/jobqueue/dbjobqueue/schemas/006_retry_count.sql
@@ -0,0 +1,16 @@
+-- add the retries column
+ALTER TABLE jobs
+ADD COLUMN retries BIGINT DEFAULT 0;
+
+-- We added a column, thus we have to recreate the view.
+CREATE OR REPLACE VIEW ready_jobs AS
+SELECT *
+FROM jobs
+WHERE started_at IS NULL
+      AND canceled = FALSE
+      AND id NOT IN (
+        SELECT job_id
+        FROM job_dependencies JOIN jobs ON dependency_id = id
+        WHERE finished_at IS NULL
+)
+ORDER BY queued_at ASC
diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go
index 3d5bd8dd0..8b11f5f86 100644
--- a/pkg/jobqueue/jobqueue.go
+++ b/pkg/jobqueue/jobqueue.go
@@ -49,9 +49,12 @@ type JobQueue interface {
 	// can be unmarshaled to the type given in Enqueue().
 	DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
 
-	// Mark the job with `id` as finished. `result` must fit the associated
-	// job type and must be serializable to JSON.
-	FinishJob(id uuid.UUID, result interface{}) error
+	// Tries to requeue a running job by its ID.
+	//
+	// Returns the given job to the pending state. If the job has already
+	// been requeued `maxRetries` times, it is finished instead. `result`
+	// must fit the associated job type and must be serializable to JSON.
+	RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error
 
 	// Cancel a job. Does nothing if the job has already finished.
 	CancelJob(id uuid.UUID) error
@@ -95,6 +98,6 @@ var (
 	ErrNotExist       = errors.New("job does not exist")
 	ErrNotPending     = errors.New("job is not pending")
 	ErrNotRunning     = errors.New("job is not running")
-	ErrCanceled       = errors.New("job ws canceled")
+	ErrCanceled       = errors.New("job was canceled")
 	ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled")
 )
diff --git a/test/cases/api.sh b/test/cases/api.sh
index 79950c261..caf2ca46a 100755
--- a/test/cases/api.sh
+++ b/test/cases/api.sh
@@ -481,11 +481,29 @@ jq '.customizations.packages = [ "jesuisunpaquetquinexistepas" ]' "$REQUEST_FILE
 sendCompose "$REQUEST_FILE2"
 waitForState "failure"
 
-# crashed/stopped/killed worker should result in a failed state
+# crashed/stopped/killed worker should result in the job being retried
 sendCompose "$REQUEST_FILE"
 waitForState "building"
 sudo systemctl stop "osbuild-remote-worker@*"
-waitForState "failure"
+RETRIED=0
+for RETRY in {1..10}; do
+    ROWS=$(sudo ${CONTAINER_RUNTIME} exec "${DB_CONTAINER_NAME}" psql -U postgres -d osbuildcomposer -c \
+               "SELECT retries FROM jobs WHERE id = '$COMPOSE_ID' AND retries = 1")
+    if grep -q "1 row" <<< "$ROWS"; then
+        RETRIED=1
+        break
+    else
+        echo "Waiting until job is retried ($RETRY/10)"
+        sleep 30
+    fi
+done
+if [ "$RETRIED" != 1 ]; then
+    echo "Job $COMPOSE_ID wasn't retried after killing the worker"
+    exit 1
+fi
+# remove the job from the queue so the worker doesn't pick it up again
+sudo ${CONTAINER_RUNTIME} exec "${DB_CONTAINER_NAME}" psql -U postgres -d osbuildcomposer -c \
+    "DELETE FROM jobs WHERE id = '$COMPOSE_ID'"
 sudo systemctl start "osbuild-remote-worker@localhost:8700.service"
 
 # full integration case
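
A minimal sketch (not part of the patch) of how the new API is meant to be driven end to end, using the fsjobqueue implementation. The import path is this repository's; the Enqueue/Dequeue/RequeueOrFinishJob signatures are the ones exercised above in jobqueuetest. The "demo" job type, the scratch directory, and the result payload are hypothetical.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
)

func main() {
	// Hypothetical scratch directory; fsjobqueue must have exclusive
	// ownership of it.
	q, err := fsjobqueue.New("/tmp/jobqueue-demo")
	if err != nil {
		log.Fatal(err)
	}

	// Enqueue a job with no args, no dependencies, and the default channel.
	id, err := q.Enqueue("demo", nil, nil, "")
	if err != nil {
		log.Fatal(err)
	}

	const maxRetries = 1

	// First run: dequeue the job, then pretend its worker disappeared.
	if _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"demo"}, []string{""}); err != nil {
		log.Fatal(err)
	}
	// retries (0) < maxRetries (1), so the job goes back to pending and the
	// result is not stored.
	if err = q.RequeueOrFinishJob(id, maxRetries, nil); err != nil {
		log.Fatal(err)
	}

	// Second run: the retry budget is now exhausted, so the same call
	// finishes the job and stores the result.
	if _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"demo"}, []string{""}); err != nil {
		log.Fatal(err)
	}
	if err = q.RequeueOrFinishJob(id, maxRetries, map[string]string{"status": "gave up"}); err != nil {
		log.Fatal(err)
	}

	fmt.Println("job", id, "finished after one requeue")
}

The dbjobqueue implementation follows the same semantics: the stored retry count is compared against maxRetries before being incremented, so a job that has been requeued once is finished by the next call with maxRetries = 1, exactly as testRequeueLimit asserts.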