From a2895376ae879718d89f3b781766a19e2626d1b0 Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Fri, 6 Nov 2020 17:06:36 +0000 Subject: [PATCH] worker: introduce dynamicArgs In addition to the arguments passed when scheduling a job, a job now also takes the results of its dependencies as additional arguments. We call these dynamic arguments for the lack of a better term. The immediate use-case for this is to allow koji jobs to be split up as follows: - koji-init: Creates a koji build, and returns us a token. - osbuild-koji: one job per architecture, depending on koji-init having succeeded. Builds the image, and uploads it to koji, returning metadata about the image produced. - koji-finalize: uses the token from koji-init and the metadata from osbuild-koji to import the build into koji if it succeeded or mark it as failed if it failed. --- internal/worker/api/openapi.yml | 3 +++ internal/worker/client.go | 2 ++ internal/worker/json.go | 11 ++++++----- internal/worker/server.go | 19 +++++++++++++------ internal/worker/server_test.go | 3 ++- 5 files changed, 26 insertions(+), 12 deletions(-) diff --git a/internal/worker/api/openapi.yml b/internal/worker/api/openapi.yml index c0a9d02ab..478abd603 100644 --- a/internal/worker/api/openapi.yml +++ b/internal/worker/api/openapi.yml @@ -63,6 +63,9 @@ paths: enum: - osbuild args: {} + dynamic_args: + type: array + items: {} required: - type - location diff --git a/internal/worker/client.go b/internal/worker/client.go index 09dbe6e0e..84e0a9d5b 100644 --- a/internal/worker/client.go +++ b/internal/worker/client.go @@ -37,6 +37,7 @@ type job struct { artifactLocation string jobType string args json.RawMessage + dynamicArgs []json.RawMessage } func NewClient(baseURL string, conf *tls.Config) (*Client, error) { @@ -128,6 +129,7 @@ func (c *Client) RequestJob(types []string) (Job, error) { id: jr.Id, jobType: jr.Type, args: jr.Args, + dynamicArgs: jr.DynamicArgs, location: location.String(), artifactLocation: artifactLocation.String(), }, nil diff --git a/internal/worker/json.go b/internal/worker/json.go index b7bfcb348..f75d99589 100644 --- a/internal/worker/json.go +++ b/internal/worker/json.go @@ -35,11 +35,12 @@ type statusResponse struct { } type requestJobResponse struct { - Id uuid.UUID `json:"id"` - Location string `json:"location"` - ArtifactLocation string `json:"artifact_location"` - Type string `json:"type"` - Args json.RawMessage `json:"args,omitempty"` + Id uuid.UUID `json:"id"` + Location string `json:"location"` + ArtifactLocation string `json:"artifact_location"` + Type string `json:"type"` + Args json.RawMessage `json:"args,omitempty"` + DynamicArgs []json.RawMessage `json:"dynamic_args,omitempty"` } type getJobResponse struct { diff --git a/internal/worker/server.go b/internal/worker/server.go index 04367ef35..9b91a7cab 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -168,7 +168,7 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return os.RemoveAll(path.Join(s.artifactsDir, id.String())) } -func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, error) { +func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { token := uuid.New() // treat osbuild jobs specially until we have found a generic way to @@ -182,15 +182,21 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) jts = append(jts, t) } - jobId, _, jobType, args, err := s.jobs.Dequeue(ctx, jts) + jobId, depIDs, jobType, args, err := s.jobs.Dequeue(ctx, jts) if err != nil { - return uuid.Nil, uuid.Nil, "", nil, err + return uuid.Nil, uuid.Nil, "", nil, nil, err + } + + var dynamicArgs []json.RawMessage + for _, depID := range depIDs { + result, _, _, _, _, _ := s.jobs.JobStatus(depID) + dynamicArgs = append(dynamicArgs, result) } if s.artifactsDir != "" { err := os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700) if err != nil { - return uuid.Nil, uuid.Nil, "", nil, fmt.Errorf("cannot create artifact directory: %v", err) + return uuid.Nil, uuid.Nil, "", nil, nil, fmt.Errorf("cannot create artifact directory: %v", err) } } @@ -202,7 +208,7 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) jobType = "osbuild" } - return token, jobId, jobType, args, nil + return token, jobId, jobType, args, dynamicArgs, nil } func (s *Server) RunningJob(token uuid.UUID) (uuid.UUID, error) { @@ -268,7 +274,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { return err } - token, jobId, jobType, jobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types) + token, jobId, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types) if err != nil { return err } @@ -279,6 +285,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { ArtifactLocation: fmt.Sprintf("%s/jobs/%v/artifacts/", api.BasePath, token), Type: jobType, Args: jobArgs, + DynamicArgs: dynamicJobArgs, }) } diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 56aa29668..96ea76c5b 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -90,11 +90,12 @@ func TestCancel(t *testing.T) { jobId, err := server.Enqueue(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) require.NoError(t, err) - token, j, typ, args, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + token, j, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) require.NoError(t, err) require.Equal(t, jobId, j) require.Equal(t, "osbuild", typ) require.NotNil(t, args) + require.Nil(t, dynamicArgs) err = server.Cancel(jobId) require.NoError(t, err)