diff --git a/internal/cloudapi/v2/server.go b/internal/cloudapi/v2/server.go index 2d702de8a..6c21cf516 100644 --- a/internal/cloudapi/v2/server.go +++ b/internal/cloudapi/v2/server.go @@ -3,6 +3,7 @@ package v2 import ( "context" "encoding/json" + "errors" "fmt" "net/http" "strings" @@ -263,7 +264,7 @@ func (s *Server) enqueueCompose(irs []imageRequest, channel string) (uuid.UUID, s.goroutinesGroup.Add(1) go func() { - serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, ir.manifestSeed) + serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, id, ir.manifestSeed) defer s.goroutinesGroup.Done() }() @@ -423,7 +424,7 @@ func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, releas // copy the image request while passing it into the goroutine to prevent data races s.goroutinesGroup.Add(1) go func(ir imageRequest) { - serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, ir.manifestSeed) + serializeManifest(s.goroutinesCtx, manifestSource, s.workers, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, buildID, ir.manifestSeed) defer s.goroutinesGroup.Done() }(ir) } @@ -444,23 +445,80 @@ func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, releas return id, nil } -func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID uuid.UUID, seed int64) { - ctx, cancel := context.WithTimeout(ctx, time.Minute*5) +func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, workers *worker.Server, depsolveJobID, containerResolveJobID, ostreeResolveJobID, manifestJobID, osbuildJobID uuid.UUID, seed int64) { + // prepared to become a config variable + const depsolveTimeout = 5 + ctx, cancel := context.WithTimeout(ctx, time.Minute*depsolveTimeout) defer cancel() - // wait until job is in a pending state - var token uuid.UUID + jobResult := &worker.ManifestJobByIDResult{ + Manifest: nil, + ManifestInfo: worker.ManifestInfo{ + OSBuildComposerVersion: common.BuildVersion(), + }, + } + var dynArgs []json.RawMessage var err error + token := uuid.Nil logWithId := logrus.WithField("jobId", manifestJobID) + + defer func() { + // token == uuid.Nil indicates that no worker even started processing + if token == uuid.Nil { + if jobResult.JobError != nil { + // set all jobs to "failed" + jobs := map[string]uuid.UUID{ + "depsolve": depsolveJobID, + "containerResolve": containerResolveJobID, + "ostreeResolve": ostreeResolveJobID, + "manifest": manifestJobID, + "osbuild": osbuildJobID, + } + + for jobName, jobID := range jobs { + if jobID != uuid.Nil { + err := workers.SetFailed(jobID, jobResult.JobError) + if err != nil { + logWithId.Errorf("Error failing %s job: %v", jobName, err) + } + } + } + + } else { + logWithId.Errorf("Internal error, no worker started depsolve but we didn't get a reason.") + } + } else { + result, err := json.Marshal(jobResult) + if err != nil { + logWithId.Errorf("Error marshalling manifest job results: %v", err) + } + err = workers.FinishJob(token, result) + if err != nil { + logWithId.Errorf("Error finishing manifest job: %v", err) + } + if jobResult.JobError != nil { + logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err) + } + } + }() + + // wait until job is in a pending state for { _, token, _, _, dynArgs, err = workers.RequestJobById(ctx, "", manifestJobID) - if err == jobqueue.ErrNotPending { + if errors.Is(err, jobqueue.ErrNotPending) { logWithId.Debug("Manifest job not pending, waiting for depsolve job to finish") time.Sleep(time.Millisecond * 50) select { case <-ctx.Done(): - logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines") + logWithId.Warning(fmt.Sprintf("Manifest job dependencies took longer than %d minutes to finish,"+ + " or the server is shutting down, returning to avoid dangling routines", depsolveTimeout)) + + jobResult.JobError = clienterrors.New(clienterrors.ErrorDepsolveTimeout, + "Timeout while waiting for package dependency resolution", + "There may be a temporary issue with compute resources. "+ + "We’re looking into it, please try again later.", + ) break default: continue @@ -473,13 +531,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w break } - jobResult := &worker.ManifestJobByIDResult{ - Manifest: nil, - ManifestInfo: worker.ManifestInfo{ - OSBuildComposerVersion: common.BuildVersion(), - }, - } - // add osbuild/images dependency info to job result osbuildImagesDep, err := common.GetDepModuleInfoByPath(common.OSBuildImagesModulePath) if err != nil { @@ -491,22 +542,6 @@ func serializeManifest(ctx context.Context, manifestSource *manifest.Manifest, w jobResult.ManifestInfo.OSBuildComposerDeps = append(jobResult.ManifestInfo.OSBuildComposerDeps, osbuildImagesDepModule) } - defer func() { - if jobResult.JobError != nil { - logWithId.Errorf("Error in manifest job %v: %v", jobResult.JobError.Reason, err) - } - - result, err := json.Marshal(jobResult) - if err != nil { - logWithId.Errorf("Error marshalling manifest job results: %v", err) - } - - err = workers.FinishJob(token, result) - if err != nil { - logWithId.Errorf("Error finishing manifest job: %v", err) - } - }() - if len(dynArgs) == 0 { reason := "No dynamic arguments" jobResult.JobError = clienterrors.New(clienterrors.ErrorNoDynamicArgs, reason, nil) diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index c8f9e4b38..cc73abe51 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -403,6 +403,40 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error { return nil } +func (q *fsJobQueue) FailJob(id uuid.UUID, result interface{}) error { + q.mu.Lock() + defer q.mu.Unlock() + + j, err := q.readJob(id) + if err != nil { + return err + } + + if !j.FinishedAt.IsZero() { + return jobqueue.ErrFinished + } + + if !j.StartedAt.IsZero() { + return jobqueue.ErrRunning + } + + j.Result, err = json.Marshal(result) + if err != nil { + return err + } + + j.StartedAt = time.Now() + j.FinishedAt = time.Now() + j.Token = uuid.New() + + err = q.db.Write(id.String(), j) + if err != nil { + return fmt.Errorf("error writing job %s: %v", id, err) + } + + return nil +} + func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) { j, err := q.readJob(id) if err != nil { diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index bfd431cd5..27c0eb2ab 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -7,6 +7,8 @@ import ( "context" "encoding/json" "fmt" + "github.com/osbuild/osbuild-composer/internal/worker" + "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" "os" "sync" "testing" @@ -55,6 +57,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { t.Run("multiple-channels", wrap(testMultipleChannels)) t.Run("100-dequeuers", wrap(test100dequeuers)) t.Run("workers", wrap(testWorkers)) + t.Run("fail", wrap(testFail)) } func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID, channel string) uuid.UUID { @@ -762,3 +765,56 @@ func testWorkers(t *testing.T, q jobqueue.JobQueue) { err = q.DeleteWorker(w2) require.NoError(t, err) } + +func testFail(t *testing.T, q jobqueue.JobQueue) { + startTime := time.Now() + + FailedJobErrorResult := worker.JobResult{ + JobError: clienterrors.New(clienterrors.ErrorDepsolveTimeout, + "Test timeout reason", + "Test timeout details", + ), + } + + testReason, err := json.Marshal(FailedJobErrorResult) + require.NoError(t, err) + + // set a non-existing job to failed + err = q.FailJob(uuid.New(), testReason) + require.Error(t, err) + + // Cancel a pending job + id := pushTestJob(t, q, "coralreef", nil, nil, "testchannel") + require.NotEmpty(t, id) + + err = q.FailJob(id, testReason) + require.NoError(t, err) + + //nolint:golint,ineffassign + jobType, channel, result, queued, started, finished, canceled, _, _, err := q.JobStatus(id) + require.NoError(t, err) + + endTime := time.Now() + type JobResult struct { + JobError *clienterrors.Error `json:"job_error"` + } + var r1 JobResult + err = json.Unmarshal(result, &r1) + require.NoError(t, err) + + require.NotNil(t, r1) + require.Equal(t, "Test timeout reason", r1.JobError.Reason) + require.Equal(t, "Test timeout details", r1.JobError.Details) + require.Equal(t, clienterrors.ErrorDepsolveTimeout, r1.JobError.ID) + require.Equal(t, "testchannel", channel) + require.Equal(t, "coralreef", jobType) + require.Equal(t, false, canceled) + + allTimings := []time.Time{queued, started, finished} + + for _, tmr := range allTimings { + require.Less(t, startTime, tmr) + require.Greater(t, endTime, tmr) + } + +} diff --git a/internal/worker/clienterrors/errors.go b/internal/worker/clienterrors/errors.go index 7d25610b0..417c0ee95 100644 --- a/internal/worker/clienterrors/errors.go +++ b/internal/worker/clienterrors/errors.go @@ -44,6 +44,7 @@ const ( ErrorJobPanicked ClientErrorCode = 37 ErrorGeneratingSignedURL ClientErrorCode = 38 ErrorInvalidRepositoryURL ClientErrorCode = 39 + ErrorDepsolveTimeout ClientErrorCode = 40 ) type ClientErrorCode int diff --git a/internal/worker/server.go b/internal/worker/server.go index 137ea1d7a..a5e2c8d5e 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -566,6 +566,20 @@ func (s *Server) Cancel(id uuid.UUID) error { return s.jobs.CancelJob(id) } +// SetFailed sets the given job id to "failed" with the given error +func (s *Server) SetFailed(id uuid.UUID, error *clienterrors.Error) error { + FailedJobErrorResult := JobResult{ + JobError: error, + } + + res, err := json.Marshal(FailedJobErrorResult) + if err != nil { + logrus.Errorf("error marshalling the error: %v", err) + return nil + } + return s.jobs.FailJob(id, res) +} + // Provides access to artifacts of a job. Returns an io.Reader for the artifact // and the artifact's size. func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) { diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 35e5531e9..58c34cc23 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -94,6 +94,12 @@ const ( WHERE id = $1 AND finished_at IS NULL RETURNING type, started_at` + sqlFailJob = ` + UPDATE jobs + SET token = $2, started_at = now(), finished_at = now(), result = $3 + WHERE id = $1 AND finished_at IS NULL AND started_at IS NULL AND token IS NULL + RETURNING id, type` + sqlInsertHeartbeat = ` INSERT INTO heartbeats(token, id, heartbeat) VALUES ($1, $2, now())` @@ -592,6 +598,32 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error { return nil } +func (q *DBJobQueue) FailJob(id uuid.UUID, result interface{}) error { + conn, err := q.pool.Acquire(context.Background()) + if err != nil { + return fmt.Errorf("error connecting to database: %w", err) + } + defer conn.Release() + + var jobType string + var resultId uuid.UUID + dummyToken := uuid.New() + err = conn.QueryRow(context.Background(), sqlFailJob, id, dummyToken, result).Scan(&resultId, &jobType) + if errors.Is(err, pgx.ErrNoRows) { + return jobqueue.ErrNotRunning + } + if err != nil { + return fmt.Errorf("error failing job %s: %w", id, err) + } + if id != resultId { + return fmt.Errorf("that should never happen, I wanted to set %s to failed but got %s back from DB", id, resultId) + } + + q.logger.Info("Job set to failed", "job_type", jobType, "job_id", id.String()) + + return nil +} + func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go index f92a00730..a1c679fcf 100644 --- a/pkg/jobqueue/jobqueue.go +++ b/pkg/jobqueue/jobqueue.go @@ -59,6 +59,9 @@ type JobQueue interface { // Cancel a job. Does nothing if the job has already finished. CancelJob(id uuid.UUID) error + // Fail a job that didn't even start (e.g. no worker available) + FailJob(id uuid.UUID, result interface{}) error + // If the job has finished, returns the result as raw JSON. // // Returns the current status of the job, in the form of three times: @@ -114,6 +117,8 @@ var ( ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled") ErrActiveJobs = errors.New("worker has active jobs associated with it") ErrWorkerNotExist = errors.New("worker does not exist") + ErrRunning = errors.New("job is running, but wasn't expected to be") + ErrFinished = errors.New("job is finished, but wasn't expected to be") ) type Worker struct {