worker/api: introduce job tokens

Don't give out job ids to workers, but `tokens`, which serve as an
indirection. This way, restarting composer won't confuse it when a stray
worker returns a result for a job that was still running. Also,
artifacts are only moved to the final location once a job finishes.

This change breaks backwards compatibility, but we're not yet promising
a stable worker API to anyone.

This drops the transition tests in server_test.go. These don't make much
sense anymore, because there's only one allowed transition, from running
to finished. They heavily relied on job slot ids, which are not easily
accessible with the `TestRoute` API. Overall, adjusting this seemed like
too much work for their benefit.
This commit is contained in:
Lars Karlitski 2020-09-07 13:12:06 +02:00 committed by Tom Gundersen
parent 783a88d8cc
commit 26b36ba704
7 changed files with 319 additions and 322 deletions

View file

@ -16,6 +16,7 @@ import (
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/osbuild"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/upload/awsupload"
@ -67,7 +68,7 @@ func (e *TargetsError) Error() string {
return errString
}
func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io.Reader) error) (*osbuild.Result, error) {
func RunJob(token uuid.UUID, manifest distro.Manifest, targets []*target.Target, store string, uploadFunc func(uuid.UUID, string, io.Reader) error) (*osbuild.Result, error) {
outputDirectory, err := ioutil.TempDir("/var/tmp", "osbuild-worker-*")
if err != nil {
return nil, fmt.Errorf("error creating temporary output directory: %v", err)
@ -79,14 +80,14 @@ func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io
}
}()
result, err := RunOSBuild(job.Manifest, store, outputDirectory, os.Stderr)
result, err := RunOSBuild(manifest, store, outputDirectory, os.Stderr)
if err != nil {
return nil, err
}
var r []error
for _, t := range job.Targets {
for _, t := range targets {
switch options := t.Options.(type) {
case *target.LocalTargetOptions:
var f *os.File
@ -105,7 +106,7 @@ func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io
}
}
err = uploadFunc(job.Id, options.Filename, f)
err = uploadFunc(token, options.Filename, f)
if err != nil {
r = append(r, err)
continue
@ -120,7 +121,7 @@ func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io
}
if options.Key == "" {
options.Key = job.Id.String()
options.Key = token.String()
}
_, err = a.Upload(path.Join(outputDirectory, options.Filename), options.Bucket, options.Key)
@ -180,11 +181,11 @@ func RunJob(job *worker.Job, store string, uploadFunc func(uuid.UUID, string, io
// It would be cleaner to kill the osbuild process using (`exec.CommandContext`
// or similar), but osbuild does not currently support this. Exiting here will
// make systemd clean up the whole cgroup and restart this service.
func WatchJob(ctx context.Context, client *worker.Client, job *worker.Job) {
func WatchJob(ctx context.Context, client *worker.Client, token uuid.UUID) {
for {
select {
case <-time.After(15 * time.Second):
if client.JobCanceled(job) {
if client.JobCanceled(token) {
log.Println("Job was canceled. Exiting.")
os.Exit(0)
}
@ -238,18 +239,18 @@ func main() {
for {
fmt.Println("Waiting for a new job...")
job, err := client.AddJob()
token, manifest, targets, err := client.RequestJob()
if err != nil {
log.Fatal(err)
}
fmt.Printf("Running job %s\n", job.Id)
fmt.Printf("Running job %s\n", token)
ctx, cancel := context.WithCancel(context.Background())
go WatchJob(ctx, client, job)
go WatchJob(ctx, client, token)
var status common.ImageBuildState
result, err := RunJob(job, store, client.UploadImage)
result, err := RunJob(token, manifest, targets, store, client.UploadImage)
if err != nil {
log.Printf(" Job failed: %v", err)
status = common.IBFailed
@ -275,14 +276,14 @@ func main() {
// flag to indicate all error kinds.
result.Success = false
} else {
log.Printf(" 🎉 Job completed successfully: %s", job.Id)
log.Printf(" 🎉 Job completed successfully: %s", token)
status = common.IBFinished
}
// signal to WatchJob() that it can stop watching
cancel()
err = client.UpdateJob(job, status, result)
err = client.UpdateJob(token, status, result)
if err != nil {
log.Fatalf("Error reporting job result: %v", err)
}

View file

@ -17,8 +17,8 @@ import (
"strings"
)
// PostJobJSONBody defines parameters for PostJob.
type PostJobJSONBody map[string]interface{}
// RequestJobJSONBody defines parameters for RequestJob.
type RequestJobJSONBody map[string]interface{}
// UpdateJobJSONBody defines parameters for UpdateJob.
type UpdateJobJSONBody struct {
@ -26,8 +26,8 @@ type UpdateJobJSONBody struct {
Status string `json:"status"`
}
// PostJobRequestBody defines body for PostJob for application/json ContentType.
type PostJobJSONRequestBody PostJobJSONBody
// RequestJobRequestBody defines body for RequestJob for application/json ContentType.
type RequestJobJSONRequestBody RequestJobJSONBody
// UpdateJobRequestBody defines body for UpdateJob for application/json ContentType.
type UpdateJobJSONRequestBody UpdateJobJSONBody
@ -103,28 +103,28 @@ func WithRequestEditorFn(fn RequestEditorFn) ClientOption {
// The interface specification for the client above.
type ClientInterface interface {
// PostJob request with any body
PostJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error)
// RequestJob request with any body
RequestJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error)
PostJob(ctx context.Context, body PostJobJSONRequestBody) (*http.Response, error)
RequestJob(ctx context.Context, body RequestJobJSONRequestBody) (*http.Response, error)
// GetJob request
GetJob(ctx context.Context, jobId string) (*http.Response, error)
GetJob(ctx context.Context, token string) (*http.Response, error)
// UpdateJob request with any body
UpdateJobWithBody(ctx context.Context, jobId string, contentType string, body io.Reader) (*http.Response, error)
UpdateJobWithBody(ctx context.Context, token string, contentType string, body io.Reader) (*http.Response, error)
UpdateJob(ctx context.Context, jobId string, body UpdateJobJSONRequestBody) (*http.Response, error)
UpdateJob(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*http.Response, error)
// PostJobArtifact request with any body
PostJobArtifactWithBody(ctx context.Context, jobId string, name string, contentType string, body io.Reader) (*http.Response, error)
// UploadJobArtifact request with any body
UploadJobArtifactWithBody(ctx context.Context, token string, name string, contentType string, body io.Reader) (*http.Response, error)
// GetStatus request
GetStatus(ctx context.Context) (*http.Response, error)
}
func (c *Client) PostJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error) {
req, err := NewPostJobRequestWithBody(c.Server, contentType, body)
func (c *Client) RequestJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error) {
req, err := NewRequestJobRequestWithBody(c.Server, contentType, body)
if err != nil {
return nil, err
}
@ -138,8 +138,8 @@ func (c *Client) PostJobWithBody(ctx context.Context, contentType string, body i
return c.Client.Do(req)
}
func (c *Client) PostJob(ctx context.Context, body PostJobJSONRequestBody) (*http.Response, error) {
req, err := NewPostJobRequest(c.Server, body)
func (c *Client) RequestJob(ctx context.Context, body RequestJobJSONRequestBody) (*http.Response, error) {
req, err := NewRequestJobRequest(c.Server, body)
if err != nil {
return nil, err
}
@ -153,8 +153,8 @@ func (c *Client) PostJob(ctx context.Context, body PostJobJSONRequestBody) (*htt
return c.Client.Do(req)
}
func (c *Client) GetJob(ctx context.Context, jobId string) (*http.Response, error) {
req, err := NewGetJobRequest(c.Server, jobId)
func (c *Client) GetJob(ctx context.Context, token string) (*http.Response, error) {
req, err := NewGetJobRequest(c.Server, token)
if err != nil {
return nil, err
}
@ -168,8 +168,8 @@ func (c *Client) GetJob(ctx context.Context, jobId string) (*http.Response, erro
return c.Client.Do(req)
}
func (c *Client) UpdateJobWithBody(ctx context.Context, jobId string, contentType string, body io.Reader) (*http.Response, error) {
req, err := NewUpdateJobRequestWithBody(c.Server, jobId, contentType, body)
func (c *Client) UpdateJobWithBody(ctx context.Context, token string, contentType string, body io.Reader) (*http.Response, error) {
req, err := NewUpdateJobRequestWithBody(c.Server, token, contentType, body)
if err != nil {
return nil, err
}
@ -183,8 +183,8 @@ func (c *Client) UpdateJobWithBody(ctx context.Context, jobId string, contentTyp
return c.Client.Do(req)
}
func (c *Client) UpdateJob(ctx context.Context, jobId string, body UpdateJobJSONRequestBody) (*http.Response, error) {
req, err := NewUpdateJobRequest(c.Server, jobId, body)
func (c *Client) UpdateJob(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*http.Response, error) {
req, err := NewUpdateJobRequest(c.Server, token, body)
if err != nil {
return nil, err
}
@ -198,8 +198,8 @@ func (c *Client) UpdateJob(ctx context.Context, jobId string, body UpdateJobJSON
return c.Client.Do(req)
}
func (c *Client) PostJobArtifactWithBody(ctx context.Context, jobId string, name string, contentType string, body io.Reader) (*http.Response, error) {
req, err := NewPostJobArtifactRequestWithBody(c.Server, jobId, name, contentType, body)
func (c *Client) UploadJobArtifactWithBody(ctx context.Context, token string, name string, contentType string, body io.Reader) (*http.Response, error) {
req, err := NewUploadJobArtifactRequestWithBody(c.Server, token, name, contentType, body)
if err != nil {
return nil, err
}
@ -228,19 +228,19 @@ func (c *Client) GetStatus(ctx context.Context) (*http.Response, error) {
return c.Client.Do(req)
}
// NewPostJobRequest calls the generic PostJob builder with application/json body
func NewPostJobRequest(server string, body PostJobJSONRequestBody) (*http.Request, error) {
// NewRequestJobRequest calls the generic RequestJob builder with application/json body
func NewRequestJobRequest(server string, body RequestJobJSONRequestBody) (*http.Request, error) {
var bodyReader io.Reader
buf, err := json.Marshal(body)
if err != nil {
return nil, err
}
bodyReader = bytes.NewReader(buf)
return NewPostJobRequestWithBody(server, "application/json", bodyReader)
return NewRequestJobRequestWithBody(server, "application/json", bodyReader)
}
// NewPostJobRequestWithBody generates requests for PostJob with any type of body
func NewPostJobRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) {
// NewRequestJobRequestWithBody generates requests for RequestJob with any type of body
func NewRequestJobRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) {
var err error
queryUrl, err := url.Parse(server)
@ -268,12 +268,12 @@ func NewPostJobRequestWithBody(server string, contentType string, body io.Reader
}
// NewGetJobRequest generates requests for GetJob
func NewGetJobRequest(server string, jobId string) (*http.Request, error) {
func NewGetJobRequest(server string, token string) (*http.Request, error) {
var err error
var pathParam0 string
pathParam0, err = runtime.StyleParam("simple", false, "job_id", jobId)
pathParam0, err = runtime.StyleParam("simple", false, "token", token)
if err != nil {
return nil, err
}
@ -302,23 +302,23 @@ func NewGetJobRequest(server string, jobId string) (*http.Request, error) {
}
// NewUpdateJobRequest calls the generic UpdateJob builder with application/json body
func NewUpdateJobRequest(server string, jobId string, body UpdateJobJSONRequestBody) (*http.Request, error) {
func NewUpdateJobRequest(server string, token string, body UpdateJobJSONRequestBody) (*http.Request, error) {
var bodyReader io.Reader
buf, err := json.Marshal(body)
if err != nil {
return nil, err
}
bodyReader = bytes.NewReader(buf)
return NewUpdateJobRequestWithBody(server, jobId, "application/json", bodyReader)
return NewUpdateJobRequestWithBody(server, token, "application/json", bodyReader)
}
// NewUpdateJobRequestWithBody generates requests for UpdateJob with any type of body
func NewUpdateJobRequestWithBody(server string, jobId string, contentType string, body io.Reader) (*http.Request, error) {
func NewUpdateJobRequestWithBody(server string, token string, contentType string, body io.Reader) (*http.Request, error) {
var err error
var pathParam0 string
pathParam0, err = runtime.StyleParam("simple", false, "job_id", jobId)
pathParam0, err = runtime.StyleParam("simple", false, "token", token)
if err != nil {
return nil, err
}
@ -347,13 +347,13 @@ func NewUpdateJobRequestWithBody(server string, jobId string, contentType string
return req, nil
}
// NewPostJobArtifactRequestWithBody generates requests for PostJobArtifact with any type of body
func NewPostJobArtifactRequestWithBody(server string, jobId string, name string, contentType string, body io.Reader) (*http.Request, error) {
// NewUploadJobArtifactRequestWithBody generates requests for UploadJobArtifact with any type of body
func NewUploadJobArtifactRequestWithBody(server string, token string, name string, contentType string, body io.Reader) (*http.Request, error) {
var err error
var pathParam0 string
pathParam0, err = runtime.StyleParam("simple", false, "job_id", jobId)
pathParam0, err = runtime.StyleParam("simple", false, "token", token)
if err != nil {
return nil, err
}
@ -380,7 +380,7 @@ func NewPostJobArtifactRequestWithBody(server string, jobId string, name string,
return nil, err
}
req, err := http.NewRequest("POST", queryUrl.String(), body)
req, err := http.NewRequest("PUT", queryUrl.String(), body)
if err != nil {
return nil, err
}
@ -445,38 +445,38 @@ func WithBaseURL(baseURL string) ClientOption {
// ClientWithResponsesInterface is the interface specification for the client with responses above.
type ClientWithResponsesInterface interface {
// PostJob request with any body
PostJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*PostJobResponse, error)
// RequestJob request with any body
RequestJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*RequestJobResponse, error)
PostJobWithResponse(ctx context.Context, body PostJobJSONRequestBody) (*PostJobResponse, error)
RequestJobWithResponse(ctx context.Context, body RequestJobJSONRequestBody) (*RequestJobResponse, error)
// GetJob request
GetJobWithResponse(ctx context.Context, jobId string) (*GetJobResponse, error)
GetJobWithResponse(ctx context.Context, token string) (*GetJobResponse, error)
// UpdateJob request with any body
UpdateJobWithBodyWithResponse(ctx context.Context, jobId string, contentType string, body io.Reader) (*UpdateJobResponse, error)
UpdateJobWithBodyWithResponse(ctx context.Context, token string, contentType string, body io.Reader) (*UpdateJobResponse, error)
UpdateJobWithResponse(ctx context.Context, jobId string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error)
UpdateJobWithResponse(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error)
// PostJobArtifact request with any body
PostJobArtifactWithBodyWithResponse(ctx context.Context, jobId string, name string, contentType string, body io.Reader) (*PostJobArtifactResponse, error)
// UploadJobArtifact request with any body
UploadJobArtifactWithBodyWithResponse(ctx context.Context, token string, name string, contentType string, body io.Reader) (*UploadJobArtifactResponse, error)
// GetStatus request
GetStatusWithResponse(ctx context.Context) (*GetStatusResponse, error)
}
type PostJobResponse struct {
type RequestJobResponse struct {
Body []byte
HTTPResponse *http.Response
JSON200 *struct {
Id string `json:"id"`
Manifest interface{} `json:"manifest"`
Targets []interface{} `json:"targets"`
JSON201 *struct {
Manifest interface{} `json:"manifest"`
Targets *[]interface{} `json:"targets,omitempty"`
Token string `json:"token"`
}
}
// Status returns HTTPResponse.Status
func (r PostJobResponse) Status() string {
func (r RequestJobResponse) Status() string {
if r.HTTPResponse != nil {
return r.HTTPResponse.Status
}
@ -484,7 +484,7 @@ func (r PostJobResponse) Status() string {
}
// StatusCode returns HTTPResponse.StatusCode
func (r PostJobResponse) StatusCode() int {
func (r RequestJobResponse) StatusCode() int {
if r.HTTPResponse != nil {
return r.HTTPResponse.StatusCode
}
@ -495,8 +495,7 @@ type GetJobResponse struct {
Body []byte
HTTPResponse *http.Response
JSON200 *struct {
Canceled bool `json:"canceled"`
Id string `json:"id"`
Canceled bool `json:"canceled"`
}
}
@ -537,13 +536,13 @@ func (r UpdateJobResponse) StatusCode() int {
return 0
}
type PostJobArtifactResponse struct {
type UploadJobArtifactResponse struct {
Body []byte
HTTPResponse *http.Response
}
// Status returns HTTPResponse.Status
func (r PostJobArtifactResponse) Status() string {
func (r UploadJobArtifactResponse) Status() string {
if r.HTTPResponse != nil {
return r.HTTPResponse.Status
}
@ -551,7 +550,7 @@ func (r PostJobArtifactResponse) Status() string {
}
// StatusCode returns HTTPResponse.StatusCode
func (r PostJobArtifactResponse) StatusCode() int {
func (r UploadJobArtifactResponse) StatusCode() int {
if r.HTTPResponse != nil {
return r.HTTPResponse.StatusCode
}
@ -582,26 +581,26 @@ func (r GetStatusResponse) StatusCode() int {
return 0
}
// PostJobWithBodyWithResponse request with arbitrary body returning *PostJobResponse
func (c *ClientWithResponses) PostJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*PostJobResponse, error) {
rsp, err := c.PostJobWithBody(ctx, contentType, body)
// RequestJobWithBodyWithResponse request with arbitrary body returning *RequestJobResponse
func (c *ClientWithResponses) RequestJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*RequestJobResponse, error) {
rsp, err := c.RequestJobWithBody(ctx, contentType, body)
if err != nil {
return nil, err
}
return ParsePostJobResponse(rsp)
return ParseRequestJobResponse(rsp)
}
func (c *ClientWithResponses) PostJobWithResponse(ctx context.Context, body PostJobJSONRequestBody) (*PostJobResponse, error) {
rsp, err := c.PostJob(ctx, body)
func (c *ClientWithResponses) RequestJobWithResponse(ctx context.Context, body RequestJobJSONRequestBody) (*RequestJobResponse, error) {
rsp, err := c.RequestJob(ctx, body)
if err != nil {
return nil, err
}
return ParsePostJobResponse(rsp)
return ParseRequestJobResponse(rsp)
}
// GetJobWithResponse request returning *GetJobResponse
func (c *ClientWithResponses) GetJobWithResponse(ctx context.Context, jobId string) (*GetJobResponse, error) {
rsp, err := c.GetJob(ctx, jobId)
func (c *ClientWithResponses) GetJobWithResponse(ctx context.Context, token string) (*GetJobResponse, error) {
rsp, err := c.GetJob(ctx, token)
if err != nil {
return nil, err
}
@ -609,29 +608,29 @@ func (c *ClientWithResponses) GetJobWithResponse(ctx context.Context, jobId stri
}
// UpdateJobWithBodyWithResponse request with arbitrary body returning *UpdateJobResponse
func (c *ClientWithResponses) UpdateJobWithBodyWithResponse(ctx context.Context, jobId string, contentType string, body io.Reader) (*UpdateJobResponse, error) {
rsp, err := c.UpdateJobWithBody(ctx, jobId, contentType, body)
func (c *ClientWithResponses) UpdateJobWithBodyWithResponse(ctx context.Context, token string, contentType string, body io.Reader) (*UpdateJobResponse, error) {
rsp, err := c.UpdateJobWithBody(ctx, token, contentType, body)
if err != nil {
return nil, err
}
return ParseUpdateJobResponse(rsp)
}
func (c *ClientWithResponses) UpdateJobWithResponse(ctx context.Context, jobId string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error) {
rsp, err := c.UpdateJob(ctx, jobId, body)
func (c *ClientWithResponses) UpdateJobWithResponse(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error) {
rsp, err := c.UpdateJob(ctx, token, body)
if err != nil {
return nil, err
}
return ParseUpdateJobResponse(rsp)
}
// PostJobArtifactWithBodyWithResponse request with arbitrary body returning *PostJobArtifactResponse
func (c *ClientWithResponses) PostJobArtifactWithBodyWithResponse(ctx context.Context, jobId string, name string, contentType string, body io.Reader) (*PostJobArtifactResponse, error) {
rsp, err := c.PostJobArtifactWithBody(ctx, jobId, name, contentType, body)
// UploadJobArtifactWithBodyWithResponse request with arbitrary body returning *UploadJobArtifactResponse
func (c *ClientWithResponses) UploadJobArtifactWithBodyWithResponse(ctx context.Context, token string, name string, contentType string, body io.Reader) (*UploadJobArtifactResponse, error) {
rsp, err := c.UploadJobArtifactWithBody(ctx, token, name, contentType, body)
if err != nil {
return nil, err
}
return ParsePostJobArtifactResponse(rsp)
return ParseUploadJobArtifactResponse(rsp)
}
// GetStatusWithResponse request returning *GetStatusResponse
@ -643,30 +642,30 @@ func (c *ClientWithResponses) GetStatusWithResponse(ctx context.Context) (*GetSt
return ParseGetStatusResponse(rsp)
}
// ParsePostJobResponse parses an HTTP response from a PostJobWithResponse call
func ParsePostJobResponse(rsp *http.Response) (*PostJobResponse, error) {
// ParseRequestJobResponse parses an HTTP response from a RequestJobWithResponse call
func ParseRequestJobResponse(rsp *http.Response) (*RequestJobResponse, error) {
bodyBytes, err := ioutil.ReadAll(rsp.Body)
defer rsp.Body.Close()
if err != nil {
return nil, err
}
response := &PostJobResponse{
response := &RequestJobResponse{
Body: bodyBytes,
HTTPResponse: rsp,
}
switch {
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200:
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 201:
var dest struct {
Id string `json:"id"`
Manifest interface{} `json:"manifest"`
Targets []interface{} `json:"targets"`
Manifest interface{} `json:"manifest"`
Targets *[]interface{} `json:"targets,omitempty"`
Token string `json:"token"`
}
if err := json.Unmarshal(bodyBytes, &dest); err != nil {
return nil, err
}
response.JSON200 = &dest
response.JSON201 = &dest
}
@ -689,8 +688,7 @@ func ParseGetJobResponse(rsp *http.Response) (*GetJobResponse, error) {
switch {
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200:
var dest struct {
Canceled bool `json:"canceled"`
Id string `json:"id"`
Canceled bool `json:"canceled"`
}
if err := json.Unmarshal(bodyBytes, &dest); err != nil {
return nil, err
@ -721,15 +719,15 @@ func ParseUpdateJobResponse(rsp *http.Response) (*UpdateJobResponse, error) {
return response, nil
}
// ParsePostJobArtifactResponse parses an HTTP response from a PostJobArtifactWithResponse call
func ParsePostJobArtifactResponse(rsp *http.Response) (*PostJobArtifactResponse, error) {
// ParseUploadJobArtifactResponse parses an HTTP response from a UploadJobArtifactWithResponse call
func ParseUploadJobArtifactResponse(rsp *http.Response) (*UploadJobArtifactResponse, error) {
bodyBytes, err := ioutil.ReadAll(rsp.Body)
defer rsp.Body.Close()
if err != nil {
return nil, err
}
response := &PostJobArtifactResponse{
response := &UploadJobArtifactResponse{
Body: bodyBytes,
HTTPResponse: rsp,
}
@ -770,18 +768,18 @@ func ParseGetStatusResponse(rsp *http.Response) (*GetStatusResponse, error) {
// ServerInterface represents all server handlers.
type ServerInterface interface {
// create-job
// Request a job
// (POST /jobs)
PostJob(ctx echo.Context) error
// get-job
// (GET /jobs/{job_id})
GetJob(ctx echo.Context, jobId string) error
// update-job
// (PATCH /jobs/{job_id})
UpdateJob(ctx echo.Context, jobId string) error
// add-image
// (POST /jobs/{job_id}/artifacts/{name})
PostJobArtifact(ctx echo.Context, jobId string, name string) error
RequestJob(ctx echo.Context) error
// Get running job
// (GET /jobs/{token})
GetJob(ctx echo.Context, token string) error
// Update a running job
// (PATCH /jobs/{token})
UpdateJob(ctx echo.Context, token string) error
// Upload an artifact
// (PUT /jobs/{token}/artifacts/{name})
UploadJobArtifact(ctx echo.Context, token string, name string) error
// status
// (GET /status)
GetStatus(ctx echo.Context) error
@ -792,56 +790,56 @@ type ServerInterfaceWrapper struct {
Handler ServerInterface
}
// PostJob converts echo context to params.
func (w *ServerInterfaceWrapper) PostJob(ctx echo.Context) error {
// RequestJob converts echo context to params.
func (w *ServerInterfaceWrapper) RequestJob(ctx echo.Context) error {
var err error
// Invoke the callback with all the unmarshalled arguments
err = w.Handler.PostJob(ctx)
err = w.Handler.RequestJob(ctx)
return err
}
// GetJob converts echo context to params.
func (w *ServerInterfaceWrapper) GetJob(ctx echo.Context) error {
var err error
// ------------- Path parameter "job_id" -------------
var jobId string
// ------------- Path parameter "token" -------------
var token string
err = runtime.BindStyledParameter("simple", false, "job_id", ctx.Param("job_id"), &jobId)
err = runtime.BindStyledParameter("simple", false, "token", ctx.Param("token"), &token)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter job_id: %s", err))
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter token: %s", err))
}
// Invoke the callback with all the unmarshalled arguments
err = w.Handler.GetJob(ctx, jobId)
err = w.Handler.GetJob(ctx, token)
return err
}
// UpdateJob converts echo context to params.
func (w *ServerInterfaceWrapper) UpdateJob(ctx echo.Context) error {
var err error
// ------------- Path parameter "job_id" -------------
var jobId string
// ------------- Path parameter "token" -------------
var token string
err = runtime.BindStyledParameter("simple", false, "job_id", ctx.Param("job_id"), &jobId)
err = runtime.BindStyledParameter("simple", false, "token", ctx.Param("token"), &token)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter job_id: %s", err))
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter token: %s", err))
}
// Invoke the callback with all the unmarshalled arguments
err = w.Handler.UpdateJob(ctx, jobId)
err = w.Handler.UpdateJob(ctx, token)
return err
}
// PostJobArtifact converts echo context to params.
func (w *ServerInterfaceWrapper) PostJobArtifact(ctx echo.Context) error {
// UploadJobArtifact converts echo context to params.
func (w *ServerInterfaceWrapper) UploadJobArtifact(ctx echo.Context) error {
var err error
// ------------- Path parameter "job_id" -------------
var jobId string
// ------------- Path parameter "token" -------------
var token string
err = runtime.BindStyledParameter("simple", false, "job_id", ctx.Param("job_id"), &jobId)
err = runtime.BindStyledParameter("simple", false, "token", ctx.Param("token"), &token)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter job_id: %s", err))
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter token: %s", err))
}
// ------------- Path parameter "name" -------------
@ -853,7 +851,7 @@ func (w *ServerInterfaceWrapper) PostJobArtifact(ctx echo.Context) error {
}
// Invoke the callback with all the unmarshalled arguments
err = w.Handler.PostJobArtifact(ctx, jobId, name)
err = w.Handler.UploadJobArtifact(ctx, token, name)
return err
}
@ -888,10 +886,10 @@ func RegisterHandlers(router EchoRouter, si ServerInterface) {
Handler: si,
}
router.POST("/jobs", wrapper.PostJob)
router.GET("/jobs/:job_id", wrapper.GetJob)
router.PATCH("/jobs/:job_id", wrapper.UpdateJob)
router.POST("/jobs/:job_id/artifacts/:name", wrapper.PostJobArtifact)
router.POST("/jobs", wrapper.RequestJob)
router.GET("/jobs/:token", wrapper.GetJob)
router.PATCH("/jobs/:token", wrapper.UpdateJob)
router.PUT("/jobs/:token/artifacts/:name", wrapper.UploadJobArtifact)
router.GET("/status", wrapper.GetStatus)
}

View file

@ -27,18 +27,18 @@ paths:
description: Simple status handler to check whether the service is up.
/jobs:
post:
summary: create-job
summary: Request a job
tags: []
responses:
'200':
description: OK
'201':
description: Created
content:
application/json:
schema:
type: object
additionalProperties: false
properties:
id:
token:
type: string
format: uuid
manifest: {}
@ -46,26 +46,27 @@ paths:
type: array
items: {}
required:
- id
- token
- manifest
- targets
operationId: PostJob
operationId: RequestJob
requestBody:
content:
application/json:
schema:
type: object
properties: {}
additionalProperties: false
description: ''
description: Requests a job. This operation blocks until a job is available.
parameters: []
'/jobs/{job_id}':
'/jobs/{token}':
parameters:
- schema:
type: string
name: job_id
name: token
in: path
required: true
get:
summary: get-job
summary: Get running job
tags: []
responses:
'200':
@ -75,17 +76,14 @@ paths:
schema:
type: object
properties:
id:
type: string
format: uuid
canceled:
type: boolean
required:
- id
- canceled
operationId: GetJob
description: ''
patch:
summary: update-job
summary: Update a running job
tags: []
responses: {}
operationId: UpdateJob
@ -106,25 +104,25 @@ paths:
required:
- status
- result
'/jobs/{job_id}/artifacts/{name}':
'/jobs/{token}/artifacts/{name}':
parameters:
- schema:
type: string
name: job_id
in: path
required: true
- schema:
type: string
name: name
in: path
required: true
post:
summary: add-image
- schema:
type: string
name: token
in: path
required: true
put:
summary: Upload an artifact
tags: []
responses:
'200':
description: OK
operationId: PostJobArtifact
operationId: UploadJobArtifact
requestBody:
content:
application/octet-stream:

View file

@ -23,12 +23,6 @@ type Client struct {
api *api.Client
}
type Job struct {
Id uuid.UUID
Manifest distro.Manifest
Targets []*target.Target
}
func NewClient(baseURL string, conf *tls.Config) (*Client, error) {
httpClient := http.Client{
Transport: &http.Transport{
@ -61,34 +55,30 @@ func NewClientUnix(path string) *Client {
return &Client{c}
}
func (c *Client) AddJob() (*Job, error) {
response, err := c.api.PostJob(context.Background(), api.PostJobJSONRequestBody{})
func (c *Client) RequestJob() (uuid.UUID, distro.Manifest, []*target.Target, error) {
response, err := c.api.RequestJob(context.Background(), api.RequestJobJSONRequestBody{})
if err != nil {
return nil, err
return uuid.Nil, nil, nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusCreated {
var er errorResponse
_ = json.NewDecoder(response.Body).Decode(&er)
return nil, fmt.Errorf("couldn't create job, got %d: %s", response.StatusCode, er.Message)
return uuid.Nil, nil, nil, fmt.Errorf("couldn't create job, got %d: %s", response.StatusCode, er.Message)
}
var jr addJobResponse
var jr requestJobResponse
err = json.NewDecoder(response.Body).Decode(&jr)
if err != nil {
return nil, err
return uuid.Nil, nil, nil, err
}
return &Job{
jr.Id,
jr.Manifest,
jr.Targets,
}, nil
return jr.Token, jr.Manifest, jr.Targets, nil
}
func (c *Client) JobCanceled(job *Job) bool {
response, err := c.api.GetJob(context.Background(), job.Id.String())
func (c *Client) JobCanceled(token uuid.UUID) bool {
response, err := c.api.GetJob(context.Background(), token.String())
if err != nil {
return true
}
@ -98,7 +88,7 @@ func (c *Client) JobCanceled(job *Job) bool {
return true
}
var jr jobResponse
var jr getJobResponse
err = json.NewDecoder(response.Body).Decode(&jr)
if err != nil {
return true
@ -107,8 +97,8 @@ func (c *Client) JobCanceled(job *Job) bool {
return jr.Canceled
}
func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *osbuild.Result) error {
response, err := c.api.UpdateJob(context.Background(), job.Id.String(), api.UpdateJobJSONRequestBody{
func (c *Client) UpdateJob(token uuid.UUID, status common.ImageBuildState, result *osbuild.Result) error {
response, err := c.api.UpdateJob(context.Background(), token.String(), api.UpdateJobJSONRequestBody{
Result: result,
Status: status.ToString(),
})
@ -123,9 +113,9 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *osbu
return nil
}
func (c *Client) UploadImage(job uuid.UUID, name string, reader io.Reader) error {
_, err := c.api.PostJobArtifactWithBody(context.Background(),
job.String(), name, "application/octet-stream", reader)
func (c *Client) UploadImage(token uuid.UUID, name string, reader io.Reader) error {
_, err := c.api.UploadJobArtifactWithBody(context.Background(),
token.String(), name, "application/octet-stream", reader)
return err
}

View file

@ -34,18 +34,14 @@ type errorResponse struct {
Message string `json:"message"`
}
type addJobRequest struct {
}
type addJobResponse struct {
Id uuid.UUID `json:"id"`
type requestJobResponse struct {
Token uuid.UUID `json:"token"`
Manifest distro.Manifest `json:"manifest"`
Targets []*target.Target `json:"targets,omitempty"`
}
type jobResponse struct {
Id uuid.UUID `json:"id"`
Canceled bool `json:"canceled"`
type getJobResponse struct {
Canceled bool `json:"canceled"`
}
type updateJobRequest struct {

View file

@ -1,7 +1,9 @@
package worker
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
@ -10,6 +12,7 @@ import (
"net/http"
"os"
"path"
"sync"
"time"
"github.com/google/uuid"
@ -26,6 +29,18 @@ type Server struct {
jobs jobqueue.JobQueue
echo *echo.Echo
artifactsDir string
// Currently running jobs. Workers are not handed job ids, but
// independent tokens which serve as an indirection. This enables
// race-free uploading of artifacts and makes restarting composer more
// robust (workers from an old run cannot report results for jobs
// composer thinks are not running).
// This map maps these tokens to job ids. Artifacts are stored in
// `$STATE_DIRECTORY/artifacts/tmp/$TOKEN` while the worker is running,
// and renamed to `$STATE_DIRECTORY/artifacts/$JOB_ID` once the job is
// reported as done.
running map[uuid.UUID]uuid.UUID
runningMutex sync.Mutex
}
type JobStatus struct {
@ -37,10 +52,13 @@ type JobStatus struct {
Result OSBuildJobResult
}
var ErrTokenNotExist = errors.New("worker token does not exist")
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string) *Server {
s := &Server{
jobs: jobs,
artifactsDir: artifactsDir,
running: make(map[uuid.UUID]uuid.UUID),
}
s.echo = echo.New()
@ -151,6 +169,72 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error {
return os.RemoveAll(path.Join(s.artifactsDir, id.String()))
}
func (s *Server) RequestJob(ctx context.Context) (uuid.UUID, *OSBuildJob, error) {
token := uuid.New()
var args OSBuildJob
jobId, err := s.jobs.Dequeue(ctx, []string{"osbuild"}, &args)
if err != nil {
return uuid.Nil, nil, err
}
if s.artifactsDir != "" {
err := os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700)
if err != nil {
return uuid.Nil, nil, fmt.Errorf("cannot create artifact directory: %v", err)
}
}
s.runningMutex.Lock()
defer s.runningMutex.Unlock()
s.running[token] = jobId
return token, &args, nil
}
func (s *Server) RunningJob(token uuid.UUID) (uuid.UUID, error) {
s.runningMutex.Lock()
defer s.runningMutex.Unlock()
jobId, ok := s.running[token]
if !ok {
return uuid.Nil, ErrTokenNotExist
}
return jobId, nil
}
func (s *Server) FinishJob(token uuid.UUID, result *OSBuildJobResult) error {
s.runningMutex.Lock()
defer s.runningMutex.Unlock()
jobId, ok := s.running[token]
if !ok {
return ErrTokenNotExist
}
// Always delete the running job, even if there are errors finishing
// the job, because callers won't call this a second time on error.
delete(s.running, token)
err := s.jobs.FinishJob(jobId, result)
if err != nil {
return fmt.Errorf("error finishing job: %v", err)
}
// Move artifacts from the temporary location to the final job
// location. Log any errors, but do not treat them as fatal. The job is
// already finished.
if s.artifactsDir != "" {
err := os.Rename(path.Join(s.artifactsDir, "tmp", token.String()), path.Join(s.artifactsDir, jobId.String()))
if err != nil {
log.Printf("Error moving artifacts for job%s: %v", jobId, err)
}
}
return nil
}
// apiHandlers implements api.ServerInterface - the http api route handlers
// generated from api/openapi.yml. This is a separate object, because these
// handlers should not be exposed on the `Server` object.
@ -164,52 +248,59 @@ func (h *apiHandlers) GetStatus(ctx echo.Context) error {
})
}
func (h *apiHandlers) GetJob(ctx echo.Context, jobId string) error {
id, err := uuid.Parse(jobId)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "cannot parse compose id: %v", err)
}
status, err := h.server.JobStatus(id)
if err != nil {
switch err {
case jobqueue.ErrNotExist:
return echo.NewHTTPError(http.StatusNotFound, "job does not exist: %s", id)
default:
return err
}
}
return ctx.JSON(http.StatusOK, jobResponse{
Id: id,
Canceled: status.Canceled,
})
}
func (h *apiHandlers) PostJob(ctx echo.Context) error {
var body addJobRequest
func (h *apiHandlers) RequestJob(ctx echo.Context) error {
var body struct{}
err := ctx.Bind(&body)
if err != nil {
return err
}
var job OSBuildJob
id, err := h.server.jobs.Dequeue(ctx.Request().Context(), []string{"osbuild"}, &job)
token, jobArgs, err := h.server.RequestJob(ctx.Request().Context())
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "%v", err)
}
return ctx.JSON(http.StatusCreated, addJobResponse{
Id: id,
Manifest: job.Manifest,
Targets: job.Targets,
return ctx.JSON(http.StatusCreated, requestJobResponse{
Token: token,
Manifest: jobArgs.Manifest,
Targets: jobArgs.Targets,
})
}
func (h *apiHandlers) UpdateJob(ctx echo.Context, jobId string) error {
id, err := uuid.Parse(jobId)
func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error {
token, err := uuid.Parse(tokenstr)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "cannot parse compose id: %v", err)
return echo.NewHTTPError(http.StatusBadRequest, "cannot parse job token: %v", err)
}
jobId, err := h.server.RunningJob(token)
if err != nil {
switch err {
case ErrTokenNotExist:
return echo.NewHTTPError(http.StatusNotFound, "job is not running")
default:
return err
}
}
if jobId == uuid.Nil {
return ctx.JSON(http.StatusOK, getJobResponse{})
}
status, err := h.server.JobStatus(jobId)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "error retrieving job status: %v", err)
}
return ctx.JSON(http.StatusOK, getJobResponse{
Canceled: status.Canceled,
})
}
func (h *apiHandlers) UpdateJob(ctx echo.Context, idstr string) error {
token, err := uuid.Parse(idstr)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "cannot parse token: %v", err)
}
var body updateJobRequest
@ -225,13 +316,11 @@ func (h *apiHandlers) UpdateJob(ctx echo.Context, jobId string) error {
return echo.NewHTTPError(http.StatusBadRequest, "setting status of a job to waiting or running is not supported")
}
err = h.server.jobs.FinishJob(id, OSBuildJobResult{OSBuildOutput: body.Result})
err = h.server.FinishJob(token, &OSBuildJobResult{OSBuildOutput: body.Result})
if err != nil {
switch err {
case jobqueue.ErrNotExist:
return echo.NewHTTPError(http.StatusNotFound, "job does not exist: %s", id)
case jobqueue.ErrNotRunning:
return echo.NewHTTPError(http.StatusBadRequest, "job is not running: %s", id)
case ErrTokenNotExist:
return echo.NewHTTPError(http.StatusNotFound, "job does not exist: %v", token)
default:
return err
}
@ -240,10 +329,10 @@ func (h *apiHandlers) UpdateJob(ctx echo.Context, jobId string) error {
return ctx.JSON(http.StatusOK, updateJobResponse{})
}
func (h *apiHandlers) PostJobArtifact(ctx echo.Context, jobId string, name string) error {
id, err := uuid.Parse(jobId)
func (h *apiHandlers) UploadJobArtifact(ctx echo.Context, tokenstr string, name string) error {
token, err := uuid.Parse(tokenstr)
if err != nil {
return echo.NewHTTPError(http.StatusBadRequest, "cannot parse compose id: %v", err)
return echo.NewHTTPError(http.StatusBadRequest, "cannot parse job token: %v", err)
}
request := ctx.Request()
@ -256,12 +345,7 @@ func (h *apiHandlers) PostJobArtifact(ctx echo.Context, jobId string, name strin
return ctx.NoContent(http.StatusOK)
}
err = os.Mkdir(path.Join(h.server.artifactsDir, id.String()), 0700)
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "cannot create artifact directory: %v", err)
}
f, err := os.Create(path.Join(h.server.artifactsDir, id.String(), name))
f, err := os.Create(path.Join(h.server.artifactsDir, "tmp", token.String(), name))
if err != nil {
return echo.NewHTTPError(http.StatusInternalServerError, "cannot create artifact file: %v", err)
}

View file

@ -1,11 +1,11 @@
package worker_test
import (
"context"
"fmt"
"net/http"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/osbuild/osbuild-composer/internal/distro"
@ -64,14 +64,11 @@ func TestCreate(t *testing.T) {
}
server := worker.NewServer(nil, testjobqueue.New(), "")
id, err := server.Enqueue(manifest, nil)
_, err = server.Enqueue(manifest, nil)
require.NoError(t, err)
test.TestRoute(t, server, false, "POST", "/jobs", `{}`, http.StatusCreated,
`{"id":"`+id.String()+`","manifest":{"sources":{},"pipeline":{}}}`, "created")
test.TestRoute(t, server, false, "GET", fmt.Sprintf("/jobs/%s", id), `{}`, http.StatusOK,
`{"id":"`+id.String()+`","canceled":false}`)
`{"manifest":{"sources":{},"pipeline":{}}}`, "token", "created")
}
func TestCancel(t *testing.T) {
@ -90,82 +87,15 @@ func TestCancel(t *testing.T) {
}
server := worker.NewServer(nil, testjobqueue.New(), "")
id, err := server.Enqueue(manifest, nil)
jobId, err := server.Enqueue(manifest, nil)
require.NoError(t, err)
test.TestRoute(t, server, false, "POST", "/jobs", `{}`, http.StatusCreated,
`{"id":"`+id.String()+`","manifest":{"sources":{},"pipeline":{}}}`, "created")
err = server.Cancel(id)
token, _, err := server.RequestJob(context.Background())
require.NoError(t, err)
test.TestRoute(t, server, false, "GET", fmt.Sprintf("/jobs/%s", id), `{}`, http.StatusOK,
`{"id":"`+id.String()+`","canceled":true}`)
}
func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) {
distroStruct := fedoratest.New()
arch, err := distroStruct.GetArch("x86_64")
if err != nil {
t.Fatalf("error getting arch from distro")
}
imageType, err := arch.GetImageType("qcow2")
if err != nil {
t.Fatalf("error getting image type from arch")
}
server := worker.NewServer(nil, testjobqueue.New(), "")
id := uuid.Nil
if from != "VOID" {
manifest, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, nil, nil)
if err != nil {
t.Fatalf("error creating osbuild manifest")
}
id, err = server.Enqueue(manifest, nil)
require.NoError(t, err)
if from != "WAITING" {
test.SendHTTP(server, false, "POST", "/jobs", `{}`)
if from != "RUNNING" {
test.SendHTTP(server, false, "PATCH", "/jobs/"+id.String(), `{"status":"`+from+`"}`)
}
}
}
test.TestRoute(t, server, false, "PATCH", "/jobs/"+id.String(), `{"status":"`+to+`"}`, expectedStatus, "{}", "message")
}
func TestUpdate(t *testing.T) {
var cases = []struct {
From string
To string
ExpectedStatus int
}{
{"VOID", "WAITING", http.StatusBadRequest},
{"VOID", "RUNNING", http.StatusBadRequest},
{"VOID", "FINISHED", http.StatusNotFound},
{"VOID", "FAILED", http.StatusNotFound},
{"WAITING", "WAITING", http.StatusBadRequest},
{"WAITING", "RUNNING", http.StatusBadRequest},
{"WAITING", "FINISHED", http.StatusBadRequest},
{"WAITING", "FAILED", http.StatusBadRequest},
{"RUNNING", "WAITING", http.StatusBadRequest},
{"RUNNING", "RUNNING", http.StatusBadRequest},
{"RUNNING", "FINISHED", http.StatusOK},
{"RUNNING", "FAILED", http.StatusOK},
{"FINISHED", "WAITING", http.StatusBadRequest},
{"FINISHED", "RUNNING", http.StatusBadRequest},
{"FINISHED", "FINISHED", http.StatusBadRequest},
{"FINISHED", "FAILED", http.StatusBadRequest},
{"FAILED", "WAITING", http.StatusBadRequest},
{"FAILED", "RUNNING", http.StatusBadRequest},
{"FAILED", "FINISHED", http.StatusBadRequest},
{"FAILED", "FAILED", http.StatusBadRequest},
}
for _, c := range cases {
t.Log(c)
testUpdateTransition(t, c.From, c.To, c.ExpectedStatus)
}
err = server.Cancel(jobId)
require.NoError(t, err)
test.TestRoute(t, server, false, "GET", fmt.Sprintf("/jobs/%s", token), `{}`, http.StatusOK,
`{"canceled":true}`)
}