diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index dafe2a1cf..14010956c 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -16,6 +16,7 @@ import ( "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/common" + "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/osbuild" "github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/upload/awsupload" @@ -67,7 +68,7 @@ func (e *TargetsError) Error() string { return errString } -func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io.Reader) error) (*osbuild.Result, error) { +func RunJob(token uuid.UUID, manifest distro.Manifest, targets []*target.Target, store string, uploadFunc func(uuid.UUID, string, io.Reader) error) (*osbuild.Result, error) { outputDirectory, err := ioutil.TempDir("/var/tmp", "osbuild-worker-*") if err != nil { return nil, fmt.Errorf("error creating temporary output directory: %v", err) @@ -79,14 +80,14 @@ func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io } }() - result, err := RunOSBuild(job.Manifest, store, outputDirectory, os.Stderr) + result, err := RunOSBuild(manifest, store, outputDirectory, os.Stderr) if err != nil { return nil, err } var r []error - for _, t := range job.Targets { + for _, t := range targets { switch options := t.Options.(type) { case *target.LocalTargetOptions: var f *os.File @@ -105,7 +106,7 @@ func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io } } - err = uploadFunc(job.Id, options.Filename, f) + err = uploadFunc(token, options.Filename, f) if err != nil { r = append(r, err) continue @@ -120,7 +121,7 @@ func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io } if options.Key == "" { - options.Key = job.Id.String() + options.Key = token.String() } _, err = a.Upload(path.Join(outputDirectory, options.Filename), options.Bucket, options.Key) @@ -180,11 +181,11 @@ func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io // It would be cleaner to kill the osbuild process using (`exec.CommandContext` // or similar), but osbuild does not currently support this. Exiting here will // make systemd clean up the whole cgroup and restart this service. -func WatchJob(ctx context.Context, client *worker.Client, job *worker.Job) { +func WatchJob(ctx context.Context, client *worker.Client, token uuid.UUID) { for { select { case <-time.After(15 * time.Second): - if client.JobCanceled(job) { + if client.JobCanceled(token) { log.Println("Job was canceled. Exiting.") os.Exit(0) } @@ -238,18 +239,18 @@ func main() { for { fmt.Println("Waiting for a new job...") - job, err := client.AddJob() + token, manifest, targets, err := client.RequestJob() if err != nil { log.Fatal(err) } - fmt.Printf("Running job %s\n", job.Id) + fmt.Printf("Running job %s\n", token) ctx, cancel := context.WithCancel(context.Background()) - go WatchJob(ctx, client, job) + go WatchJob(ctx, client, token) var status common.ImageBuildState - result, err := RunJob(job, store, client.UploadImage) + result, err := RunJob(token, manifest, targets, store, client.UploadImage) if err != nil { log.Printf(" Job failed: %v", err) status = common.IBFailed @@ -275,14 +276,14 @@ func main() { // flag to indicate all error kinds. result.Success = false } else { - log.Printf(" 🎉 Job completed successfully: %s", job.Id) + log.Printf(" 🎉 Job completed successfully: %s", token) status = common.IBFinished } // signal to WatchJob() that it can stop watching cancel() - err = client.UpdateJob(job, status, result) + err = client.UpdateJob(token, status, result) if err != nil { log.Fatalf("Error reporting job result: %v", err) } diff --git a/internal/worker/api/api.gen.go b/internal/worker/api/api.gen.go index 1f489580c..35067a6ee 100644 --- a/internal/worker/api/api.gen.go +++ b/internal/worker/api/api.gen.go @@ -17,8 +17,8 @@ import ( "strings" ) -// PostJobJSONBody defines parameters for PostJob. -type PostJobJSONBody map[string]interface{} +// RequestJobJSONBody defines parameters for RequestJob. +type RequestJobJSONBody map[string]interface{} // UpdateJobJSONBody defines parameters for UpdateJob. type UpdateJobJSONBody struct { @@ -26,8 +26,8 @@ type UpdateJobJSONBody struct { Status string `json:"status"` } -// PostJobRequestBody defines body for PostJob for application/json ContentType. -type PostJobJSONRequestBody PostJobJSONBody +// RequestJobRequestBody defines body for RequestJob for application/json ContentType. +type RequestJobJSONRequestBody RequestJobJSONBody // UpdateJobRequestBody defines body for UpdateJob for application/json ContentType. type UpdateJobJSONRequestBody UpdateJobJSONBody @@ -103,28 +103,28 @@ func WithRequestEditorFn(fn RequestEditorFn) ClientOption { // The interface specification for the client above. type ClientInterface interface { - // PostJob request with any body - PostJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error) + // RequestJob request with any body + RequestJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error) - PostJob(ctx context.Context, body PostJobJSONRequestBody) (*http.Response, error) + RequestJob(ctx context.Context, body RequestJobJSONRequestBody) (*http.Response, error) // GetJob request - GetJob(ctx context.Context, jobId string) (*http.Response, error) + GetJob(ctx context.Context, token string) (*http.Response, error) // UpdateJob request with any body - UpdateJobWithBody(ctx context.Context, jobId string, contentType string, body io.Reader) (*http.Response, error) + UpdateJobWithBody(ctx context.Context, token string, contentType string, body io.Reader) (*http.Response, error) - UpdateJob(ctx context.Context, jobId string, body UpdateJobJSONRequestBody) (*http.Response, error) + UpdateJob(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*http.Response, error) - // PostJobArtifact request with any body - PostJobArtifactWithBody(ctx context.Context, jobId string, name string, contentType string, body io.Reader) (*http.Response, error) + // UploadJobArtifact request with any body + UploadJobArtifactWithBody(ctx context.Context, token string, name string, contentType string, body io.Reader) (*http.Response, error) // GetStatus request GetStatus(ctx context.Context) (*http.Response, error) } -func (c *Client) PostJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error) { - req, err := NewPostJobRequestWithBody(c.Server, contentType, body) +func (c *Client) RequestJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error) { + req, err := NewRequestJobRequestWithBody(c.Server, contentType, body) if err != nil { return nil, err } @@ -138,8 +138,8 @@ func (c *Client) PostJobWithBody(ctx context.Context, contentType string, body i return c.Client.Do(req) } -func (c *Client) PostJob(ctx context.Context, body PostJobJSONRequestBody) (*http.Response, error) { - req, err := NewPostJobRequest(c.Server, body) +func (c *Client) RequestJob(ctx context.Context, body RequestJobJSONRequestBody) (*http.Response, error) { + req, err := NewRequestJobRequest(c.Server, body) if err != nil { return nil, err } @@ -153,8 +153,8 @@ func (c *Client) PostJob(ctx context.Context, body PostJobJSONRequestBody) (*htt return c.Client.Do(req) } -func (c *Client) GetJob(ctx context.Context, jobId string) (*http.Response, error) { - req, err := NewGetJobRequest(c.Server, jobId) +func (c *Client) GetJob(ctx context.Context, token string) (*http.Response, error) { + req, err := NewGetJobRequest(c.Server, token) if err != nil { return nil, err } @@ -168,8 +168,8 @@ func (c *Client) GetJob(ctx context.Context, jobId string) (*http.Response, erro return c.Client.Do(req) } -func (c *Client) UpdateJobWithBody(ctx context.Context, jobId string, contentType string, body io.Reader) (*http.Response, error) { - req, err := NewUpdateJobRequestWithBody(c.Server, jobId, contentType, body) +func (c *Client) UpdateJobWithBody(ctx context.Context, token string, contentType string, body io.Reader) (*http.Response, error) { + req, err := NewUpdateJobRequestWithBody(c.Server, token, contentType, body) if err != nil { return nil, err } @@ -183,8 +183,8 @@ func (c *Client) UpdateJobWithBody(ctx context.Context, jobId string, contentTyp return c.Client.Do(req) } -func (c *Client) UpdateJob(ctx context.Context, jobId string, body UpdateJobJSONRequestBody) (*http.Response, error) { - req, err := NewUpdateJobRequest(c.Server, jobId, body) +func (c *Client) UpdateJob(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*http.Response, error) { + req, err := NewUpdateJobRequest(c.Server, token, body) if err != nil { return nil, err } @@ -198,8 +198,8 @@ func (c *Client) UpdateJob(ctx context.Context, jobId string, body UpdateJobJSON return c.Client.Do(req) } -func (c *Client) PostJobArtifactWithBody(ctx context.Context, jobId string, name string, contentType string, body io.Reader) (*http.Response, error) { - req, err := NewPostJobArtifactRequestWithBody(c.Server, jobId, name, contentType, body) +func (c *Client) UploadJobArtifactWithBody(ctx context.Context, token string, name string, contentType string, body io.Reader) (*http.Response, error) { + req, err := NewUploadJobArtifactRequestWithBody(c.Server, token, name, contentType, body) if err != nil { return nil, err } @@ -228,19 +228,19 @@ func (c *Client) GetStatus(ctx context.Context) (*http.Response, error) { return c.Client.Do(req) } -// NewPostJobRequest calls the generic PostJob builder with application/json body -func NewPostJobRequest(server string, body PostJobJSONRequestBody) (*http.Request, error) { +// NewRequestJobRequest calls the generic RequestJob builder with application/json body +func NewRequestJobRequest(server string, body RequestJobJSONRequestBody) (*http.Request, error) { var bodyReader io.Reader buf, err := json.Marshal(body) if err != nil { return nil, err } bodyReader = bytes.NewReader(buf) - return NewPostJobRequestWithBody(server, "application/json", bodyReader) + return NewRequestJobRequestWithBody(server, "application/json", bodyReader) } -// NewPostJobRequestWithBody generates requests for PostJob with any type of body -func NewPostJobRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) { +// NewRequestJobRequestWithBody generates requests for RequestJob with any type of body +func NewRequestJobRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) { var err error queryUrl, err := url.Parse(server) @@ -268,12 +268,12 @@ func NewPostJobRequestWithBody(server string, contentType string, body io.Reader } // NewGetJobRequest generates requests for GetJob -func NewGetJobRequest(server string, jobId string) (*http.Request, error) { +func NewGetJobRequest(server string, token string) (*http.Request, error) { var err error var pathParam0 string - pathParam0, err = runtime.StyleParam("simple", false, "job_id", jobId) + pathParam0, err = runtime.StyleParam("simple", false, "token", token) if err != nil { return nil, err } @@ -302,23 +302,23 @@ func NewGetJobRequest(server string, jobId string) (*http.Request, error) { } // NewUpdateJobRequest calls the generic UpdateJob builder with application/json body -func NewUpdateJobRequest(server string, jobId string, body UpdateJobJSONRequestBody) (*http.Request, error) { +func NewUpdateJobRequest(server string, token string, body UpdateJobJSONRequestBody) (*http.Request, error) { var bodyReader io.Reader buf, err := json.Marshal(body) if err != nil { return nil, err } bodyReader = bytes.NewReader(buf) - return NewUpdateJobRequestWithBody(server, jobId, "application/json", bodyReader) + return NewUpdateJobRequestWithBody(server, token, "application/json", bodyReader) } // NewUpdateJobRequestWithBody generates requests for UpdateJob with any type of body -func NewUpdateJobRequestWithBody(server string, jobId string, contentType string, body io.Reader) (*http.Request, error) { +func NewUpdateJobRequestWithBody(server string, token string, contentType string, body io.Reader) (*http.Request, error) { var err error var pathParam0 string - pathParam0, err = runtime.StyleParam("simple", false, "job_id", jobId) + pathParam0, err = runtime.StyleParam("simple", false, "token", token) if err != nil { return nil, err } @@ -347,13 +347,13 @@ func NewUpdateJobRequestWithBody(server string, jobId string, contentType string return req, nil } -// NewPostJobArtifactRequestWithBody generates requests for PostJobArtifact with any type of body -func NewPostJobArtifactRequestWithBody(server string, jobId string, name string, contentType string, body io.Reader) (*http.Request, error) { +// NewUploadJobArtifactRequestWithBody generates requests for UploadJobArtifact with any type of body +func NewUploadJobArtifactRequestWithBody(server string, token string, name string, contentType string, body io.Reader) (*http.Request, error) { var err error var pathParam0 string - pathParam0, err = runtime.StyleParam("simple", false, "job_id", jobId) + pathParam0, err = runtime.StyleParam("simple", false, "token", token) if err != nil { return nil, err } @@ -380,7 +380,7 @@ func NewPostJobArtifactRequestWithBody(server string, jobId string, name string, return nil, err } - req, err := http.NewRequest("POST", queryUrl.String(), body) + req, err := http.NewRequest("PUT", queryUrl.String(), body) if err != nil { return nil, err } @@ -445,38 +445,38 @@ func WithBaseURL(baseURL string) ClientOption { // ClientWithResponsesInterface is the interface specification for the client with responses above. type ClientWithResponsesInterface interface { - // PostJob request with any body - PostJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*PostJobResponse, error) + // RequestJob request with any body + RequestJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*RequestJobResponse, error) - PostJobWithResponse(ctx context.Context, body PostJobJSONRequestBody) (*PostJobResponse, error) + RequestJobWithResponse(ctx context.Context, body RequestJobJSONRequestBody) (*RequestJobResponse, error) // GetJob request - GetJobWithResponse(ctx context.Context, jobId string) (*GetJobResponse, error) + GetJobWithResponse(ctx context.Context, token string) (*GetJobResponse, error) // UpdateJob request with any body - UpdateJobWithBodyWithResponse(ctx context.Context, jobId string, contentType string, body io.Reader) (*UpdateJobResponse, error) + UpdateJobWithBodyWithResponse(ctx context.Context, token string, contentType string, body io.Reader) (*UpdateJobResponse, error) - UpdateJobWithResponse(ctx context.Context, jobId string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error) + UpdateJobWithResponse(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error) - // PostJobArtifact request with any body - PostJobArtifactWithBodyWithResponse(ctx context.Context, jobId string, name string, contentType string, body io.Reader) (*PostJobArtifactResponse, error) + // UploadJobArtifact request with any body + UploadJobArtifactWithBodyWithResponse(ctx context.Context, token string, name string, contentType string, body io.Reader) (*UploadJobArtifactResponse, error) // GetStatus request GetStatusWithResponse(ctx context.Context) (*GetStatusResponse, error) } -type PostJobResponse struct { +type RequestJobResponse struct { Body []byte HTTPResponse *http.Response - JSON200 *struct { - Id string `json:"id"` - Manifest interface{} `json:"manifest"` - Targets []interface{} `json:"targets"` + JSON201 *struct { + Manifest interface{} `json:"manifest"` + Targets *[]interface{} `json:"targets,omitempty"` + Token string `json:"token"` } } // Status returns HTTPResponse.Status -func (r PostJobResponse) Status() string { +func (r RequestJobResponse) Status() string { if r.HTTPResponse != nil { return r.HTTPResponse.Status } @@ -484,7 +484,7 @@ func (r PostJobResponse) Status() string { } // StatusCode returns HTTPResponse.StatusCode -func (r PostJobResponse) StatusCode() int { +func (r RequestJobResponse) StatusCode() int { if r.HTTPResponse != nil { return r.HTTPResponse.StatusCode } @@ -495,8 +495,7 @@ type GetJobResponse struct { Body []byte HTTPResponse *http.Response JSON200 *struct { - Canceled bool `json:"canceled"` - Id string `json:"id"` + Canceled bool `json:"canceled"` } } @@ -537,13 +536,13 @@ func (r UpdateJobResponse) StatusCode() int { return 0 } -type PostJobArtifactResponse struct { +type UploadJobArtifactResponse struct { Body []byte HTTPResponse *http.Response } // Status returns HTTPResponse.Status -func (r PostJobArtifactResponse) Status() string { +func (r UploadJobArtifactResponse) Status() string { if r.HTTPResponse != nil { return r.HTTPResponse.Status } @@ -551,7 +550,7 @@ func (r PostJobArtifactResponse) Status() string { } // StatusCode returns HTTPResponse.StatusCode -func (r PostJobArtifactResponse) StatusCode() int { +func (r UploadJobArtifactResponse) StatusCode() int { if r.HTTPResponse != nil { return r.HTTPResponse.StatusCode } @@ -582,26 +581,26 @@ func (r GetStatusResponse) StatusCode() int { return 0 } -// PostJobWithBodyWithResponse request with arbitrary body returning *PostJobResponse -func (c *ClientWithResponses) PostJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*PostJobResponse, error) { - rsp, err := c.PostJobWithBody(ctx, contentType, body) +// RequestJobWithBodyWithResponse request with arbitrary body returning *RequestJobResponse +func (c *ClientWithResponses) RequestJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*RequestJobResponse, error) { + rsp, err := c.RequestJobWithBody(ctx, contentType, body) if err != nil { return nil, err } - return ParsePostJobResponse(rsp) + return ParseRequestJobResponse(rsp) } -func (c *ClientWithResponses) PostJobWithResponse(ctx context.Context, body PostJobJSONRequestBody) (*PostJobResponse, error) { - rsp, err := c.PostJob(ctx, body) +func (c *ClientWithResponses) RequestJobWithResponse(ctx context.Context, body RequestJobJSONRequestBody) (*RequestJobResponse, error) { + rsp, err := c.RequestJob(ctx, body) if err != nil { return nil, err } - return ParsePostJobResponse(rsp) + return ParseRequestJobResponse(rsp) } // GetJobWithResponse request returning *GetJobResponse -func (c *ClientWithResponses) GetJobWithResponse(ctx context.Context, jobId string) (*GetJobResponse, error) { - rsp, err := c.GetJob(ctx, jobId) +func (c *ClientWithResponses) GetJobWithResponse(ctx context.Context, token string) (*GetJobResponse, error) { + rsp, err := c.GetJob(ctx, token) if err != nil { return nil, err } @@ -609,29 +608,29 @@ func (c *ClientWithResponses) GetJobWithResponse(ctx context.Context, jobId stri } // UpdateJobWithBodyWithResponse request with arbitrary body returning *UpdateJobResponse -func (c *ClientWithResponses) UpdateJobWithBodyWithResponse(ctx context.Context, jobId string, contentType string, body io.Reader) (*UpdateJobResponse, error) { - rsp, err := c.UpdateJobWithBody(ctx, jobId, contentType, body) +func (c *ClientWithResponses) UpdateJobWithBodyWithResponse(ctx context.Context, token string, contentType string, body io.Reader) (*UpdateJobResponse, error) { + rsp, err := c.UpdateJobWithBody(ctx, token, contentType, body) if err != nil { return nil, err } return ParseUpdateJobResponse(rsp) } -func (c *ClientWithResponses) UpdateJobWithResponse(ctx context.Context, jobId string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error) { - rsp, err := c.UpdateJob(ctx, jobId, body) +func (c *ClientWithResponses) UpdateJobWithResponse(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error) { + rsp, err := c.UpdateJob(ctx, token, body) if err != nil { return nil, err } return ParseUpdateJobResponse(rsp) } -// PostJobArtifactWithBodyWithResponse request with arbitrary body returning *PostJobArtifactResponse -func (c *ClientWithResponses) PostJobArtifactWithBodyWithResponse(ctx context.Context, jobId string, name string, contentType string, body io.Reader) (*PostJobArtifactResponse, error) { - rsp, err := c.PostJobArtifactWithBody(ctx, jobId, name, contentType, body) +// UploadJobArtifactWithBodyWithResponse request with arbitrary body returning *UploadJobArtifactResponse +func (c *ClientWithResponses) UploadJobArtifactWithBodyWithResponse(ctx context.Context, token string, name string, contentType string, body io.Reader) (*UploadJobArtifactResponse, error) { + rsp, err := c.UploadJobArtifactWithBody(ctx, token, name, contentType, body) if err != nil { return nil, err } - return ParsePostJobArtifactResponse(rsp) + return ParseUploadJobArtifactResponse(rsp) } // GetStatusWithResponse request returning *GetStatusResponse @@ -643,30 +642,30 @@ func (c *ClientWithResponses) GetStatusWithResponse(ctx context.Context) (*GetSt return ParseGetStatusResponse(rsp) } -// ParsePostJobResponse parses an HTTP response from a PostJobWithResponse call -func ParsePostJobResponse(rsp *http.Response) (*PostJobResponse, error) { +// ParseRequestJobResponse parses an HTTP response from a RequestJobWithResponse call +func ParseRequestJobResponse(rsp *http.Response) (*RequestJobResponse, error) { bodyBytes, err := ioutil.ReadAll(rsp.Body) defer rsp.Body.Close() if err != nil { return nil, err } - response := &PostJobResponse{ + response := &RequestJobResponse{ Body: bodyBytes, HTTPResponse: rsp, } switch { - case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200: + case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 201: var dest struct { - Id string `json:"id"` - Manifest interface{} `json:"manifest"` - Targets []interface{} `json:"targets"` + Manifest interface{} `json:"manifest"` + Targets *[]interface{} `json:"targets,omitempty"` + Token string `json:"token"` } if err := json.Unmarshal(bodyBytes, &dest); err != nil { return nil, err } - response.JSON200 = &dest + response.JSON201 = &dest } @@ -689,8 +688,7 @@ func ParseGetJobResponse(rsp *http.Response) (*GetJobResponse, error) { switch { case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200: var dest struct { - Canceled bool `json:"canceled"` - Id string `json:"id"` + Canceled bool `json:"canceled"` } if err := json.Unmarshal(bodyBytes, &dest); err != nil { return nil, err @@ -721,15 +719,15 @@ func ParseUpdateJobResponse(rsp *http.Response) (*UpdateJobResponse, error) { return response, nil } -// ParsePostJobArtifactResponse parses an HTTP response from a PostJobArtifactWithResponse call -func ParsePostJobArtifactResponse(rsp *http.Response) (*PostJobArtifactResponse, error) { +// ParseUploadJobArtifactResponse parses an HTTP response from a UploadJobArtifactWithResponse call +func ParseUploadJobArtifactResponse(rsp *http.Response) (*UploadJobArtifactResponse, error) { bodyBytes, err := ioutil.ReadAll(rsp.Body) defer rsp.Body.Close() if err != nil { return nil, err } - response := &PostJobArtifactResponse{ + response := &UploadJobArtifactResponse{ Body: bodyBytes, HTTPResponse: rsp, } @@ -770,18 +768,18 @@ func ParseGetStatusResponse(rsp *http.Response) (*GetStatusResponse, error) { // ServerInterface represents all server handlers. type ServerInterface interface { - // create-job + // Request a job // (POST /jobs) - PostJob(ctx echo.Context) error - // get-job - // (GET /jobs/{job_id}) - GetJob(ctx echo.Context, jobId string) error - // update-job - // (PATCH /jobs/{job_id}) - UpdateJob(ctx echo.Context, jobId string) error - // add-image - // (POST /jobs/{job_id}/artifacts/{name}) - PostJobArtifact(ctx echo.Context, jobId string, name string) error + RequestJob(ctx echo.Context) error + // Get running job + // (GET /jobs/{token}) + GetJob(ctx echo.Context, token string) error + // Update a running job + // (PATCH /jobs/{token}) + UpdateJob(ctx echo.Context, token string) error + // Upload an artifact + // (PUT /jobs/{token}/artifacts/{name}) + UploadJobArtifact(ctx echo.Context, token string, name string) error // status // (GET /status) GetStatus(ctx echo.Context) error @@ -792,56 +790,56 @@ type ServerInterfaceWrapper struct { Handler ServerInterface } -// PostJob converts echo context to params. -func (w *ServerInterfaceWrapper) PostJob(ctx echo.Context) error { +// RequestJob converts echo context to params. +func (w *ServerInterfaceWrapper) RequestJob(ctx echo.Context) error { var err error // Invoke the callback with all the unmarshalled arguments - err = w.Handler.PostJob(ctx) + err = w.Handler.RequestJob(ctx) return err } // GetJob converts echo context to params. func (w *ServerInterfaceWrapper) GetJob(ctx echo.Context) error { var err error - // ------------- Path parameter "job_id" ------------- - var jobId string + // ------------- Path parameter "token" ------------- + var token string - err = runtime.BindStyledParameter("simple", false, "job_id", ctx.Param("job_id"), &jobId) + err = runtime.BindStyledParameter("simple", false, "token", ctx.Param("token"), &token) if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter job_id: %s", err)) + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter token: %s", err)) } // Invoke the callback with all the unmarshalled arguments - err = w.Handler.GetJob(ctx, jobId) + err = w.Handler.GetJob(ctx, token) return err } // UpdateJob converts echo context to params. func (w *ServerInterfaceWrapper) UpdateJob(ctx echo.Context) error { var err error - // ------------- Path parameter "job_id" ------------- - var jobId string + // ------------- Path parameter "token" ------------- + var token string - err = runtime.BindStyledParameter("simple", false, "job_id", ctx.Param("job_id"), &jobId) + err = runtime.BindStyledParameter("simple", false, "token", ctx.Param("token"), &token) if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter job_id: %s", err)) + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter token: %s", err)) } // Invoke the callback with all the unmarshalled arguments - err = w.Handler.UpdateJob(ctx, jobId) + err = w.Handler.UpdateJob(ctx, token) return err } -// PostJobArtifact converts echo context to params. -func (w *ServerInterfaceWrapper) PostJobArtifact(ctx echo.Context) error { +// UploadJobArtifact converts echo context to params. +func (w *ServerInterfaceWrapper) UploadJobArtifact(ctx echo.Context) error { var err error - // ------------- Path parameter "job_id" ------------- - var jobId string + // ------------- Path parameter "token" ------------- + var token string - err = runtime.BindStyledParameter("simple", false, "job_id", ctx.Param("job_id"), &jobId) + err = runtime.BindStyledParameter("simple", false, "token", ctx.Param("token"), &token) if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter job_id: %s", err)) + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter token: %s", err)) } // ------------- Path parameter "name" ------------- @@ -853,7 +851,7 @@ func (w *ServerInterfaceWrapper) PostJobArtifact(ctx echo.Context) error { } // Invoke the callback with all the unmarshalled arguments - err = w.Handler.PostJobArtifact(ctx, jobId, name) + err = w.Handler.UploadJobArtifact(ctx, token, name) return err } @@ -888,10 +886,10 @@ func RegisterHandlers(router EchoRouter, si ServerInterface) { Handler: si, } - router.POST("/jobs", wrapper.PostJob) - router.GET("/jobs/:job_id", wrapper.GetJob) - router.PATCH("/jobs/:job_id", wrapper.UpdateJob) - router.POST("/jobs/:job_id/artifacts/:name", wrapper.PostJobArtifact) + router.POST("/jobs", wrapper.RequestJob) + router.GET("/jobs/:token", wrapper.GetJob) + router.PATCH("/jobs/:token", wrapper.UpdateJob) + router.PUT("/jobs/:token/artifacts/:name", wrapper.UploadJobArtifact) router.GET("/status", wrapper.GetStatus) } diff --git a/internal/worker/api/openapi.yml b/internal/worker/api/openapi.yml index 830233fec..ae79299f0 100644 --- a/internal/worker/api/openapi.yml +++ b/internal/worker/api/openapi.yml @@ -27,18 +27,18 @@ paths: description: Simple status handler to check whether the service is up. /jobs: post: - summary: create-job + summary: Request a job tags: [] responses: - '200': - description: OK + '201': + description: Created content: application/json: schema: type: object additionalProperties: false properties: - id: + token: type: string format: uuid manifest: {} @@ -46,26 +46,27 @@ paths: type: array items: {} required: - - id + - token - manifest - - targets - operationId: PostJob + operationId: RequestJob requestBody: content: application/json: schema: type: object - properties: {} + additionalProperties: false + description: '' + description: Requests a job. This operation blocks until a job is available. parameters: [] - '/jobs/{job_id}': + '/jobs/{token}': parameters: - schema: type: string - name: job_id + name: token in: path required: true get: - summary: get-job + summary: Get running job tags: [] responses: '200': @@ -75,17 +76,14 @@ paths: schema: type: object properties: - id: - type: string - format: uuid canceled: type: boolean required: - - id - canceled operationId: GetJob + description: '' patch: - summary: update-job + summary: Update a running job tags: [] responses: {} operationId: UpdateJob @@ -106,25 +104,25 @@ paths: required: - status - result - '/jobs/{job_id}/artifacts/{name}': + '/jobs/{token}/artifacts/{name}': parameters: - - schema: - type: string - name: job_id - in: path - required: true - schema: type: string name: name in: path required: true - post: - summary: add-image + - schema: + type: string + name: token + in: path + required: true + put: + summary: Upload an artifact tags: [] responses: '200': description: OK - operationId: PostJobArtifact + operationId: UploadJobArtifact requestBody: content: application/octet-stream: diff --git a/internal/worker/client.go b/internal/worker/client.go index cc38e07f5..0af7df0ba 100644 --- a/internal/worker/client.go +++ b/internal/worker/client.go @@ -23,12 +23,6 @@ type Client struct { api *api.Client } -type Job struct { - Id uuid.UUID - Manifest distro.Manifest - Targets []*target.Target -} - func NewClient(baseURL string, conf *tls.Config) (*Client, error) { httpClient := http.Client{ Transport: &http.Transport{ @@ -61,34 +55,30 @@ func NewClientUnix(path string) *Client { return &Client{c} } -func (c *Client) AddJob() (*Job, error) { - response, err := c.api.PostJob(context.Background(), api.PostJobJSONRequestBody{}) +func (c *Client) RequestJob() (uuid.UUID, distro.Manifest, []*target.Target, error) { + response, err := c.api.RequestJob(context.Background(), api.RequestJobJSONRequestBody{}) if err != nil { - return nil, err + return uuid.Nil, nil, nil, err } defer response.Body.Close() if response.StatusCode != http.StatusCreated { var er errorResponse _ = json.NewDecoder(response.Body).Decode(&er) - return nil, fmt.Errorf("couldn't create job, got %d: %s", response.StatusCode, er.Message) + return uuid.Nil, nil, nil, fmt.Errorf("couldn't create job, got %d: %s", response.StatusCode, er.Message) } - var jr addJobResponse + var jr requestJobResponse err = json.NewDecoder(response.Body).Decode(&jr) if err != nil { - return nil, err + return uuid.Nil, nil, nil, err } - return &Job{ - jr.Id, - jr.Manifest, - jr.Targets, - }, nil + return jr.Token, jr.Manifest, jr.Targets, nil } -func (c *Client) JobCanceled(job *Job) bool { - response, err := c.api.GetJob(context.Background(), job.Id.String()) +func (c *Client) JobCanceled(token uuid.UUID) bool { + response, err := c.api.GetJob(context.Background(), token.String()) if err != nil { return true } @@ -98,7 +88,7 @@ func (c *Client) JobCanceled(job *Job) bool { return true } - var jr jobResponse + var jr getJobResponse err = json.NewDecoder(response.Body).Decode(&jr) if err != nil { return true @@ -107,8 +97,8 @@ func (c *Client) JobCanceled(job *Job) bool { return jr.Canceled } -func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *osbuild.Result) error { - response, err := c.api.UpdateJob(context.Background(), job.Id.String(), api.UpdateJobJSONRequestBody{ +func (c *Client) UpdateJob(token uuid.UUID, status common.ImageBuildState, result *osbuild.Result) error { + response, err := c.api.UpdateJob(context.Background(), token.String(), api.UpdateJobJSONRequestBody{ Result: result, Status: status.ToString(), }) @@ -123,9 +113,9 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *osbu return nil } -func (c *Client) UploadImage(job uuid.UUID, name string, reader io.Reader) error { - _, err := c.api.PostJobArtifactWithBody(context.Background(), - job.String(), name, "application/octet-stream", reader) +func (c *Client) UploadImage(token uuid.UUID, name string, reader io.Reader) error { + _, err := c.api.UploadJobArtifactWithBody(context.Background(), + token.String(), name, "application/octet-stream", reader) return err } diff --git a/internal/worker/json.go b/internal/worker/json.go index 6f969721b..e686df68b 100644 --- a/internal/worker/json.go +++ b/internal/worker/json.go @@ -34,18 +34,14 @@ type errorResponse struct { Message string `json:"message"` } -type addJobRequest struct { -} - -type addJobResponse struct { - Id uuid.UUID `json:"id"` +type requestJobResponse struct { + Token uuid.UUID `json:"token"` Manifest distro.Manifest `json:"manifest"` Targets []*target.Target `json:"targets,omitempty"` } -type jobResponse struct { - Id uuid.UUID `json:"id"` - Canceled bool `json:"canceled"` +type getJobResponse struct { + Canceled bool `json:"canceled"` } type updateJobRequest struct { diff --git a/internal/worker/server.go b/internal/worker/server.go index 39c70d77c..37686d615 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -1,7 +1,9 @@ package worker import ( + "context" "encoding/json" + "errors" "fmt" "io" "io/ioutil" @@ -10,6 +12,7 @@ import ( "net/http" "os" "path" + "sync" "time" "github.com/google/uuid" @@ -26,6 +29,18 @@ type Server struct { jobs jobqueue.JobQueue echo *echo.Echo artifactsDir string + + // Currently running jobs. Workers are not handed job ids, but + // independent tokens which serve as an indirection. This enables + // race-free uploading of artifacts and makes restarting composer more + // robust (workers from an old run cannot report results for jobs + // composer thinks are not running). + // This map maps these tokens to job ids. Artifacts are stored in + // `$STATE_DIRECTORY/artifacts/tmp/$TOKEN` while the worker is running, + // and renamed to `$STATE_DIRECTORY/artifacts/$JOB_ID` once the job is + // reported as done. + running map[uuid.UUID]uuid.UUID + runningMutex sync.Mutex } type JobStatus struct { @@ -37,10 +52,13 @@ type JobStatus struct { Result OSBuildJobResult } +var ErrTokenNotExist = errors.New("worker token does not exist") + func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string) *Server { s := &Server{ jobs: jobs, artifactsDir: artifactsDir, + running: make(map[uuid.UUID]uuid.UUID), } s.echo = echo.New() @@ -151,6 +169,72 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return os.RemoveAll(path.Join(s.artifactsDir, id.String())) } +func (s *Server) RequestJob(ctx context.Context) (uuid.UUID, *OSBuildJob, error) { + token := uuid.New() + + var args OSBuildJob + jobId, err := s.jobs.Dequeue(ctx, []string{"osbuild"}, &args) + if err != nil { + return uuid.Nil, nil, err + } + + if s.artifactsDir != "" { + err := os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700) + if err != nil { + return uuid.Nil, nil, fmt.Errorf("cannot create artifact directory: %v", err) + } + } + + s.runningMutex.Lock() + defer s.runningMutex.Unlock() + s.running[token] = jobId + + return token, &args, nil +} + +func (s *Server) RunningJob(token uuid.UUID) (uuid.UUID, error) { + s.runningMutex.Lock() + defer s.runningMutex.Unlock() + + jobId, ok := s.running[token] + if !ok { + return uuid.Nil, ErrTokenNotExist + } + + return jobId, nil +} + +func (s *Server) FinishJob(token uuid.UUID, result *OSBuildJobResult) error { + s.runningMutex.Lock() + defer s.runningMutex.Unlock() + + jobId, ok := s.running[token] + if !ok { + return ErrTokenNotExist + } + + // Always delete the running job, even if there are errors finishing + // the job, because callers won't call this a second time on error. + delete(s.running, token) + + err := s.jobs.FinishJob(jobId, result) + if err != nil { + return fmt.Errorf("error finishing job: %v", err) + } + + // Move artifacts from the temporary location to the final job + // location. Log any errors, but do not treat them as fatal. The job is + // already finished. + if s.artifactsDir != "" { + err := os.Rename(path.Join(s.artifactsDir, "tmp", token.String()), path.Join(s.artifactsDir, jobId.String())) + if err != nil { + log.Printf("Error moving artifacts for job%s: %v", jobId, err) + } + } + + return nil +} + // apiHandlers implements api.ServerInterface - the http api route handlers // generated from api/openapi.yml. This is a separate object, because these // handlers should not be exposed on the `Server` object. @@ -164,52 +248,59 @@ func (h *apiHandlers) GetStatus(ctx echo.Context) error { }) } -func (h *apiHandlers) GetJob(ctx echo.Context, jobId string) error { - id, err := uuid.Parse(jobId) - if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, "cannot parse compose id: %v", err) - } - - status, err := h.server.JobStatus(id) - if err != nil { - switch err { - case jobqueue.ErrNotExist: - return echo.NewHTTPError(http.StatusNotFound, "job does not exist: %s", id) - default: - return err - } - } - - return ctx.JSON(http.StatusOK, jobResponse{ - Id: id, - Canceled: status.Canceled, - }) -} - -func (h *apiHandlers) PostJob(ctx echo.Context) error { - var body addJobRequest +func (h *apiHandlers) RequestJob(ctx echo.Context) error { + var body struct{} err := ctx.Bind(&body) if err != nil { return err } - var job OSBuildJob - id, err := h.server.jobs.Dequeue(ctx.Request().Context(), []string{"osbuild"}, &job) + token, jobArgs, err := h.server.RequestJob(ctx.Request().Context()) if err != nil { return echo.NewHTTPError(http.StatusInternalServerError, "%v", err) } - return ctx.JSON(http.StatusCreated, addJobResponse{ - Id: id, - Manifest: job.Manifest, - Targets: job.Targets, + return ctx.JSON(http.StatusCreated, requestJobResponse{ + Token: token, + Manifest: jobArgs.Manifest, + Targets: jobArgs.Targets, }) } -func (h *apiHandlers) UpdateJob(ctx echo.Context, jobId string) error { - id, err := uuid.Parse(jobId) +func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error { + token, err := uuid.Parse(tokenstr) if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, "cannot parse compose id: %v", err) + return echo.NewHTTPError(http.StatusBadRequest, "cannot parse job token: %v", err) + } + + jobId, err := h.server.RunningJob(token) + if err != nil { + switch err { + case ErrTokenNotExist: + return echo.NewHTTPError(http.StatusNotFound, "job is not running") + default: + return err + } + } + + if jobId == uuid.Nil { + return ctx.JSON(http.StatusOK, getJobResponse{}) + } + + status, err := h.server.JobStatus(jobId) + if err != nil { + return echo.NewHTTPError(http.StatusInternalServerError, "error retrieving job status: %v", err) + } + + return ctx.JSON(http.StatusOK, getJobResponse{ + Canceled: status.Canceled, + }) +} + +func (h *apiHandlers) UpdateJob(ctx echo.Context, idstr string) error { + token, err := uuid.Parse(idstr) + if err != nil { + return echo.NewHTTPError(http.StatusBadRequest, "cannot parse token: %v", err) } var body updateJobRequest @@ -225,13 +316,11 @@ func (h *apiHandlers) UpdateJob(ctx echo.Context, jobId string) error { return echo.NewHTTPError(http.StatusBadRequest, "setting status of a job to waiting or running is not supported") } - err = h.server.jobs.FinishJob(id, OSBuildJobResult{OSBuildOutput: body.Result}) + err = h.server.FinishJob(token, &OSBuildJobResult{OSBuildOutput: body.Result}) if err != nil { switch err { - case jobqueue.ErrNotExist: - return echo.NewHTTPError(http.StatusNotFound, "job does not exist: %s", id) - case jobqueue.ErrNotRunning: - return echo.NewHTTPError(http.StatusBadRequest, "job is not running: %s", id) + case ErrTokenNotExist: + return echo.NewHTTPError(http.StatusNotFound, "job does not exist: %v", token) default: return err } @@ -240,10 +329,10 @@ func (h *apiHandlers) UpdateJob(ctx echo.Context, jobId string) error { return ctx.JSON(http.StatusOK, updateJobResponse{}) } -func (h *apiHandlers) PostJobArtifact(ctx echo.Context, jobId string, name string) error { - id, err := uuid.Parse(jobId) +func (h *apiHandlers) UploadJobArtifact(ctx echo.Context, tokenstr string, name string) error { + token, err := uuid.Parse(tokenstr) if err != nil { - return echo.NewHTTPError(http.StatusBadRequest, "cannot parse compose id: %v", err) + return echo.NewHTTPError(http.StatusBadRequest, "cannot parse job token: %v", err) } request := ctx.Request() @@ -256,12 +345,7 @@ func (h *apiHandlers) PostJobArtifact(ctx echo.Context, jobId string, name strin return ctx.NoContent(http.StatusOK) } - err = os.Mkdir(path.Join(h.server.artifactsDir, id.String()), 0700) - if err != nil { - return echo.NewHTTPError(http.StatusInternalServerError, "cannot create artifact directory: %v", err) - } - - f, err := os.Create(path.Join(h.server.artifactsDir, id.String(), name)) + f, err := os.Create(path.Join(h.server.artifactsDir, "tmp", token.String(), name)) if err != nil { return echo.NewHTTPError(http.StatusInternalServerError, "cannot create artifact file: %v", err) } diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 698a733ce..3562aecb4 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -1,11 +1,11 @@ package worker_test import ( + "context" "fmt" "net/http" "testing" - "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/osbuild/osbuild-composer/internal/distro" @@ -64,14 +64,11 @@ func TestCreate(t *testing.T) { } server := worker.NewServer(nil, testjobqueue.New(), "") - id, err := server.Enqueue(manifest, nil) + _, err = server.Enqueue(manifest, nil) require.NoError(t, err) test.TestRoute(t, server, false, "POST", "/jobs", `{}`, http.StatusCreated, - `{"id":"`+id.String()+`","manifest":{"sources":{},"pipeline":{}}}`, "created") - - test.TestRoute(t, server, false, "GET", fmt.Sprintf("/jobs/%s", id), `{}`, http.StatusOK, - `{"id":"`+id.String()+`","canceled":false}`) + `{"manifest":{"sources":{},"pipeline":{}}}`, "token", "created") } func TestCancel(t *testing.T) { @@ -90,82 +87,15 @@ func TestCancel(t *testing.T) { } server := worker.NewServer(nil, testjobqueue.New(), "") - id, err := server.Enqueue(manifest, nil) + jobId, err := server.Enqueue(manifest, nil) require.NoError(t, err) - test.TestRoute(t, server, false, "POST", "/jobs", `{}`, http.StatusCreated, - `{"id":"`+id.String()+`","manifest":{"sources":{},"pipeline":{}}}`, "created") - - err = server.Cancel(id) + token, _, err := server.RequestJob(context.Background()) require.NoError(t, err) - test.TestRoute(t, server, false, "GET", fmt.Sprintf("/jobs/%s", id), `{}`, http.StatusOK, - `{"id":"`+id.String()+`","canceled":true}`) -} - -func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { - distroStruct := fedoratest.New() - arch, err := distroStruct.GetArch("x86_64") - if err != nil { - t.Fatalf("error getting arch from distro") - } - imageType, err := arch.GetImageType("qcow2") - if err != nil { - t.Fatalf("error getting image type from arch") - } - server := worker.NewServer(nil, testjobqueue.New(), "") - - id := uuid.Nil - if from != "VOID" { - manifest, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, nil, nil) - if err != nil { - t.Fatalf("error creating osbuild manifest") - } - - id, err = server.Enqueue(manifest, nil) - require.NoError(t, err) - - if from != "WAITING" { - test.SendHTTP(server, false, "POST", "/jobs", `{}`) - if from != "RUNNING" { - test.SendHTTP(server, false, "PATCH", "/jobs/"+id.String(), `{"status":"`+from+`"}`) - } - } - } - - test.TestRoute(t, server, false, "PATCH", "/jobs/"+id.String(), `{"status":"`+to+`"}`, expectedStatus, "{}", "message") -} - -func TestUpdate(t *testing.T) { - var cases = []struct { - From string - To string - ExpectedStatus int - }{ - {"VOID", "WAITING", http.StatusBadRequest}, - {"VOID", "RUNNING", http.StatusBadRequest}, - {"VOID", "FINISHED", http.StatusNotFound}, - {"VOID", "FAILED", http.StatusNotFound}, - {"WAITING", "WAITING", http.StatusBadRequest}, - {"WAITING", "RUNNING", http.StatusBadRequest}, - {"WAITING", "FINISHED", http.StatusBadRequest}, - {"WAITING", "FAILED", http.StatusBadRequest}, - {"RUNNING", "WAITING", http.StatusBadRequest}, - {"RUNNING", "RUNNING", http.StatusBadRequest}, - {"RUNNING", "FINISHED", http.StatusOK}, - {"RUNNING", "FAILED", http.StatusOK}, - {"FINISHED", "WAITING", http.StatusBadRequest}, - {"FINISHED", "RUNNING", http.StatusBadRequest}, - {"FINISHED", "FINISHED", http.StatusBadRequest}, - {"FINISHED", "FAILED", http.StatusBadRequest}, - {"FAILED", "WAITING", http.StatusBadRequest}, - {"FAILED", "RUNNING", http.StatusBadRequest}, - {"FAILED", "FINISHED", http.StatusBadRequest}, - {"FAILED", "FAILED", http.StatusBadRequest}, - } - - for _, c := range cases { - t.Log(c) - testUpdateTransition(t, c.From, c.To, c.ExpectedStatus) - } + err = server.Cancel(jobId) + require.NoError(t, err) + + test.TestRoute(t, server, false, "GET", fmt.Sprintf("/jobs/%s", token), `{}`, http.StatusOK, + `{"canceled":true}`) }