debian-forge-composer/internal/jobqueue/api.go
Tom Gundersen 0bdd3a5c89 job-queue: pass a well-formed job object to the worker
For now we will hardcode the org.osbuild.local target, so we might
as well fix this up front.

We do not yet support any target types, but for testing purposes we
claim to support 'tar', and we pass a noop tar pipeline to the worker.

This makes introspecting the job-queu api using curl a bit more
pleasant.

Signed-off-by: Tom Gundersen <teg@jklm.no>
2019-09-26 23:48:19 +02:00

138 lines
3.5 KiB
Go

package jobqueue
import (
"encoding/json"
"log"
"net"
"net/http"
"osbuild-composer/internal/job"
"osbuild-composer/internal/pipeline"
"osbuild-composer/internal/target"
"github.com/julienschmidt/httprouter"
)
type API struct {
jobStore *job.Store
pendingJobs <-chan job.Job
logger *log.Logger
router *httprouter.Router
}
func New(logger *log.Logger, jobs <-chan job.Job) *API {
api := &API{
jobStore: job.NewStore(),
logger: logger,
pendingJobs: jobs,
}
api.router = httprouter.New()
api.router.RedirectTrailingSlash = false
api.router.RedirectFixedPath = false
api.router.MethodNotAllowed = http.HandlerFunc(methodNotAllowedHandler)
api.router.NotFound = http.HandlerFunc(notFoundHandler)
api.router.POST("/job-queue/v1/jobs", api.addJobHandler)
api.router.PATCH("/job-queue/v1/jobs/:id", api.updateJobHandler)
return api
}
func (api *API) Serve(listener net.Listener) error {
server := http.Server{Handler: api}
err := server.Serve(listener)
if err != nil && err != http.ErrServerClosed {
return err
}
return nil
}
func (api *API) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
if api.logger != nil {
log.Println(request.Method, request.URL.Path)
}
writer.Header().Set("Content-Type", "application/json; charset=utf-8")
api.router.ServeHTTP(writer, request)
}
func methodNotAllowedHandler(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusMethodNotAllowed)
}
func notFoundHandler(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusNotFound)
}
func statusResponseOK(writer http.ResponseWriter) {
writer.WriteHeader(http.StatusOK)
}
func statusResponseError(writer http.ResponseWriter, code int, errors ...string) {
writer.WriteHeader(code)
}
func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
type requestBody struct {
ID string `json:"id"`
}
type replyBody struct {
Pipeline pipeline.Pipeline `json:"pipeline"`
Targets []target.Target `json:"targets"`
}
contentType := request.Header["Content-Type"]
if len(contentType) != 1 || contentType[0] != "application/json" {
statusResponseError(writer, http.StatusUnsupportedMediaType)
return
}
var body requestBody
err := json.NewDecoder(request.Body).Decode(&body)
if err != nil {
statusResponseError(writer, http.StatusBadRequest, "invalid id: "+err.Error())
return
}
id := body.ID
var jobSlot job.Job
if !api.jobStore.AddJob(id, jobSlot) {
statusResponseError(writer, http.StatusBadRequest)
return
}
nextJob := <-api.pendingJobs
api.jobStore.UpdateJob(id, nextJob)
json.NewEncoder(writer).Encode(replyBody{nextJob.Pipeline, nextJob.Targets})
}
func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
type requestBody struct {
Status string `json:"status"`
}
contentType := request.Header["Content-Type"]
if len(contentType) != 1 || contentType[0] != "application/json" {
statusResponseError(writer, http.StatusUnsupportedMediaType)
return
}
var body requestBody
err := json.NewDecoder(request.Body).Decode(&body)
if err != nil {
statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error())
} else if body.Status == "running" {
statusResponseOK(writer)
} else if body.Status == "finished" {
api.jobStore.DeleteJob(params.ByName("id"))
statusResponseOK(writer)
} else {
statusResponseError(writer, http.StatusBadRequest, "invalid status: "+body.Status)
}
}