jobqueue: rename to worker

This package does not contain an actual queue, but a server and client
implementation for osbuild's worker API. Name it accordingly.

The queue is in package `store` right now, but about to be split off.
This rename makes the `jobqueue` name free for that effort.
This commit is contained in:
Lars Karlitski 2020-04-15 23:12:49 +02:00 committed by Tom Gundersen
parent 76bd5ab984
commit ac40b0e73b
6 changed files with 15 additions and 15 deletions

View file

@ -1,177 +0,0 @@
package jobqueue
import (
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"strconv"
"github.com/google/uuid"
"github.com/julienschmidt/httprouter"
"github.com/osbuild/osbuild-composer/internal/store"
)
type API struct {
logger *log.Logger
store *store.Store
router *httprouter.Router
}
func New(logger *log.Logger, store *store.Store) *API {
api := &API{
logger: logger,
store: store,
}
api.router = httprouter.New()
api.router.RedirectTrailingSlash = false
api.router.RedirectFixedPath = false
api.router.MethodNotAllowed = http.HandlerFunc(methodNotAllowedHandler)
api.router.NotFound = http.HandlerFunc(notFoundHandler)
api.router.POST("/job-queue/v1/jobs", api.addJobHandler)
api.router.PATCH("/job-queue/v1/jobs/:job_id/builds/:build_id", api.updateJobHandler)
api.router.POST("/job-queue/v1/jobs/:job_id/builds/:build_id/image", api.addJobImageHandler)
return api
}
func (api *API) Serve(listener net.Listener) error {
server := http.Server{Handler: api}
err := server.Serve(listener)
if err != nil && err != http.ErrServerClosed {
return err
}
return nil
}
func (api *API) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if api.logger != nil {
log.Println(request.Method, request.URL.Path)
}
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
api.router.ServeHTTP(writer, request)
}
// jsonErrorf() is similar to http.Error(), but returns the message in a json
// object with a "message" field.
func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) {
writer.WriteHeader(code)
// ignore error, because we cannot do anything useful with it
_ = json.NewEncoder(writer).Encode(&errorResponse{
Message: fmt.Sprintf(message, args...),
})
}
func methodNotAllowedHandler(writer http.ResponseWriter, request *http.Request) {
jsonErrorf(writer, http.StatusMethodNotAllowed, "method not allowed")
}
func notFoundHandler(writer http.ResponseWriter, request *http.Request) {
jsonErrorf(writer, http.StatusNotFound, "not found")
}
func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
contentType := request.Header["Content-Type"]
if len(contentType) != 1 || contentType[0] != "application/json" {
jsonErrorf(writer, http.StatusUnsupportedMediaType, "request must contain application/json data")
return
}
var body addJobRequest
err := json.NewDecoder(request.Body).Decode(&body)
if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "%v", err)
return
}
nextJob := api.store.PopJob()
writer.WriteHeader(http.StatusCreated)
// FIXME: handle or comment this possible error
_ = json.NewEncoder(writer).Encode(addJobResponse{
ComposeID: nextJob.ComposeID,
ImageBuildID: nextJob.ImageBuildID,
Manifest: nextJob.Manifest,
Targets: nextJob.Targets,
})
}
func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
contentType := request.Header["Content-Type"]
if len(contentType) != 1 || contentType[0] != "application/json" {
jsonErrorf(writer, http.StatusUnsupportedMediaType, "request must contain application/json data")
return
}
id, err := uuid.Parse(params.ByName("job_id"))
if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "cannot parse compose id: %v", err)
return
}
imageBuildId, err := strconv.Atoi(params.ByName("build_id"))
if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err)
return
}
var body updateJobRequest
err = json.NewDecoder(request.Body).Decode(&body)
if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "cannot parse request body: %v", err)
return
}
err = api.store.UpdateImageBuildInCompose(id, imageBuildId, body.Status, body.Result)
if err != nil {
switch err.(type) {
case *store.NotFoundError, *store.NotPendingError:
jsonErrorf(writer, http.StatusNotFound, "%v", err)
case *store.NotRunningError, *store.InvalidRequestError:
jsonErrorf(writer, http.StatusBadRequest, "%v", err)
default:
jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
}
return
}
_ = json.NewEncoder(writer).Encode(updateJobResponse{})
}
func (api *API) addJobImageHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
id, err := uuid.Parse(params.ByName("job_id"))
if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "cannot parse compose id: %v", err)
return
}
imageBuildId, err := strconv.Atoi(params.ByName("build_id"))
if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err)
return
}
err = api.store.AddImageToImageUpload(id, imageBuildId, request.Body)
if err != nil {
switch err.(type) {
case *store.NotFoundError:
jsonErrorf(writer, http.StatusNotFound, "%v", err)
case *store.NoLocalTargetError:
jsonErrorf(writer, http.StatusBadRequest, "%v", err)
default:
jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
}
return
}
}

