worker/api: remove token in favor of callback URLs

Instead of sending a `token` to workers, send back to URLs:

 1. "location": URL at which the job can be inspected (GET) and updated
    (PATCH).
 2. "artifact_location": URL at which artifacts should be uploaded to.

The actual URLs remain the same, but a client does not need to stitch
them together manually (except appending the artifact's name).

Unfortunately, the client code generated by `deepmap` does not lend
itself to this style of APIs. Use standard http.Client again, which is a
partial revert of 0962fbd30.
This commit is contained in:
Lars Karlitski 2020-09-08 17:00:18 +02:00 committed by Tom Gundersen
parent 901d724622
commit b03e1254e9
8 changed files with 166 additions and 813 deletions

View file

@ -7,7 +7,6 @@ import (
"errors"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"os"
@ -16,7 +15,6 @@ import (
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/osbuild"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/upload/awsupload"
@ -68,7 +66,7 @@ func (e *TargetsError) Error() string {
return errString
}
func RunJob(token uuid.UUID, manifest distro.Manifest, targets []*target.Target, store string, uploadFunc func(uuid.UUID, string, io.Reader) error) (*osbuild.Result, error) {
func RunJob(job worker.Job, store string) (*osbuild.Result, error) {
outputDirectory, err := ioutil.TempDir("/var/tmp", "osbuild-worker-*")
if err != nil {
return nil, fmt.Errorf("error creating temporary output directory: %v", err)
@ -80,6 +78,11 @@ func RunJob(token uuid.UUID, manifest distro.Manifest, targets []*target.Target,
}
}()
manifest, targets, err := job.OSBuildArgs()
if err != nil {
return nil, err
}
result, err := RunOSBuild(manifest, store, outputDirectory, os.Stderr)
if err != nil {
return nil, err
@ -106,7 +109,7 @@ func RunJob(token uuid.UUID, manifest distro.Manifest, targets []*target.Target,
}
}
err = uploadFunc(token, options.Filename, f)
err = job.UploadArtifact(options.Filename, f)
if err != nil {
r = append(r, err)
continue
@ -182,11 +185,16 @@ func RunJob(token uuid.UUID, manifest distro.Manifest, targets []*target.Target,
// It would be cleaner to kill the osbuild process using (`exec.CommandContext`
// or similar), but osbuild does not currently support this. Exiting here will
// make systemd clean up the whole cgroup and restart this service.
func WatchJob(ctx context.Context, client *worker.Client, token uuid.UUID) {
func WatchJob(ctx context.Context, job worker.Job) {
for {
select {
case <-time.After(15 * time.Second):
if client.JobCanceled(token) {
canceled, err := job.Canceled()
if err != nil {
log.Printf("Error fetching job status: %v", err)
os.Exit(0)
}
if canceled {
log.Println("Job was canceled. Exiting.")
os.Exit(0)
}
@ -240,18 +248,18 @@ func main() {
for {
fmt.Println("Waiting for a new job...")
token, manifest, targets, err := client.RequestJob()
job, err := client.RequestJob()
if err != nil {
log.Fatal(err)
}
fmt.Printf("Running job %s\n", token)
fmt.Printf("Running next job\n")
ctx, cancel := context.WithCancel(context.Background())
go WatchJob(ctx, client, token)
go WatchJob(ctx, job)
var status common.ImageBuildState
result, err := RunJob(token, manifest, targets, store, client.UploadImage)
result, err := RunJob(job, store)
if err != nil {
log.Printf(" Job failed: %v", err)
status = common.IBFailed
@ -277,14 +285,14 @@ func main() {
// flag to indicate all error kinds.
result.Success = false
} else {
log.Printf(" 🎉 Job completed successfully: %s", token)
log.Printf(" 🎉 Job completed successfully")
status = common.IBFinished
}
// signal to WatchJob() that it can stop watching
cancel()
err = client.UpdateJob(token, status, result)
err = job.Update(status, result)
if err != nil {
log.Fatalf("Error reporting job result: %v", err)
}

View file

@ -4,17 +4,10 @@
package api
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/deepmap/oapi-codegen/pkg/runtime"
"github.com/labstack/echo/v4"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
)
// RequestJobJSONBody defines parameters for RequestJob.
@ -32,740 +25,6 @@ type RequestJobJSONRequestBody RequestJobJSONBody
// UpdateJobRequestBody defines body for UpdateJob for application/json ContentType.
type UpdateJobJSONRequestBody UpdateJobJSONBody
// RequestEditorFn is the function signature for the RequestEditor callback function
type RequestEditorFn func(ctx context.Context, req *http.Request) error
// Doer performs HTTP requests.
//
// The standard http.Client implements this interface.
type HttpRequestDoer interface {
Do(req *http.Request) (*http.Response, error)
}
// Client which conforms to the OpenAPI3 specification for this service.
type Client struct {
// The endpoint of the server conforming to this interface, with scheme,
// https://api.deepmap.com for example.
Server string
// Doer for performing requests, typically a *http.Client with any
// customized settings, such as certificate chains.
Client HttpRequestDoer
// A callback for modifying requests which are generated before sending over
// the network.
RequestEditor RequestEditorFn
}
// ClientOption allows setting custom parameters during construction
type ClientOption func(*Client) error
// Creates a new Client, with reasonable defaults
func NewClient(server string, opts ...ClientOption) (*Client, error) {
// create a client with sane default values
client := Client{
Server: server,
}
// mutate client and add all optional params
for _, o := range opts {
if err := o(&client); err != nil {
return nil, err
}
}
// ensure the server URL always has a trailing slash
if !strings.HasSuffix(client.Server, "/") {
client.Server += "/"
}
// create httpClient, if not already present
if client.Client == nil {
client.Client = http.DefaultClient
}
return &client, nil
}
// WithHTTPClient allows overriding the default Doer, which is
// automatically created using http.Client. This is useful for tests.
func WithHTTPClient(doer HttpRequestDoer) ClientOption {
return func(c *Client) error {
c.Client = doer
return nil
}
}
// WithRequestEditorFn allows setting up a callback function, which will be
// called right before sending the request. This can be used to mutate the request.
func WithRequestEditorFn(fn RequestEditorFn) ClientOption {
return func(c *Client) error {
c.RequestEditor = fn
return nil
}
}
// The interface specification for the client above.
type ClientInterface interface {
// RequestJob request with any body
RequestJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error)
RequestJob(ctx context.Context, body RequestJobJSONRequestBody) (*http.Response, error)
// GetJob request
GetJob(ctx context.Context, token string) (*http.Response, error)
// UpdateJob request with any body
UpdateJobWithBody(ctx context.Context, token string, contentType string, body io.Reader) (*http.Response, error)
UpdateJob(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*http.Response, error)
// UploadJobArtifact request with any body
UploadJobArtifactWithBody(ctx context.Context, token string, name string, contentType string, body io.Reader) (*http.Response, error)
// GetStatus request
GetStatus(ctx context.Context) (*http.Response, error)
}
func (c *Client) RequestJobWithBody(ctx context.Context, contentType string, body io.Reader) (*http.Response, error) {
req, err := NewRequestJobRequestWithBody(c.Server, contentType, body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if c.RequestEditor != nil {
err = c.RequestEditor(ctx, req)
if err != nil {
return nil, err
}
}
return c.Client.Do(req)
}
func (c *Client) RequestJob(ctx context.Context, body RequestJobJSONRequestBody) (*http.Response, error) {
req, err := NewRequestJobRequest(c.Server, body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if c.RequestEditor != nil {
err = c.RequestEditor(ctx, req)
if err != nil {
return nil, err
}
}
return c.Client.Do(req)
}
func (c *Client) GetJob(ctx context.Context, token string) (*http.Response, error) {
req, err := NewGetJobRequest(c.Server, token)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if c.RequestEditor != nil {
err = c.RequestEditor(ctx, req)
if err != nil {
return nil, err
}
}
return c.Client.Do(req)
}
func (c *Client) UpdateJobWithBody(ctx context.Context, token string, contentType string, body io.Reader) (*http.Response, error) {
req, err := NewUpdateJobRequestWithBody(c.Server, token, contentType, body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if c.RequestEditor != nil {
err = c.RequestEditor(ctx, req)
if err != nil {
return nil, err
}
}
return c.Client.Do(req)
}
func (c *Client) UpdateJob(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*http.Response, error) {
req, err := NewUpdateJobRequest(c.Server, token, body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if c.RequestEditor != nil {
err = c.RequestEditor(ctx, req)
if err != nil {
return nil, err
}
}
return c.Client.Do(req)
}
func (c *Client) UploadJobArtifactWithBody(ctx context.Context, token string, name string, contentType string, body io.Reader) (*http.Response, error) {
req, err := NewUploadJobArtifactRequestWithBody(c.Server, token, name, contentType, body)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if c.RequestEditor != nil {
err = c.RequestEditor(ctx, req)
if err != nil {
return nil, err
}
}
return c.Client.Do(req)
}
func (c *Client) GetStatus(ctx context.Context) (*http.Response, error) {
req, err := NewGetStatusRequest(c.Server)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if c.RequestEditor != nil {
err = c.RequestEditor(ctx, req)
if err != nil {
return nil, err
}
}
return c.Client.Do(req)
}
// NewRequestJobRequest calls the generic RequestJob builder with application/json body
func NewRequestJobRequest(server string, body RequestJobJSONRequestBody) (*http.Request, error) {
var bodyReader io.Reader
buf, err := json.Marshal(body)
if err != nil {
return nil, err
}
bodyReader = bytes.NewReader(buf)
return NewRequestJobRequestWithBody(server, "application/json", bodyReader)
}
// NewRequestJobRequestWithBody generates requests for RequestJob with any type of body
func NewRequestJobRequestWithBody(server string, contentType string, body io.Reader) (*http.Request, error) {
var err error
queryUrl, err := url.Parse(server)
if err != nil {
return nil, err
}
basePath := fmt.Sprintf("/jobs")
if basePath[0] == '/' {
basePath = basePath[1:]
}
queryUrl, err = queryUrl.Parse(basePath)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", queryUrl.String(), body)
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", contentType)
return req, nil
}
// NewGetJobRequest generates requests for GetJob
func NewGetJobRequest(server string, token string) (*http.Request, error) {
var err error
var pathParam0 string
pathParam0, err = runtime.StyleParam("simple", false, "token", token)
if err != nil {
return nil, err
}
queryUrl, err := url.Parse(server)
if err != nil {
return nil, err
}
basePath := fmt.Sprintf("/jobs/%s", pathParam0)
if basePath[0] == '/' {
basePath = basePath[1:]
}
queryUrl, err = queryUrl.Parse(basePath)
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", queryUrl.String(), nil)
if err != nil {
return nil, err
}
return req, nil
}
// NewUpdateJobRequest calls the generic UpdateJob builder with application/json body
func NewUpdateJobRequest(server string, token string, body UpdateJobJSONRequestBody) (*http.Request, error) {
var bodyReader io.Reader
buf, err := json.Marshal(body)
if err != nil {
return nil, err
}
bodyReader = bytes.NewReader(buf)
return NewUpdateJobRequestWithBody(server, token, "application/json", bodyReader)
}
// NewUpdateJobRequestWithBody generates requests for UpdateJob with any type of body
func NewUpdateJobRequestWithBody(server string, token string, contentType string, body io.Reader) (*http.Request, error) {
var err error
var pathParam0 string
pathParam0, err = runtime.StyleParam("simple", false, "token", token)
if err != nil {
return nil, err
}
queryUrl, err := url.Parse(server)
if err != nil {
return nil, err
}
basePath := fmt.Sprintf("/jobs/%s", pathParam0)
if basePath[0] == '/' {
basePath = basePath[1:]
}
queryUrl, err = queryUrl.Parse(basePath)
if err != nil {
return nil, err
}
req, err := http.NewRequest("PATCH", queryUrl.String(), body)
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", contentType)
return req, nil
}
// NewUploadJobArtifactRequestWithBody generates requests for UploadJobArtifact with any type of body
func NewUploadJobArtifactRequestWithBody(server string, token string, name string, contentType string, body io.Reader) (*http.Request, error) {
var err error
var pathParam0 string
pathParam0, err = runtime.StyleParam("simple", false, "token", token)
if err != nil {
return nil, err
}
var pathParam1 string
pathParam1, err = runtime.StyleParam("simple", false, "name", name)
if err != nil {
return nil, err
}
queryUrl, err := url.Parse(server)
if err != nil {
return nil, err
}
basePath := fmt.Sprintf("/jobs/%s/artifacts/%s", pathParam0, pathParam1)
if basePath[0] == '/' {
basePath = basePath[1:]
}
queryUrl, err = queryUrl.Parse(basePath)
if err != nil {
return nil, err
}
req, err := http.NewRequest("PUT", queryUrl.String(), body)
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", contentType)
return req, nil
}
// NewGetStatusRequest generates requests for GetStatus
func NewGetStatusRequest(server string) (*http.Request, error) {
var err error
queryUrl, err := url.Parse(server)
if err != nil {
return nil, err
}
basePath := fmt.Sprintf("/status")
if basePath[0] == '/' {
basePath = basePath[1:]
}
queryUrl, err = queryUrl.Parse(basePath)
if err != nil {
return nil, err
}
req, err := http.NewRequest("GET", queryUrl.String(), nil)
if err != nil {
return nil, err
}
return req, nil
}
// ClientWithResponses builds on ClientInterface to offer response payloads
type ClientWithResponses struct {
ClientInterface
}
// NewClientWithResponses creates a new ClientWithResponses, which wraps
// Client with return type handling
func NewClientWithResponses(server string, opts ...ClientOption) (*ClientWithResponses, error) {
client, err := NewClient(server, opts...)
if err != nil {
return nil, err
}
return &ClientWithResponses{client}, nil
}
// WithBaseURL overrides the baseURL.
func WithBaseURL(baseURL string) ClientOption {
return func(c *Client) error {
newBaseURL, err := url.Parse(baseURL)
if err != nil {
return err
}
c.Server = newBaseURL.String()
return nil
}
}
// ClientWithResponsesInterface is the interface specification for the client with responses above.
type ClientWithResponsesInterface interface {
// RequestJob request with any body
RequestJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*RequestJobResponse, error)
RequestJobWithResponse(ctx context.Context, body RequestJobJSONRequestBody) (*RequestJobResponse, error)
// GetJob request
GetJobWithResponse(ctx context.Context, token string) (*GetJobResponse, error)
// UpdateJob request with any body
UpdateJobWithBodyWithResponse(ctx context.Context, token string, contentType string, body io.Reader) (*UpdateJobResponse, error)
UpdateJobWithResponse(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error)
// UploadJobArtifact request with any body
UploadJobArtifactWithBodyWithResponse(ctx context.Context, token string, name string, contentType string, body io.Reader) (*UploadJobArtifactResponse, error)
// GetStatus request
GetStatusWithResponse(ctx context.Context) (*GetStatusResponse, error)
}
type RequestJobResponse struct {
Body []byte
HTTPResponse *http.Response
JSON201 *struct {
Manifest interface{} `json:"manifest"`
Targets *[]interface{} `json:"targets,omitempty"`
Token string `json:"token"`
}
}
// Status returns HTTPResponse.Status
func (r RequestJobResponse) Status() string {
if r.HTTPResponse != nil {
return r.HTTPResponse.Status
}
return http.StatusText(0)
}
// StatusCode returns HTTPResponse.StatusCode
func (r RequestJobResponse) StatusCode() int {
if r.HTTPResponse != nil {
return r.HTTPResponse.StatusCode
}
return 0
}
type GetJobResponse struct {
Body []byte
HTTPResponse *http.Response
JSON200 *struct {
Canceled bool `json:"canceled"`
}
}
// Status returns HTTPResponse.Status
func (r GetJobResponse) Status() string {
if r.HTTPResponse != nil {
return r.HTTPResponse.Status
}
return http.StatusText(0)
}
// StatusCode returns HTTPResponse.StatusCode
func (r GetJobResponse) StatusCode() int {
if r.HTTPResponse != nil {
return r.HTTPResponse.StatusCode
}
return 0
}
type UpdateJobResponse struct {
Body []byte
HTTPResponse *http.Response
}
// Status returns HTTPResponse.Status
func (r UpdateJobResponse) Status() string {
if r.HTTPResponse != nil {
return r.HTTPResponse.Status
}
return http.StatusText(0)
}
// StatusCode returns HTTPResponse.StatusCode
func (r UpdateJobResponse) StatusCode() int {
if r.HTTPResponse != nil {
return r.HTTPResponse.StatusCode
}
return 0
}
type UploadJobArtifactResponse struct {
Body []byte
HTTPResponse *http.Response
}
// Status returns HTTPResponse.Status
func (r UploadJobArtifactResponse) Status() string {
if r.HTTPResponse != nil {
return r.HTTPResponse.Status
}
return http.StatusText(0)
}
// StatusCode returns HTTPResponse.StatusCode
func (r UploadJobArtifactResponse) StatusCode() int {
if r.HTTPResponse != nil {
return r.HTTPResponse.StatusCode
}
return 0
}
type GetStatusResponse struct {
Body []byte
HTTPResponse *http.Response
JSON200 *struct {
Status string `json:"status"`
}
}
// Status returns HTTPResponse.Status
func (r GetStatusResponse) Status() string {
if r.HTTPResponse != nil {
return r.HTTPResponse.Status
}
return http.StatusText(0)
}
// StatusCode returns HTTPResponse.StatusCode
func (r GetStatusResponse) StatusCode() int {
if r.HTTPResponse != nil {
return r.HTTPResponse.StatusCode
}
return 0
}
// RequestJobWithBodyWithResponse request with arbitrary body returning *RequestJobResponse
func (c *ClientWithResponses) RequestJobWithBodyWithResponse(ctx context.Context, contentType string, body io.Reader) (*RequestJobResponse, error) {
rsp, err := c.RequestJobWithBody(ctx, contentType, body)
if err != nil {
return nil, err
}
return ParseRequestJobResponse(rsp)
}
func (c *ClientWithResponses) RequestJobWithResponse(ctx context.Context, body RequestJobJSONRequestBody) (*RequestJobResponse, error) {
rsp, err := c.RequestJob(ctx, body)
if err != nil {
return nil, err
}
return ParseRequestJobResponse(rsp)
}
// GetJobWithResponse request returning *GetJobResponse
func (c *ClientWithResponses) GetJobWithResponse(ctx context.Context, token string) (*GetJobResponse, error) {
rsp, err := c.GetJob(ctx, token)
if err != nil {
return nil, err
}
return ParseGetJobResponse(rsp)
}
// UpdateJobWithBodyWithResponse request with arbitrary body returning *UpdateJobResponse
func (c *ClientWithResponses) UpdateJobWithBodyWithResponse(ctx context.Context, token string, contentType string, body io.Reader) (*UpdateJobResponse, error) {
rsp, err := c.UpdateJobWithBody(ctx, token, contentType, body)
if err != nil {
return nil, err
}
return ParseUpdateJobResponse(rsp)
}
func (c *ClientWithResponses) UpdateJobWithResponse(ctx context.Context, token string, body UpdateJobJSONRequestBody) (*UpdateJobResponse, error) {
rsp, err := c.UpdateJob(ctx, token, body)
if err != nil {
return nil, err
}
return ParseUpdateJobResponse(rsp)
}
// UploadJobArtifactWithBodyWithResponse request with arbitrary body returning *UploadJobArtifactResponse
func (c *ClientWithResponses) UploadJobArtifactWithBodyWithResponse(ctx context.Context, token string, name string, contentType string, body io.Reader) (*UploadJobArtifactResponse, error) {
rsp, err := c.UploadJobArtifactWithBody(ctx, token, name, contentType, body)
if err != nil {
return nil, err
}
return ParseUploadJobArtifactResponse(rsp)
}
// GetStatusWithResponse request returning *GetStatusResponse
func (c *ClientWithResponses) GetStatusWithResponse(ctx context.Context) (*GetStatusResponse, error) {
rsp, err := c.GetStatus(ctx)
if err != nil {
return nil, err
}
return ParseGetStatusResponse(rsp)
}
// ParseRequestJobResponse parses an HTTP response from a RequestJobWithResponse call
func ParseRequestJobResponse(rsp *http.Response) (*RequestJobResponse, error) {
bodyBytes, err := ioutil.ReadAll(rsp.Body)
defer rsp.Body.Close()
if err != nil {
return nil, err
}
response := &RequestJobResponse{
Body: bodyBytes,
HTTPResponse: rsp,
}
switch {
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 201:
var dest struct {
Manifest interface{} `json:"manifest"`
Targets *[]interface{} `json:"targets,omitempty"`
Token string `json:"token"`
}
if err := json.Unmarshal(bodyBytes, &dest); err != nil {
return nil, err
}
response.JSON201 = &dest
}
return response, nil
}
// ParseGetJobResponse parses an HTTP response from a GetJobWithResponse call
func ParseGetJobResponse(rsp *http.Response) (*GetJobResponse, error) {
bodyBytes, err := ioutil.ReadAll(rsp.Body)
defer rsp.Body.Close()
if err != nil {
return nil, err
}
response := &GetJobResponse{
Body: bodyBytes,
HTTPResponse: rsp,
}
switch {
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200:
var dest struct {
Canceled bool `json:"canceled"`
}
if err := json.Unmarshal(bodyBytes, &dest); err != nil {
return nil, err
}
response.JSON200 = &dest
}
return response, nil
}
// ParseUpdateJobResponse parses an HTTP response from a UpdateJobWithResponse call
func ParseUpdateJobResponse(rsp *http.Response) (*UpdateJobResponse, error) {
bodyBytes, err := ioutil.ReadAll(rsp.Body)
defer rsp.Body.Close()
if err != nil {
return nil, err
}
response := &UpdateJobResponse{
Body: bodyBytes,
HTTPResponse: rsp,
}
switch {
}
return response, nil
}
// ParseUploadJobArtifactResponse parses an HTTP response from a UploadJobArtifactWithResponse call
func ParseUploadJobArtifactResponse(rsp *http.Response) (*UploadJobArtifactResponse, error) {
bodyBytes, err := ioutil.ReadAll(rsp.Body)
defer rsp.Body.Close()
if err != nil {
return nil, err
}
response := &UploadJobArtifactResponse{
Body: bodyBytes,
HTTPResponse: rsp,
}
switch {
}
return response, nil
}
// ParseGetStatusResponse parses an HTTP response from a GetStatusWithResponse call
func ParseGetStatusResponse(rsp *http.Response) (*GetStatusResponse, error) {
bodyBytes, err := ioutil.ReadAll(rsp.Body)
defer rsp.Body.Close()
if err != nil {
return nil, err
}
response := &GetStatusResponse{
Body: bodyBytes,
HTTPResponse: rsp,
}
switch {
case strings.Contains(rsp.Header.Get("Content-Type"), "json") && rsp.StatusCode == 200:
var dest struct {
Status string `json:"status"`
}
if err := json.Unmarshal(bodyBytes, &dest); err != nil {
return nil, err
}
response.JSON200 = &dest
}
return response, nil
}
// ServerInterface represents all server handlers.
type ServerInterface interface {
// Request a job

View file

@ -1,3 +1,3 @@
//go:generate go run github.com/deepmap/oapi-codegen/cmd/oapi-codegen -package=api -generate types,server,client -o api.gen.go openapi.yml
//go:generate go run github.com/deepmap/oapi-codegen/cmd/oapi-codegen -package=api -generate types,server -o api.gen.go openapi.yml
package api

View file

@ -38,16 +38,17 @@ paths:
type: object
additionalProperties: false
properties:
token:
type: string
format: uuid
manifest: {}
targets:
type: array
items: {}
location:
type: string
artifact_location:
type: string
required:
- token
- manifest
- location
operationId: RequestJob
requestBody:
content:

View file

@ -1,6 +1,7 @@
package worker
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
@ -9,8 +10,7 @@ import (
"io"
"net"
"net/http"
"github.com/google/uuid"
"net/url"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/distro"
@ -20,26 +20,47 @@ import (
)
type Client struct {
api *api.Client
server *url.URL
requester *http.Client
}
type Job interface {
OSBuildArgs() (distro.Manifest, []*target.Target, error)
Update(status common.ImageBuildState, result *osbuild.Result) error
Canceled() (bool, error)
UploadArtifact(name string, reader io.Reader) error
}
type job struct {
requester *http.Client
manifest distro.Manifest
targets []*target.Target
location string
artifactLocation string
}
func NewClient(baseURL string, conf *tls.Config) (*Client, error) {
httpClient := http.Client{
server, err := url.Parse(baseURL)
if err != nil {
return nil, err
}
requester := &http.Client{
Transport: &http.Transport{
TLSClientConfig: conf,
},
}
c, err := api.NewClient(baseURL, api.WithHTTPClient(&httpClient))
if err != nil {
return nil, err
}
return &Client{c}, nil
return &Client{server, requester}, nil
}
func NewClientUnix(path string) *Client {
httpClient := http.Client{
server, err := url.Parse("http://localhost")
if err != nil {
panic(err)
}
requester := &http.Client{
Transport: &http.Transport{
DialContext: func(context context.Context, network, addr string) (net.Conn, error) {
return net.Dial("unix", path)
@ -47,65 +68,86 @@ func NewClientUnix(path string) *Client {
},
}
c, err := api.NewClient("http://localhost", api.WithHTTPClient(&httpClient))
return &Client{server, requester}
}
func (c *Client) RequestJob() (Job, error) {
url, err := c.server.Parse("/jobs")
if err != nil {
// This only happens when "/jobs" cannot be parsed.
panic(err)
}
var buf bytes.Buffer
err = json.NewEncoder(&buf).Encode(api.RequestJobJSONRequestBody{})
if err != nil {
panic(err)
}
return &Client{c}
}
func (c *Client) RequestJob() (uuid.UUID, distro.Manifest, []*target.Target, error) {
response, err := c.api.RequestJob(context.Background(), api.RequestJobJSONRequestBody{})
response, err := c.requester.Post(url.String(), "application/json", &buf)
if err != nil {
return uuid.Nil, nil, nil, err
return nil, fmt.Errorf("error requesting job: %v", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusCreated {
var er errorResponse
_ = json.NewDecoder(response.Body).Decode(&er)
return uuid.Nil, nil, nil, fmt.Errorf("couldn't create job, got %d: %s", response.StatusCode, er.Message)
return nil, fmt.Errorf("couldn't create job, got %d: %s", response.StatusCode, er.Message)
}
var jr requestJobResponse
err = json.NewDecoder(response.Body).Decode(&jr)
if err != nil {
return uuid.Nil, nil, nil, err
return nil, fmt.Errorf("error parsing response: %v", err)
}
return jr.Token, jr.Manifest, jr.Targets, nil
}
func (c *Client) JobCanceled(token uuid.UUID) bool {
response, err := c.api.GetJob(context.Background(), token.String())
location, err := c.server.Parse(jr.Location)
if err != nil {
return true
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return true
return nil, fmt.Errorf("error parsing location url in response: %v", err)
}
var jr getJobResponse
err = json.NewDecoder(response.Body).Decode(&jr)
artifactLocation, err := c.server.Parse(jr.ArtifactLocation)
if err != nil {
return true
return nil, fmt.Errorf("error parsing artifact location url in response: %v", err)
}
return jr.Canceled
return &job{
requester: c.requester,
manifest: jr.Manifest,
targets: jr.Targets,
location: location.String(),
artifactLocation: artifactLocation.String(),
}, nil
}
func (c *Client) UpdateJob(token uuid.UUID, status common.ImageBuildState, result *osbuild.Result) error {
response, err := c.api.UpdateJob(context.Background(), token.String(), api.UpdateJobJSONRequestBody{
func (j *job) OSBuildArgs() (distro.Manifest, []*target.Target, error) {
return j.manifest, j.targets, nil
}
func (j *job) Update(status common.ImageBuildState, result *osbuild.Result) error {
var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(api.UpdateJobJSONRequestBody{
Result: result,
Status: status.ToString(),
})
if err != nil {
return err
panic(err)
}
req, err := http.NewRequest("PATCH", j.location, &buf)
if err != nil {
panic(err)
}
req.Header.Add("Content-Type", "application/json")
response, err := j.requester.Do(req)
if err != nil {
return fmt.Errorf("error fetching job info: %v", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return errors.New("error setting job status")
}
@ -113,9 +155,52 @@ func (c *Client) UpdateJob(token uuid.UUID, status common.ImageBuildState, resul
return nil
}
func (c *Client) UploadImage(token uuid.UUID, name string, reader io.Reader) error {
_, err := c.api.UploadJobArtifactWithBody(context.Background(),
token.String(), name, "application/octet-stream", reader)
return err
func (j *job) Canceled() (bool, error) {
response, err := j.requester.Get(j.location)
if err != nil {
return false, fmt.Errorf("error fetching job info: %v", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return false, fmt.Errorf("unexpected return value: %v", response.StatusCode)
}
var jr getJobResponse
err = json.NewDecoder(response.Body).Decode(&jr)
if err != nil {
return false, fmt.Errorf("error parsing reponse: %v", err)
}
return jr.Canceled, nil
}
func (j *job) UploadArtifact(name string, reader io.Reader) error {
if j.artifactLocation == "" {
return fmt.Errorf("server does not accept artifacts for this job")
}
loc, err := url.Parse(j.artifactLocation)
if err != nil {
return fmt.Errorf("error parsing job location: %v", err)
}
loc, err = loc.Parse(url.PathEscape(name))
if err != nil {
panic(err)
}
req, err := http.NewRequest("PUT", loc.String(), reader)
if err != nil {
return fmt.Errorf("cannot create request: %v", err)
}
req.Header.Add("Content-Type", "application/octet-stream")
_, err = j.requester.Do(req)
if err != nil {
return fmt.Errorf("error uploading artifcat: %v", err)
}
return nil
}

View file

@ -1,8 +1,6 @@
package worker
import (
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/osbuild"
@ -35,9 +33,10 @@ type errorResponse struct {
}
type requestJobResponse struct {
Token uuid.UUID `json:"token"`
Manifest distro.Manifest `json:"manifest"`
Targets []*target.Target `json:"targets,omitempty"`
Location string `json:"location"`
ArtifactLocation string `json:"artifact_location"`
}
type getJobResponse struct {

View file

@ -261,9 +261,10 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error {
}
return ctx.JSON(http.StatusCreated, requestJobResponse{
Token: token,
Manifest: jobArgs.Manifest,
Targets: jobArgs.Targets,
Location: fmt.Sprintf("/jobs/%v", token),
ArtifactLocation: fmt.Sprintf("/jobs/%v/artifacts/", token),
})
}

View file

@ -68,7 +68,7 @@ func TestCreate(t *testing.T) {
require.NoError(t, err)
test.TestRoute(t, server, false, "POST", "/jobs", `{}`, http.StatusCreated,
`{"manifest":{"sources":{},"pipeline":{}}}`, "token", "created")
`{"manifest":{"sources":{},"pipeline":{}}}`, "location", "artifact_location", "created")
}
func TestCancel(t *testing.T) {