diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index fd0c8d875..0c0249db1 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -8,7 +8,8 @@ import ( "os" "path/filepath" - "osbuild-composer/internal/queue" + "osbuild-composer/internal/job" + "osbuild-composer/internal/jobqueue" "osbuild-composer/internal/rpmmd" "osbuild-composer/internal/weldr" ) @@ -30,7 +31,22 @@ func main() { panic(err) } - listener, err := net.Listen("unix", "/run/weldr/api.socket") + weldrListener, err := net.Listen("unix", "/run/weldr/api.socket") + if err != nil { + panic(err) + } + + err = os.Remove("/run/osbuild-composer/job.socket") + if err != nil && !os.IsNotExist(err) { + panic(err) + } + + err = os.Mkdir("/run/osbuild-composer", 0755) + if err != nil && !os.IsExist(err) { + panic(err) + } + + jobListener, err := net.Listen("unix", "/run/osbuild-composer/job.socket") if err != nil { panic(err) } @@ -62,8 +78,9 @@ func main() { } stateChannel := make(chan []byte, 10) - buildChannel := make(chan queue.Build, 200) - api := weldr.New(repo, packages, logger, state, stateChannel, buildChannel) + jobChannel := make(chan job.Job, 200) + jobAPI := jobqueue.New(logger, jobChannel) + weldrAPI := weldr.New(repo, packages, logger, state, stateChannel, jobChannel) go func() { for { err := writeFileAtomically(StateFile, <-stateChannel, 0755) @@ -73,7 +90,8 @@ func main() { } }() - api.Serve(listener) + go jobAPI.Serve(jobListener) + weldrAPI.Serve(weldrListener) } func writeFileAtomically(filename string, data []byte, mode os.FileMode) error { diff --git a/internal/job/job.go b/internal/job/job.go new file mode 100644 index 000000000..8fd68256d --- /dev/null +++ b/internal/job/job.go @@ -0,0 +1,12 @@ +package job + +import ( + "osbuild-composer/internal/pipeline" + "osbuild-composer/internal/target" +) + +type Job struct { + ComposeID string + Pipeline pipeline.Pipeline + Target target.Target +} diff --git a/internal/job/store.go b/internal/job/store.go new file mode 100644 index 000000000..0f21f794f --- /dev/null +++ b/internal/job/store.go @@ -0,0 +1,51 @@ +package job + +import ( + "sync" +) + +type Store struct { + jobs map[string]Job + mu sync.RWMutex +} + +func NewStore() *Store { + var s Store + + s.jobs = make(map[string]Job) + + return &s +} + +func (s *Store) AddJob(id string, job Job) bool { + s.mu.Lock() + defer s.mu.Unlock() + + _, exists := s.jobs[id] + if exists { + return false + } + + s.jobs[id] = job + + return true +} + +func (s *Store) UpdateJob(id string, job Job) bool { + s.mu.Lock() + defer s.mu.Unlock() + + req, _ := s.jobs[id] + req.ComposeID = job.ComposeID + req.Pipeline = job.Pipeline + req.Target = job.Target + + return true +} + +func (s *Store) DeleteJob(id string) { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.jobs, id) +} diff --git a/internal/jobqueue/api.go b/internal/jobqueue/api.go new file mode 100644 index 000000000..8e6c66a35 --- /dev/null +++ b/internal/jobqueue/api.go @@ -0,0 +1,141 @@ +package jobqueue + +import ( + "encoding/json" + "log" + "net" + "net/http" + "osbuild-composer/internal/job" + "osbuild-composer/internal/pipeline" + "osbuild-composer/internal/target" + + "github.com/julienschmidt/httprouter" +) + +type API struct { + jobStore *job.Store + pendingJobs <-chan job.Job + + logger *log.Logger + router *httprouter.Router +} + +func New(logger *log.Logger, jobs <-chan job.Job) *API { + api := &API{ + jobStore: job.NewStore(), + logger: logger, + pendingJobs: jobs, + } + + api.router = httprouter.New() + api.router.RedirectTrailingSlash = false + api.router.RedirectFixedPath = false + api.router.MethodNotAllowed = http.HandlerFunc(methodNotAllowedHandler) + api.router.NotFound = http.HandlerFunc(notFoundHandler) + + api.router.POST("/job-queue/v1/jobs", api.addJobHandler) + api.router.PATCH("/job-queue/v1/jobs/:job-id", api.updateJobHandler) + + return api +} + +func (api *API) Serve(listener net.Listener) error { + server := http.Server{Handler: api} + + err := server.Serve(listener) + if err != nil && err != http.ErrServerClosed { + return err + } + + return nil +} + +func (api *API) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + if api.logger != nil { + log.Println(request.Method, request.URL.Path) + } + + writer.Header().Set("Content-Type", "application/json; charset=utf-8") + api.router.ServeHTTP(writer, request) +} + +func methodNotAllowedHandler(writer http.ResponseWriter, request *http.Request) { + writer.WriteHeader(http.StatusMethodNotAllowed) +} + +func notFoundHandler(writer http.ResponseWriter, request *http.Request) { + writer.WriteHeader(http.StatusNotFound) +} + +func statusResponseOK(writer http.ResponseWriter) { + writer.WriteHeader(http.StatusOK) +} + +func statusResponseError(writer http.ResponseWriter, code int, errors ...string) { + writer.WriteHeader(code) +} + +func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) { + type requestBody struct { + JobID string `json:"job-id"` + } + type replyBody struct { + Pipeline pipeline.Pipeline `json:"pipeline"` + Target target.Target `json:"target"` + } + + contentType := request.Header["Content-Type"] + if len(contentType) != 1 || contentType[0] != "application/json" { + statusResponseError(writer, http.StatusUnsupportedMediaType) + return + } + + var body requestBody + err := json.NewDecoder(request.Body).Decode(&body) + if err != nil { + statusResponseError(writer, http.StatusBadRequest, "invalid job-id: "+err.Error()) + return + } + + id := body.JobID + var req job.Job + + if !api.jobStore.AddJob(id, req) { + statusResponseError(writer, http.StatusBadRequest) + return + } + + req = <-api.pendingJobs + api.jobStore.UpdateJob(id, req) + + json.NewEncoder(writer).Encode(replyBody{req.Pipeline, req.Target}) + +} + +func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) { + type requestBody struct { + Status string `json:"status"` + } + + contentType := request.Header["Content-Type"] + if len(contentType) != 1 || contentType[0] != "application/json" { + statusResponseError(writer, http.StatusUnsupportedMediaType) + return + } + + var body requestBody + err := json.NewDecoder(request.Body).Decode(&body) + if err != nil { + statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error()) + return + } else if body.Status == "running" { + statusResponseOK(writer) + return + } else if body.Status != "finished" { + statusResponseError(writer, http.StatusBadRequest, "invalid status: "+body.Status) + return + } + + api.jobStore.DeleteJob(params.ByName("job-id")) + statusResponseOK(writer) +} diff --git a/internal/jobqueue/api_test.go b/internal/jobqueue/api_test.go new file mode 100644 index 000000000..8160fc0f4 --- /dev/null +++ b/internal/jobqueue/api_test.go @@ -0,0 +1,99 @@ +package jobqueue_test + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "reflect" + "strings" + "testing" + + "osbuild-composer/internal/job" + "osbuild-composer/internal/jobqueue" + "osbuild-composer/internal/pipeline" + "osbuild-composer/internal/target" +) + +func testRoute(t *testing.T, api *jobqueue.API, method, path, body string, expectedStatus int, expectedJSON string) { + req := httptest.NewRequest(method, path, bytes.NewReader([]byte(body))) + req.Header.Set("Content-Type", "application/json") + resp := httptest.NewRecorder() + api.ServeHTTP(resp, req) + + if resp.Code != expectedStatus { + t.Errorf("%s: expected status %v, but got %v", path, expectedStatus, resp.Code) + return + } + + replyJSON, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("%s: could not read reponse body: %v", path, err) + return + } + + if expectedJSON == "" { + if len(replyJSON) != 0 { + t.Errorf("%s: expected no response body, but got:\n%s", path, replyJSON) + } + return + } + + var reply, expected interface{} + err = json.Unmarshal(replyJSON, &reply) + if err != nil { + t.Errorf("%s: %v\n%s", path, err, string(replyJSON)) + return + } + + if expectedJSON == "*" { + return + } + + err = json.Unmarshal([]byte(expectedJSON), &expected) + if err != nil { + t.Errorf("%s: expected JSON is invalid: %v", path, err) + return + } + + if !reflect.DeepEqual(reply, expected) { + t.Errorf("%s: reply != expected:\n reply: %s\nexpected: %s", path, strings.TrimSpace(string(replyJSON)), expectedJSON) + return + } +} + +func TestBasic(t *testing.T) { + var cases = []struct { + Method string + Path string + Body string + ExpectedStatus int + ExpectedJSON string + }{ + {"POST", "/job-queue/v1/foo", ``, http.StatusNotFound, ``}, + {"GET", "/job-queue/v1/foo", ``, http.StatusNotFound, ``}, + {"PATH", "/job-queue/v1/foo", ``, http.StatusNotFound, ``}, + {"DELETE", "/job-queue/v1/foo", ``, http.StatusNotFound, ``}, + + {"POST", "/job-queue/v1/jobs", `{"job-id":"ffffffff-ffff–ffff-ffff-ffffffffffff"}`, http.StatusOK, `{"pipeline":"pipeline","target":"target"}`}, + {"POST", "/job-queue/v1/jobs", `{"job-id":"ffffffff-ffff–ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``}, + //{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"finished"}`, http.StatusBadRequest, ``}, + {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusOK, ``}, + {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusOK, ``}, + {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"finished"}`, http.StatusOK, ``}, + //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusNotAllowed, ``}, + //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"finished"}`, http.StatusNotAllowed, ``}, + } + + jobChannel := make(chan job.Job, 100) + api := jobqueue.New(nil, jobChannel) + for _, c := range cases { + jobChannel <- job.Job{ + ComposeID: "ID", + Pipeline: pipeline.Pipeline("pipeline"), + Target: target.Target("target"), + } + testRoute(t, api, c.Method, c.Path, c.Body, c.ExpectedStatus, c.ExpectedJSON) + } +} diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go new file mode 100644 index 000000000..17ff93ec7 --- /dev/null +++ b/internal/pipeline/pipeline.go @@ -0,0 +1,3 @@ +package pipeline + +type Pipeline string diff --git a/internal/queue/queue.go b/internal/queue/queue.go deleted file mode 100644 index 0d0a5c207..000000000 --- a/internal/queue/queue.go +++ /dev/null @@ -1,62 +0,0 @@ -package queue - -import "sync" - -// Build is a request waiting for a worker -type Build struct { - Pipeline string `json:"pipeline"` - Manifest string `json:"manifest"` -} - -// Manifest contains additional metadata attached do a pipeline that are necessary for workers -type Manifest struct { - destination string -} - -// Job is an image build already in progress -type Job struct { - UUID string `json:"uuid"` - Build Build `json:"build"` -} - -// JobQueue contains already running jobs waiting for -type JobQueue struct { - sync.Mutex - incomingBuilds chan Build // Channel of incoming builds form Weldr API, we never want to block on this - waitingBuilds []Build // Unbounded FIFO queue of waiting builds - runningJobs map[string]Job // Already running jobs, key is UUID -} - -// NewJobQueue creates object of type JobQueue -func NewJobQueue(timeout int, builds chan Build) *JobQueue { - jobs := &JobQueue{ - incomingBuilds: builds, - waitingBuilds: make([]Build, 0), - runningJobs: make(map[string]Job), - } - go func() { - for { - // This call will block, do not put it inside the locked zone - newBuild := <-jobs.incomingBuilds - // Locking the whole job queue => as short as possible - jobs.Lock() - jobs.waitingBuilds = append(jobs.waitingBuilds, newBuild) - jobs.Unlock() - } - }() - return jobs -} - -// StartNewJob starts a new job -func (j *JobQueue) StartNewJob(id string, worker string) Job { - j.Lock() - newBuild := j.waitingBuilds[0] // Take the first element - j.waitingBuilds = j.waitingBuilds[1:] // Discart 1st element - j.Unlock() - job := Job{ - UUID: id, - Build: newBuild, - } - j.runningJobs[id] = job - return job -} diff --git a/internal/target/target.go b/internal/target/target.go new file mode 100644 index 000000000..1481a1fdf --- /dev/null +++ b/internal/target/target.go @@ -0,0 +1,3 @@ +package target + +type Target string diff --git a/internal/weldr/api.go b/internal/weldr/api.go index c26e9ad07..66772f28e 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -10,13 +10,13 @@ import ( "github.com/julienschmidt/httprouter" - "osbuild-composer/internal/queue" + "osbuild-composer/internal/job" "osbuild-composer/internal/rpmmd" ) type API struct { - store *store - pendingBuilds chan queue.Build + store *store + pendingJobs chan<- job.Job repo rpmmd.RepoConfig packages rpmmd.PackageList @@ -25,15 +25,15 @@ type API struct { router *httprouter.Router } -func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte, builds chan queue.Build) *API { +func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte, jobs chan<- job.Job) *API { // This needs to be shared with the worker API so that they can communicate with each other // builds := make(chan queue.Build, 200) api := &API{ - store: newStore(initialState, stateChannel), - pendingBuilds: builds, - repo: repo, - packages: packages, - logger: logger, + store: newStore(initialState, stateChannel), + pendingJobs: jobs, + repo: repo, + packages: packages, + logger: logger, } // sample blueprint on first run @@ -131,7 +131,7 @@ func statusResponseError(writer http.ResponseWriter, code int, errors ...string) func (api *API) statusHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) { type reply struct { - Api uint `json:"api"` + API uint `json:"api"` DBSupported bool `json:"db_supported"` DBVersion string `json:"db_version"` SchemaVersion string `json:"schema_version"` @@ -141,7 +141,7 @@ func (api *API) statusHandler(writer http.ResponseWriter, request *http.Request, } json.NewEncoder(writer).Encode(reply{ - Api: 1, + API: 1, DBSupported: true, DBVersion: "0", SchemaVersion: "0", @@ -165,7 +165,7 @@ func (api *API) sourceInfoHandler(writer http.ResponseWriter, request *http.Requ // weldr uses a slightly different format than dnf to store repository // configuration type sourceConfig struct { - Id string `json:"id"` + ID string `json:"id"` Name string `json:"name"` Type string `json:"type"` URL string `json:"url"` @@ -185,7 +185,7 @@ func (api *API) sourceInfoHandler(writer http.ResponseWriter, request *http.Requ } cfg := sourceConfig{ - Id: api.repo.Id, + ID: api.repo.Id, Name: api.repo.Name, CheckGPG: true, CheckSSL: true, @@ -204,7 +204,7 @@ func (api *API) sourceInfoHandler(writer http.ResponseWriter, request *http.Requ } json.NewEncoder(writer).Encode(reply{ - Sources: map[string]sourceConfig{cfg.Id: cfg}, + Sources: map[string]sourceConfig{cfg.ID: cfg}, }) } @@ -270,7 +270,7 @@ func (api *API) modulesListHandler(writer http.ResponseWriter, request *http.Req for _, pkg := range api.packages { for _, name := range names { if strings.Contains(pkg.Name, name) { - total += 1 + total++ if total > offset && total < end { modules = append(modules, modulesListModule{pkg.Name, "rpm"}) } @@ -593,7 +593,7 @@ func (api *API) blueprintDeleteWorkspaceHandler(writer http.ResponseWriter, requ // Schedule new compose by first translating the appropriate blueprint into a pipeline and then // pushing it into the channel for waiting builds. -func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) { +func (api *API) composeHandler(writer http.ResponseWriter, httpRequest *http.Request, _ httprouter.Params) { // https://weldr.io/lorax/pylorax.api.html#pylorax.api.v0.v0_compose_start type ComposeRequest struct { BlueprintName string `json:"blueprint_name"` @@ -601,14 +601,14 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request Branch string `json:"branch"` } - contentType := request.Header["Content-Type"] + contentType := httpRequest.Header["Content-Type"] if len(contentType) != 1 || contentType[0] != "application/json" { statusResponseError(writer, http.StatusUnsupportedMediaType, "blueprint must be json") return } var cr ComposeRequest - err := json.NewDecoder(request.Body).Decode(&cr) + err := json.NewDecoder(httpRequest.Body).Decode(&cr) if err != nil { statusResponseError(writer, http.StatusBadRequest, "invalid request format: "+err.Error()) return @@ -619,9 +619,10 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request found := api.store.getBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed? if found { - api.pendingBuilds <- queue.Build{ - Pipeline: bp.translateToPipeline(cr.ComposeType), - Manifest: "{\"output-path\": \"/var/cache/osbuild\"}", + api.pendingJobs <- job.Job{ + ComposeID: "TODO", + Pipeline: bp.translateToPipeline(cr.ComposeType), + Target: `{"output-path":"/var/cache/osbuild-composer"}`, } } else { statusResponseError(writer, http.StatusBadRequest, "blueprint does not exist") diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index eca8a7ceb..3f3eac30a 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -10,8 +10,10 @@ import ( "strings" "testing" - "osbuild-composer/internal/queue" + "osbuild-composer/internal/job" + "osbuild-composer/internal/pipeline" "osbuild-composer/internal/rpmmd" + "osbuild-composer/internal/target" "osbuild-composer/internal/weldr" ) @@ -141,8 +143,8 @@ func TestBlueprints(t *testing.T) { } func TestCompose(t *testing.T) { - buildChannel := make(chan queue.Build, 200) - api := weldr.New(repo, packages, nil, nil, nil, buildChannel) + jobChannel := make(chan job.Job, 200) + api := weldr.New(repo, packages, nil, nil, nil, jobChannel) testRoute(t, api, "POST", "/api/v0/blueprints/new", `{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0"}`, @@ -154,13 +156,13 @@ func TestCompose(t *testing.T) { testRoute(t, api, "POST", "/api/v0/compose", `{"blueprint_name": "test","compose_type": "tar","branch": "master"}`, http.StatusOK, `{"status":true}`) - build := <-buildChannel - expected_pipeline := `{"pipeline": "string"}` - expected_manifest := `{"output-path": "/var/cache/osbuild"}` - if expected_manifest != build.Manifest { - t.Errorf("Expected this manifest: %s; got this: %s", expected_manifest, build.Manifest) + job := <-jobChannel + expected_pipeline := pipeline.Pipeline(`{"pipeline":"string"}`) + expected_target := target.Target(`{"output-path":"/var/cache/osbuild-composer"}`) + if expected_target != job.Target { + t.Errorf("Expected this manifest: %s; got this: %s", expected_target, job.Target) } - if expected_pipeline != build.Pipeline { - t.Errorf("Expected this manifest: %s; got this: %s", expected_pipeline, build.Pipeline) + if expected_pipeline != job.Pipeline { + t.Errorf("Expected this manifest: %s; got this: %s", expected_pipeline, job.Pipeline) } } diff --git a/internal/weldr/store.go b/internal/weldr/store.go index 31fba5055..ba08a2f64 100644 --- a/internal/weldr/store.go +++ b/internal/weldr/store.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "osbuild-composer/internal/pipeline" "sort" "sync" ) @@ -156,6 +157,6 @@ func (s *store) deleteBlueprintFromWorkspace(name string) { }) } -func (b *blueprint) translateToPipeline(outputFormat string) string { - return fmt.Sprintf("{\"pipeline\": \"%s\"}", "string") +func (b *blueprint) translateToPipeline(outputFormat string) pipeline.Pipeline { + return pipeline.Pipeline(fmt.Sprintf("{\"pipeline\":\"%s\"}", "string")) } diff --git a/internal/weldr/util.go b/internal/weldr/util.go index b8ce4a2d6..2637235ed 100644 --- a/internal/weldr/util.go +++ b/internal/weldr/util.go @@ -30,7 +30,6 @@ func parseOffsetAndLimit(query url.Values) (uint, uint, error) { func min(a, b uint) uint { if a < b { return a - } else { - return b } + return b }