worker: introduce dynamicArgs
In addition to the arguments passed when scheduling a job, a job now also takes the results of its dependencies as additional arguments. We call these dynamic arguments for the lack of a better term. The immediate use-case for this is to allow koji jobs to be split up as follows: - koji-init: Creates a koji build, and returns us a token. - osbuild-koji: one job per architecture, depending on koji-init having succeeded. Builds the image, and uploads it to koji, returning metadata about the image produced. - koji-finalize: uses the token from koji-init and the metadata from osbuild-koji to import the build into koji if it succeeded or mark it as failed if it failed.
This commit is contained in:
parent
11d0da0b5c
commit
a2895376ae
5 changed files with 26 additions and 12 deletions
|
|
@ -63,6 +63,9 @@ paths:
|
||||||
enum:
|
enum:
|
||||||
- osbuild
|
- osbuild
|
||||||
args: {}
|
args: {}
|
||||||
|
dynamic_args:
|
||||||
|
type: array
|
||||||
|
items: {}
|
||||||
required:
|
required:
|
||||||
- type
|
- type
|
||||||
- location
|
- location
|
||||||
|
|
|
||||||
|
|
@ -37,6 +37,7 @@ type job struct {
|
||||||
artifactLocation string
|
artifactLocation string
|
||||||
jobType string
|
jobType string
|
||||||
args json.RawMessage
|
args json.RawMessage
|
||||||
|
dynamicArgs []json.RawMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(baseURL string, conf *tls.Config) (*Client, error) {
|
func NewClient(baseURL string, conf *tls.Config) (*Client, error) {
|
||||||
|
|
@ -128,6 +129,7 @@ func (c *Client) RequestJob(types []string) (Job, error) {
|
||||||
id: jr.Id,
|
id: jr.Id,
|
||||||
jobType: jr.Type,
|
jobType: jr.Type,
|
||||||
args: jr.Args,
|
args: jr.Args,
|
||||||
|
dynamicArgs: jr.DynamicArgs,
|
||||||
location: location.String(),
|
location: location.String(),
|
||||||
artifactLocation: artifactLocation.String(),
|
artifactLocation: artifactLocation.String(),
|
||||||
}, nil
|
}, nil
|
||||||
|
|
|
||||||
|
|
@ -35,11 +35,12 @@ type statusResponse struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type requestJobResponse struct {
|
type requestJobResponse struct {
|
||||||
Id uuid.UUID `json:"id"`
|
Id uuid.UUID `json:"id"`
|
||||||
Location string `json:"location"`
|
Location string `json:"location"`
|
||||||
ArtifactLocation string `json:"artifact_location"`
|
ArtifactLocation string `json:"artifact_location"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
Args json.RawMessage `json:"args,omitempty"`
|
Args json.RawMessage `json:"args,omitempty"`
|
||||||
|
DynamicArgs []json.RawMessage `json:"dynamic_args,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type getJobResponse struct {
|
type getJobResponse struct {
|
||||||
|
|
|
||||||
|
|
@ -168,7 +168,7 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error {
|
||||||
return os.RemoveAll(path.Join(s.artifactsDir, id.String()))
|
return os.RemoveAll(path.Join(s.artifactsDir, id.String()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, error) {
|
func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
|
||||||
token := uuid.New()
|
token := uuid.New()
|
||||||
|
|
||||||
// treat osbuild jobs specially until we have found a generic way to
|
// treat osbuild jobs specially until we have found a generic way to
|
||||||
|
|
@ -182,15 +182,21 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string)
|
||||||
jts = append(jts, t)
|
jts = append(jts, t)
|
||||||
}
|
}
|
||||||
|
|
||||||
jobId, _, jobType, args, err := s.jobs.Dequeue(ctx, jts)
|
jobId, depIDs, jobType, args, err := s.jobs.Dequeue(ctx, jts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return uuid.Nil, uuid.Nil, "", nil, err
|
return uuid.Nil, uuid.Nil, "", nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var dynamicArgs []json.RawMessage
|
||||||
|
for _, depID := range depIDs {
|
||||||
|
result, _, _, _, _, _ := s.jobs.JobStatus(depID)
|
||||||
|
dynamicArgs = append(dynamicArgs, result)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.artifactsDir != "" {
|
if s.artifactsDir != "" {
|
||||||
err := os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700)
|
err := os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return uuid.Nil, uuid.Nil, "", nil, fmt.Errorf("cannot create artifact directory: %v", err)
|
return uuid.Nil, uuid.Nil, "", nil, nil, fmt.Errorf("cannot create artifact directory: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -202,7 +208,7 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string)
|
||||||
jobType = "osbuild"
|
jobType = "osbuild"
|
||||||
}
|
}
|
||||||
|
|
||||||
return token, jobId, jobType, args, nil
|
return token, jobId, jobType, args, dynamicArgs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) RunningJob(token uuid.UUID) (uuid.UUID, error) {
|
func (s *Server) RunningJob(token uuid.UUID) (uuid.UUID, error) {
|
||||||
|
|
@ -268,7 +274,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
token, jobId, jobType, jobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types)
|
token, jobId, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -279,6 +285,7 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error {
|
||||||
ArtifactLocation: fmt.Sprintf("%s/jobs/%v/artifacts/", api.BasePath, token),
|
ArtifactLocation: fmt.Sprintf("%s/jobs/%v/artifacts/", api.BasePath, token),
|
||||||
Type: jobType,
|
Type: jobType,
|
||||||
Args: jobArgs,
|
Args: jobArgs,
|
||||||
|
DynamicArgs: dynamicJobArgs,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -90,11 +90,12 @@ func TestCancel(t *testing.T) {
|
||||||
jobId, err := server.Enqueue(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
|
jobId, err := server.Enqueue(arch.Name(), &worker.OSBuildJob{Manifest: manifest})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
token, j, typ, args, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
|
token, j, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, jobId, j)
|
require.Equal(t, jobId, j)
|
||||||
require.Equal(t, "osbuild", typ)
|
require.Equal(t, "osbuild", typ)
|
||||||
require.NotNil(t, args)
|
require.NotNil(t, args)
|
||||||
|
require.Nil(t, dynamicArgs)
|
||||||
|
|
||||||
err = server.Cancel(jobId)
|
err = server.Cancel(jobId)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue