From 59e73a686aa87f0c55eb85d1ca1c8bff0e71a37a Mon Sep 17 00:00:00 2001 From: Lars Karlitski Date: Mon, 2 Nov 2020 21:49:18 +0100 Subject: [PATCH] worker: generalize job types in the server The worker server was heavily tied to OSBuildJob(Result). Untie it so that it can deal with different job types in the future. This necessitates a change in the jobqueue: Dequeue() now returns the job type, as well as job arguments as json.RawMessage. This is so that the server can wait on multiple job types with different argument types. The weldr, composer, and koji APIs continue to use only "osbuild" jobs. --- internal/cloudapi/server.go | 11 +-- internal/jobqueue/fsjobqueue/fsjobqueue.go | 19 ++--- .../jobqueue/fsjobqueue/fsjobqueue_test.go | 41 ++++++++--- internal/jobqueue/jobqueue.go | 9 ++- .../jobqueue/testjobqueue/testjobqueue.go | 11 +-- internal/kojiapi/server.go | 15 ++-- internal/weldr/api.go | 13 ++-- internal/worker/server.go | 70 +++++++++++-------- internal/worker/server_test.go | 4 +- 9 files changed, 108 insertions(+), 85 deletions(-) diff --git a/internal/cloudapi/server.go b/internal/cloudapi/server.go index 47363f31a..3e60438f9 100644 --- a/internal/cloudapi/server.go +++ b/internal/cloudapi/server.go @@ -215,17 +215,18 @@ func (server *Server) ComposeStatus(w http.ResponseWriter, r *http.Request, id s return } - status, err := server.workers.JobStatus(jobId) + var result worker.OSBuildJobResult + status, err := server.workers.JobStatus(jobId, &result) if err != nil { http.Error(w, fmt.Sprintf("Job %s not found: %s", id, err), http.StatusNotFound) return } response := ComposeStatus{ - Status: composeStatusFromJobStatus(status), + Status: composeStatusFromJobStatus(status, &result), ImageStatuses: &[]ImageStatus{ { - Status: composeStatusFromJobStatus(status), + Status: composeStatusFromJobStatus(status, &result), }, }, } @@ -236,7 +237,7 @@ func (server *Server) ComposeStatus(w http.ResponseWriter, r *http.Request, id s } } -func composeStatusFromJobStatus(js *worker.JobStatus) string { +func composeStatusFromJobStatus(js *worker.JobStatus, result *worker.OSBuildJobResult) string { if js.Canceled { return StatusFailure } @@ -249,7 +250,7 @@ func composeStatusFromJobStatus(js *worker.JobStatus) string { return StatusRunning } - if js.Result.Success { + if result.Success { return StatusSuccess } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 7a407d95d..7949bf748 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -145,13 +145,13 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return j.Id, nil } -func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interface{}) (uuid.UUID, error) { +func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, string, json.RawMessage, error) { q.mu.Lock() defer q.mu.Unlock() // Return early if the context is already canceled. if err := ctx.Err(); err != nil { - return uuid.Nil, err + return uuid.Nil, "", nil, err } // Filter q.pending by the `jobTypes`. Ignore those job types that this @@ -173,12 +173,12 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interf q.mu.Lock() if err != nil { - return uuid.Nil, err + return uuid.Nil, "", nil, err } j, err = q.readJob(id) if err != nil { - return uuid.Nil, err + return uuid.Nil, "", nil, err } if !j.Canceled { @@ -186,19 +186,14 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interf } } - err := json.Unmarshal(j.Args, args) - if err != nil { - return uuid.Nil, fmt.Errorf("error unmarshaling arguments for job '%s': %v", j.Id, err) - } - j.StartedAt = time.Now() - err = q.db.Write(j.Id.String(), j) + err := q.db.Write(j.Id.String(), j) if err != nil { - return uuid.Nil, fmt.Errorf("error writing job %s: %v", j.Id, err) + return uuid.Nil, "", nil, fmt.Errorf("error writing job %s: %v", j.Id, err) } - return j.Id, nil + return j.Id, j.Type, j.Args, nil } func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error { diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index 108da8677..4c3fec619 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -43,9 +43,11 @@ func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interfa } func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}) uuid.UUID { - id, err := q.Dequeue(context.Background(), []string{jobType}, &json.RawMessage{}) + id, typ, args, err := q.Dequeue(context.Background(), []string{jobType}) require.NoError(t, err) require.NotEmpty(t, id) + require.Equal(t, jobType, typ) + require.NotNil(t, args) err = q.FinishJob(id, result) require.NoError(t, err) @@ -89,16 +91,23 @@ func TestArgs(t *testing.T) { twoargs := argument{42, "🐙"} two := pushTestJob(t, q, "octopus", twoargs, nil) - var args argument - id, err := q.Dequeue(context.Background(), []string{"octopus"}, &args) + var parsedArgs argument + + id, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}) require.NoError(t, err) require.Equal(t, two, id) - require.Equal(t, twoargs, args) + require.Equal(t, "octopus", typ) + err = json.Unmarshal(args, &parsedArgs) + require.NoError(t, err) + require.Equal(t, twoargs, parsedArgs) - id, err = q.Dequeue(context.Background(), []string{"fish"}, &args) + id, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}) require.NoError(t, err) require.Equal(t, one, id) - require.Equal(t, oneargs, args) + require.Equal(t, "fish", typ) + err = json.Unmarshal(args, &parsedArgs) + require.NoError(t, err) + require.Equal(t, oneargs, parsedArgs) } func TestJobTypes(t *testing.T) { @@ -113,9 +122,11 @@ func TestJobTypes(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - id, err := q.Dequeue(ctx, []string{"zebra"}, nil) + id, typ, args, err := q.Dequeue(ctx, []string{"zebra"}) require.Equal(t, err, context.Canceled) require.Equal(t, uuid.Nil, id) + require.Equal(t, "", typ) + require.Nil(t, args) } func TestDependencies(t *testing.T) { @@ -188,9 +199,11 @@ func TestMultipleWorkers(t *testing.T) { defer close(done) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, err := q.Dequeue(ctx, []string{"octopus"}, &json.RawMessage{}) + id, typ, args, err := q.Dequeue(ctx, []string{"octopus"}) require.NoError(t, err) require.NotEmpty(t, id) + require.Equal(t, "octopus", typ) + require.Equal(t, json.RawMessage("null"), args) }() // Increase the likelihood that the above goroutine was scheduled and @@ -199,9 +212,11 @@ func TestMultipleWorkers(t *testing.T) { // This call to Dequeue() should not block on the one in the goroutine. id := pushTestJob(t, q, "clownfish", nil, nil) - r, err := q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{}) + r, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) require.NoError(t, err) require.Equal(t, id, r) + require.Equal(t, "clownfish", typ) + require.Equal(t, json.RawMessage("null"), args) // Now wake up the Dequeue() in the goroutine and wait for it to finish. _ = pushTestJob(t, q, "octopus", nil, nil) @@ -230,9 +245,11 @@ func TestCancel(t *testing.T) { // Cancel a running job, which should not dequeue the canceled job from above id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, err := q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{}) + r, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}) require.NoError(t, err) require.Equal(t, id, r) + require.Equal(t, "clownfish", typ) + require.Equal(t, json.RawMessage("null"), args) err = q.CancelJob(id) require.NoError(t, err) _, _, _, canceled, err = q.JobStatus(id, &testResult{}) @@ -244,9 +261,11 @@ func TestCancel(t *testing.T) { // Cancel a finished job, which is a no-op id = pushTestJob(t, q, "clownfish", nil, nil) require.NotEmpty(t, id) - r, err = q.Dequeue(context.Background(), []string{"clownfish"}, &json.RawMessage{}) + r, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}) require.NoError(t, err) require.Equal(t, id, r) + require.Equal(t, "clownfish", typ) + require.Equal(t, json.RawMessage("null"), args) err = q.FinishJob(id, &testResult{}) require.NoError(t, err) err = q.CancelJob(id) diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 6dbd96b0e..751952c35 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -14,6 +14,7 @@ package jobqueue import ( "context" + "encoding/json" "errors" "time" @@ -38,11 +39,9 @@ type JobQueue interface { // Waits until a job with a type of any of `jobTypes` is available, or `ctx` is // canceled. // - // All jobs in `jobTypes` must take the same type of `args`, corresponding to - // the one that was passed to Enqueue(). - // - // Returns the job's id or an error. - Dequeue(ctx context.Context, jobTypes []string, args interface{}) (uuid.UUID, error) + // Returns the job's id, type, and arguments, or an error. Arguments + // can be unmarshaled to the type given in Enqueue(). + Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, string, json.RawMessage, error) // Mark the job with `id` as finished. `result` must fit the associated // job type and must be serializable to JSON. diff --git a/internal/jobqueue/testjobqueue/testjobqueue.go b/internal/jobqueue/testjobqueue/testjobqueue.go index 2c0d5e3f1..dc2635141 100644 --- a/internal/jobqueue/testjobqueue/testjobqueue.go +++ b/internal/jobqueue/testjobqueue/testjobqueue.go @@ -79,7 +79,7 @@ func (q *testJobQueue) Enqueue(jobType string, args interface{}, dependencies [] return j.Id, nil } -func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string, args interface{}) (uuid.UUID, error) { +func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, string, json.RawMessage, error) { for _, t := range jobTypes { if len(q.pending[t]) == 0 { continue @@ -90,16 +90,11 @@ func (q *testJobQueue) Dequeue(ctx context.Context, jobTypes []string, args inte j := q.jobs[id] - err := json.Unmarshal(j.Args, args) - if err != nil { - return uuid.Nil, err - } - j.StartedAt = time.Now() - return j.Id, nil + return j.Id, j.Type, j.Args, nil } - return uuid.Nil, errors.New("no job available") + return uuid.Nil, "", nil, errors.New("no job available") } func (q *testJobQueue) FinishJob(id uuid.UUID, result interface{}) error { diff --git a/internal/kojiapi/server.go b/internal/kojiapi/server.go index 21519d33a..c7a029364 100644 --- a/internal/kojiapi/server.go +++ b/internal/kojiapi/server.go @@ -227,7 +227,7 @@ func splitExtension(filename string) string { return "." + strings.Join(filenameParts[1:], ".") } -func composeStatusFromJobStatus(js *worker.JobStatus) string { +func composeStatusFromJobStatus(js *worker.JobStatus, result *worker.OSBuildJobResult) string { if js.Canceled { return "failure" } @@ -240,14 +240,14 @@ func composeStatusFromJobStatus(js *worker.JobStatus) string { return "pending" } - if js.Result.Success { + if result.Success { return "success" } return "failure" } -func imageStatusFromJobStatus(js *worker.JobStatus) string { +func imageStatusFromJobStatus(js *worker.JobStatus, result *worker.OSBuildJobResult) string { if js.Canceled { return "failure" } @@ -260,7 +260,7 @@ func imageStatusFromJobStatus(js *worker.JobStatus) string { return "building" } - if js.Result.Success { + if result.Success { return "success" } @@ -274,16 +274,17 @@ func (h *apiHandlers) GetComposeId(ctx echo.Context, idstr string) error { return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter id: %s", err)) } - status, err := h.server.workers.JobStatus(id) + var result worker.OSBuildJobResult + status, err := h.server.workers.JobStatus(id, &result) if err != nil { return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Job %s not found: %s", idstr, err)) } response := api.ComposeStatus{ - Status: composeStatusFromJobStatus(status), + Status: composeStatusFromJobStatus(status, &result), ImageStatuses: []api.ImageStatus{ { - Status: imageStatusFromJobStatus(status), + Status: imageStatusFromJobStatus(status, &result), }, }, } diff --git a/internal/weldr/api.go b/internal/weldr/api.go index d8b0f069d..6359ef474 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -194,7 +194,7 @@ type composeStatus struct { Result *osbuild.Result } -func composeStateFromJobStatus(js *worker.JobStatus) ComposeState { +func composeStateFromJobStatus(js *worker.JobStatus, result *worker.OSBuildJobResult) ComposeState { if js.Canceled { return ComposeFailed } @@ -207,7 +207,7 @@ func composeStateFromJobStatus(js *worker.JobStatus) ComposeState { return ComposeRunning } - if js.Result.Success { + if result.Success { return ComposeFinished } @@ -244,14 +244,17 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus { } } + // All jobs are "osbuild" jobs. + var result worker.OSBuildJobResult + // is it ok to ignore this error? - jobStatus, _ := api.workers.JobStatus(jobId) + jobStatus, _ := api.workers.JobStatus(jobId, &result) return &composeStatus{ - State: composeStateFromJobStatus(jobStatus), + State: composeStateFromJobStatus(jobStatus, &result), Queued: jobStatus.Queued, Started: jobStatus.Started, Finished: jobStatus.Finished, - Result: jobStatus.Result.OSBuildOutput, + Result: result.OSBuildOutput, } } diff --git a/internal/worker/server.go b/internal/worker/server.go index 6cb9e1ac2..7a8fc8432 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -45,7 +45,6 @@ type JobStatus struct { Started time.Time Finished time.Time Canceled bool - Result OSBuildJobResult } var ErrTokenNotExist = errors.New("worker token does not exist") @@ -88,18 +87,18 @@ func (s *Server) Enqueue(arch string, job *OSBuildJob) (uuid.UUID, error) { return s.jobs.Enqueue("osbuild:"+arch, job, nil) } -func (s *Server) JobStatus(id uuid.UUID) (*JobStatus, error) { - var result OSBuildJobResult - - queued, started, finished, canceled, err := s.jobs.JobStatus(id, &result) +func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, error) { + queued, started, finished, canceled, err := s.jobs.JobStatus(id, result) if err != nil { return nil, err } // For backwards compatibility: OSBuildJobResult didn't use to have a // top-level `Success` flag. Override it here by looking into the job. - if result.Success == false && result.OSBuildOutput != nil { - result.Success = result.OSBuildOutput.Success && len(result.TargetErrors) == 0 + if r, ok := result.(*OSBuildJobResult); ok { + if !r.Success && r.OSBuildOutput != nil { + r.Success = r.OSBuildOutput.Success && len(r.TargetErrors) == 0 + } } return &JobStatus{ @@ -107,7 +106,6 @@ func (s *Server) JobStatus(id uuid.UUID) (*JobStatus, error) { Started: started, Finished: finished, Canceled: canceled, - Result: result, }, nil } @@ -118,7 +116,11 @@ func (s *Server) Cancel(id uuid.UUID) error { // Provides access to artifacts of a job. Returns an io.Reader for the artifact // and the artifact's size. func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) { - status, err := s.JobStatus(id) + if s.artifactsDir == "" { + return nil, 0, errors.New("Artifacts not enabled") + } + + status, err := s.JobStatus(id, &json.RawMessage{}) if err != nil { return nil, 0, err } @@ -143,7 +145,11 @@ func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error // Deletes all artifacts for job `id`. func (s *Server) DeleteArtifacts(id uuid.UUID) error { - status, err := s.JobStatus(id) + if s.artifactsDir == "" { + return errors.New("Artifacts not enabled") + } + + status, err := s.JobStatus(id, &json.RawMessage{}) if err != nil { return err } @@ -155,22 +161,29 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return os.RemoveAll(path.Join(s.artifactsDir, id.String())) } -func (s *Server) RequestOSBuildJob(ctx context.Context, arch string) (uuid.UUID, uuid.UUID, *OSBuildJob, error) { +func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, error) { token := uuid.New() - // wait on "osbuild" jobs for backwards compatiblity - jobTypes := []string{"osbuild", "osbuild:" + arch} + // treat osbuild jobs specially until we have found a generic way to + // specify dequeuing restrictions. For now, we only have one + // restriction: arch for osbuild jobs. + jts := []string{} + for _, t := range jobTypes { + if t == "osbuild" { + t = "osbuild:" + arch + } + jts = append(jts, t) + } - var args OSBuildJob - jobId, err := s.jobs.Dequeue(ctx, jobTypes, &args) + jobId, jobType, args, err := s.jobs.Dequeue(ctx, jts) if err != nil { - return uuid.Nil, uuid.Nil, nil, err + return uuid.Nil, uuid.Nil, "", nil, err } if s.artifactsDir != "" { err := os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700) if err != nil { - return uuid.Nil, uuid.Nil, nil, fmt.Errorf("cannot create artifact directory: %v", err) + return uuid.Nil, uuid.Nil, "", nil, fmt.Errorf("cannot create artifact directory: %v", err) } } @@ -178,7 +191,11 @@ func (s *Server) RequestOSBuildJob(ctx context.Context, arch string) (uuid.UUID, defer s.runningMutex.Unlock() s.running[token] = jobId - return token, jobId, &args, nil + if jobType == "osbuild:"+arch { + jobType = "osbuild" + } + + return token, jobId, jobType, args, nil } func (s *Server) RunningJob(token uuid.UUID) (uuid.UUID, error) { @@ -244,16 +261,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { return err } - if len(body.Types) != 1 || body.Types[0] != "osbuild" { - return echo.NewHTTPError(http.StatusBadRequest, "invalid job types") - } - - token, jobId, jobArgs, err := h.server.RequestOSBuildJob(ctx.Request().Context(), body.Arch) - if err != nil { - return err - } - - serializedArgs, err := json.Marshal(jobArgs) + token, jobId, jobType, jobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types) if err != nil { return err } @@ -262,8 +270,8 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { Id: jobId, Location: fmt.Sprintf("%s/jobs/%v", api.BasePath, token), ArtifactLocation: fmt.Sprintf("%s/jobs/%v/artifacts/", api.BasePath, token), - Type: "osbuild", - Args: serializedArgs, + Type: jobType, + Args: jobArgs, }) } @@ -287,7 +295,7 @@ func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error { return ctx.JSON(http.StatusOK, getJobResponse{}) } - status, err := h.server.JobStatus(jobId) + status, err := h.server.JobStatus(jobId, &json.RawMessage{}) if err != nil { return err } diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 8f392424b..56aa29668 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -90,9 +90,11 @@ func TestCancel(t *testing.T) { jobId, err := server.Enqueue(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) require.NoError(t, err) - token, j, _, err := server.RequestOSBuildJob(context.Background(), arch.Name()) + token, j, typ, args, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) require.NoError(t, err) require.Equal(t, jobId, j) + require.Equal(t, "osbuild", typ) + require.NotNil(t, args) err = server.Cancel(jobId) require.NoError(t, err)