View file

@ -1,127 +0,0 @@
package jobqueue_test
import (
"net/http"
"testing"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/distro/fedoratest"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
"github.com/osbuild/osbuild-composer/internal/store"
"github.com/osbuild/osbuild-composer/internal/test"
)
func TestErrors(t *testing.T) {
var cases = []struct {
Method string
Path string
Body string
ExpectedStatus int
}{
// Bogus path
{"GET", "/foo", ``, http.StatusNotFound},
// Create job with invalid body
{"POST", "/job-queue/v1/jobs", ``, http.StatusBadRequest},
// Wrong method
{"GET", "/job-queue/v1/jobs", ``, http.StatusMethodNotAllowed},
// Update job with invalid ID
{"PATCH", "/job-queue/v1/jobs/foo/builds/0", `{"status":"RUNNING"}`, http.StatusBadRequest},
// Update job that does not exist, with invalid body
{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/builds/0", ``, http.StatusBadRequest},
// Update job that does not exist
{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/builds/0", `{"status":"RUNNING"}`, http.StatusNotFound},
}
for _, c := range cases {
api := jobqueue.New(nil, store.New(nil))
test.TestRoute(t, api, false, c.Method, c.Path, c.Body, c.ExpectedStatus, "{}", "message")
}
}
func TestCreate(t *testing.T) {
distroStruct := fedoratest.New()
arch, err := distroStruct.GetArch("x86_64")
if err != nil {
t.Fatalf("error getting arch from distro")
}
imageType, err := arch.GetImageType("qcow2")
if err != nil {
t.Fatalf("error getting image type from arch")
}
store := store.New(nil)
api := jobqueue.New(nil, store)
id, err := store.PushCompose(imageType, &blueprint.Blueprint{}, nil, nil, nil, 0, nil)
if err != nil {
t.Fatalf("error pushing compose: %v", err)
}
test.TestRoute(t, api, false, "POST", "/job-queue/v1/jobs", `{}`, http.StatusCreated,
`{"compose_id":"`+id.String()+`","image_build_id":0,"manifest":{"sources":{},"pipeline":{}},"targets":[]}`, "created")
}
func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) {
distroStruct := fedoratest.New()
arch, err := distroStruct.GetArch("x86_64")
if err != nil {
t.Fatalf("error getting arch from distro")
}
imageType, err := arch.GetImageType("qcow2")
if err != nil {
t.Fatalf("error getting image type from arch")
}
store := store.New(nil)
api := jobqueue.New(nil, store)
id := uuid.Nil
if from != "VOID" {
id, err = store.PushCompose(imageType, &blueprint.Blueprint{}, nil, nil, nil, 0, nil)
if err != nil {
t.Fatalf("error pushing compose: %v", err)
}
if from != "WAITING" {
test.SendHTTP(api, false, "POST", "/job-queue/v1/jobs", `{}`)
if from != "RUNNING" {
test.SendHTTP(api, false, "PATCH", "/job-queue/v1/jobs/"+id.String()+"/builds/0", `{"status":"`+from+`"}`)
}
}
}
test.TestRoute(t, api, false, "PATCH", "/job-queue/v1/jobs/"+id.String()+"/builds/0", `{"status":"`+to+`"}`, expectedStatus, "{}", "message")
}
func TestUpdate(t *testing.T) {
var cases = []struct {
From string
To string
ExpectedStatus int
}{
{"VOID", "WAITING", http.StatusNotFound},
{"VOID", "RUNNING", http.StatusNotFound},
{"VOID", "FINISHED", http.StatusNotFound},
{"VOID", "FAILED", http.StatusNotFound},
{"WAITING", "WAITING", http.StatusNotFound},
{"WAITING", "RUNNING", http.StatusNotFound},
{"WAITING", "FINISHED", http.StatusNotFound},
{"WAITING", "FAILED", http.StatusNotFound},
{"RUNNING", "WAITING", http.StatusBadRequest},
{"RUNNING", "RUNNING", http.StatusOK},
{"RUNNING", "FINISHED", http.StatusOK},
{"RUNNING", "FAILED", http.StatusOK},
{"FINISHED", "WAITING", http.StatusBadRequest},
{"FINISHED", "RUNNING", http.StatusBadRequest},
{"FINISHED", "FINISHED", http.StatusBadRequest},
{"FINISHED", "FAILED", http.StatusBadRequest},
{"FAILED", "WAITING", http.StatusBadRequest},
{"FAILED", "RUNNING", http.StatusBadRequest},
{"FAILED", "FINISHED", http.StatusBadRequest},
{"FAILED", "FAILED", http.StatusBadRequest},
}
for _, c := range cases {
testUpdateTransition(t, c.From, c.To, c.ExpectedStatus)
}
}

View file

@ -1,132 +0,0 @@
package jobqueue
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/osbuild"
"github.com/osbuild/osbuild-composer/internal/target"
)
type Client struct {
client *http.Client
scheme string
hostname string
}
type Job struct {
ComposeID uuid.UUID
ImageBuildID int
Manifest *osbuild.Manifest
Targets []*target.Target
}
func NewClient(address string, conf *tls.Config) *Client {
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: conf,
},
}
var scheme string
if conf != nil {
scheme = "http"
} else {
scheme = "https"
}
return &Client{client, scheme, address}
}
func NewClientUnix(path string) *Client {
client := &http.Client{
Transport: &http.Transport{
DialContext: func(context context.Context, network, addr string) (net.Conn, error) {
return net.Dial("unix", path)
},
},
}
return &Client{client, "http", "localhost"}
}
func (c *Client) AddJob() (*Job, error) {
var b bytes.Buffer
err := json.NewEncoder(&b).Encode(addJobRequest{})
if err != nil {
panic(err)
}
response, err := c.client.Post(c.createURL("/job-queue/v1/jobs"), "application/json", &b)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusCreated {
var er errorResponse
_ = json.NewDecoder(response.Body).Decode(&er)
return nil, fmt.Errorf("couldn't create job, got %d: %s", response.StatusCode, er.Message)
}
var jr addJobResponse
err = json.NewDecoder(response.Body).Decode(&jr)
if err != nil {
return nil, err
}
return &Job{
jr.ComposeID,
jr.ImageBuildID,
jr.Manifest,
jr.Targets,
}, nil
}
func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *common.ComposeResult) error {
var b bytes.Buffer
err := json.NewEncoder(&b).Encode(&updateJobRequest{status, result})
if err != nil {
panic(err)
}
urlPath := fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d", job.ComposeID.String(), job.ImageBuildID)
url := c.createURL(urlPath)
req, err := http.NewRequest("PATCH", url, &b)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
response, err := c.client.Do(req)
if err != nil {
return err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return errors.New("error setting job status")
}
return nil
}
func (c *Client) UploadImage(job *Job, reader io.Reader) error {
// content type doesn't really matter
url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", job.ComposeID.String(), job.ImageBuildID))
_, err := c.client.Post(url, "application/octet-stream", reader)
return err
}
func (c *Client) createURL(path string) string {
return c.scheme + "://" + c.hostname + path
}

View file

@ -1,31 +0,0 @@
package jobqueue
import (
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/osbuild"
"github.com/osbuild/osbuild-composer/internal/target"
)
type errorResponse struct {
Message string `json:"message"`
}
type addJobRequest struct {
}
type addJobResponse struct {
ComposeID uuid.UUID `json:"compose_id"`
ImageBuildID int `json:"image_build_id"`
Manifest *osbuild.Manifest `json:"manifest"`
Targets []*target.Target `json:"targets"`
}
type updateJobRequest struct {
Status common.ImageBuildState `json:"status"`
Result *common.ComposeResult `json:"result"`
}
type updateJobResponse struct {
}