From c777a18df070b63fea7e45c62490c4dc8a6c354d Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Mon, 9 Nov 2020 14:04:58 +0000 Subject: [PATCH] jobqueue: expose dependencies when querying status The status of a job may depend on the status of its dependenices, as we do not repeat for instance the failed state in each dependent job. Return also the list of dependencies so these can be queried too. --- internal/cloudapi/server.go | 2 +- internal/jobqueue/fsjobqueue/fsjobqueue.go | 3 ++- .../jobqueue/fsjobqueue/fsjobqueue_test.go | 18 +++++++++++------- internal/jobqueue/jobqueue.go | 6 ++++-- internal/jobqueue/testjobqueue/testjobqueue.go | 3 ++- internal/worker/server.go | 18 +++++++++--------- 6 files changed, 29 insertions(+), 21 deletions(-) diff --git a/internal/cloudapi/server.go b/internal/cloudapi/server.go index 8bc7aaf60..983bbd4e9 100644 --- a/internal/cloudapi/server.go +++ b/internal/cloudapi/server.go @@ -216,7 +216,7 @@ func (server *Server) ComposeStatus(w http.ResponseWriter, r *http.Request, id s } var result worker.OSBuildJobResult - status, err := server.workers.JobStatus(jobId, &result) + status, _, err := server.workers.JobStatus(jobId, &result) if err != nil { http.Error(w, fmt.Sprintf("Job %s not found: %s", id, err), http.StatusNotFound) return diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 6f01e2913..c041e7b68 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -263,7 +263,7 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error { return nil } -func (q *fsJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, err error) { +func (q *fsJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) { j, err := q.readJob(id) if err != nil { return @@ -274,6 +274,7 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, st started = j.StartedAt finished = j.FinishedAt canceled = j.Canceled + deps = j.Dependencies return } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index 9ceab3eeb..1b72fb600 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -147,21 +147,23 @@ func TestDependencies(t *testing.T) { require.ElementsMatch(t, []uuid.UUID{one, two}, r) j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}) - _, queued, started, finished, canceled, err := q.JobStatus(j) + _, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.True(t, !queued.IsZero()) require.True(t, started.IsZero()) require.True(t, finished.IsZero()) require.False(t, canceled) + require.ElementsMatch(t, deps, []uuid.UUID{one, two}) require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) - result, queued, started, finished, canceled, err := q.JobStatus(j) + result, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.True(t, !queued.IsZero()) require.True(t, !started.IsZero()) require.True(t, !finished.IsZero()) require.False(t, canceled) + require.ElementsMatch(t, deps, []uuid.UUID{one, two}) err = json.Unmarshal(result, &testResult{}) require.NoError(t, err) @@ -172,12 +174,13 @@ func TestDependencies(t *testing.T) { two := pushTestJob(t, q, "test", nil, nil) j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}) - _, queued, started, finished, canceled, err := q.JobStatus(j) + _, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.True(t, !queued.IsZero()) require.True(t, started.IsZero()) require.True(t, finished.IsZero()) require.False(t, canceled) + require.ElementsMatch(t, deps, []uuid.UUID{one, two}) r := []uuid.UUID{} r = append(r, finishNextTestJob(t, q, "test", testResult{}, nil)) @@ -186,12 +189,13 @@ func TestDependencies(t *testing.T) { require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) - result, queued, started, finished, canceled, err := q.JobStatus(j) + result, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.True(t, !queued.IsZero()) require.True(t, !started.IsZero()) require.True(t, !finished.IsZero()) require.False(t, canceled) + require.ElementsMatch(t, deps, []uuid.UUID{one, two}) err = json.Unmarshal(result, &testResult{}) require.NoError(t, err) @@ -248,7 +252,7 @@ func TestCancel(t *testing.T) { require.NotEmpty(t, id) err = q.CancelJob(id) require.NoError(t, err) - result, _, _, _, canceled, err := q.JobStatus(id) + result, _, _, _, canceled, _, err := q.JobStatus(id) require.NoError(t, err) require.True(t, canceled) require.Nil(t, result) @@ -266,7 +270,7 @@ func TestCancel(t *testing.T) { require.Equal(t, json.RawMessage("null"), args) err = q.CancelJob(id) require.NoError(t, err) - result, _, _, _, canceled, err = q.JobStatus(id) + result, _, _, _, canceled, _, err = q.JobStatus(id) require.NoError(t, err) require.True(t, canceled) require.Nil(t, result) @@ -286,7 +290,7 @@ func TestCancel(t *testing.T) { require.NoError(t, err) err = q.CancelJob(id) require.NoError(t, err) - result, _, _, _, canceled, err = q.JobStatus(id) + result, _, _, _, canceled, _, err = q.JobStatus(id) require.NoError(t, err) require.False(t, canceled) err = json.Unmarshal(result, &testResult{}) diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 0f90d91c4..d95533aa2 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -50,13 +50,15 @@ type JobQueue interface { // Cancel a job. Does nothing if the job has already finished. CancelJob(id uuid.UUID) error + // If the job has finished, returns the result as raw JSON. + // // Returns the current status of the job, in the form of three times: // queued, started, and finished. `started` and `finished` might be the // zero time (check with t.IsZero()), when the job is not running or // finished, respectively. // - // If the job is finished, its result will be returned in `result`. - JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, err error) + // Lastly, the IDs of the jobs dependencies are returned. + JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) } var ( diff --git a/internal/jobqueue/testjobqueue/testjobqueue.go b/internal/jobqueue/testjobqueue/testjobqueue.go index afee7fa12..25d8ba323 100644 --- a/internal/jobqueue/testjobqueue/testjobqueue.go +++ b/internal/jobqueue/testjobqueue/testjobqueue.go @@ -141,7 +141,7 @@ func (q *testJobQueue) CancelJob(id uuid.UUID) error { return nil } -func (q *testJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, err error) { +func (q *testJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) { j, exists := q.jobs[id] if !exists { err = jobqueue.ErrNotExist @@ -153,6 +153,7 @@ func (q *testJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started = j.StartedAt finished = j.FinishedAt canceled = j.Canceled + deps = j.Dependencies return } diff --git a/internal/worker/server.go b/internal/worker/server.go index 5f1a106ea..baa2b7293 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -99,16 +99,16 @@ func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, bui return s.jobs.Enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...)) } -func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, error) { - rawResult, queued, started, finished, canceled, err := s.jobs.JobStatus(id) +func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, []uuid.UUID, error) { + rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id) if err != nil { - return nil, err + return nil, nil, err } if !finished.IsZero() && !canceled { err = json.Unmarshal(rawResult, result) if err != nil { - return nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err) + return nil, nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err) } } @@ -125,7 +125,7 @@ func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, error) Started: started, Finished: finished, Canceled: canceled, - }, nil + }, deps, nil } func (s *Server) Cancel(id uuid.UUID) error { @@ -139,7 +139,7 @@ func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error return nil, 0, errors.New("Artifacts not enabled") } - status, err := s.JobStatus(id, &json.RawMessage{}) + status, _, err := s.JobStatus(id, &json.RawMessage{}) if err != nil { return nil, 0, err } @@ -168,7 +168,7 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return errors.New("Artifacts not enabled") } - status, err := s.JobStatus(id, &json.RawMessage{}) + status, _, err := s.JobStatus(id, &json.RawMessage{}) if err != nil { return err } @@ -201,7 +201,7 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) var dynamicArgs []json.RawMessage for _, depID := range depIDs { - result, _, _, _, _, _ := s.jobs.JobStatus(depID) + result, _, _, _, _, _, _ := s.jobs.JobStatus(depID) dynamicArgs = append(dynamicArgs, result) } @@ -323,7 +323,7 @@ func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error { return ctx.JSON(http.StatusOK, getJobResponse{}) } - status, err := h.server.JobStatus(jobId, &json.RawMessage{}) + status, _, err := h.server.JobStatus(jobId, &json.RawMessage{}) if err != nil { return err }