From 0880014edf19aebd93e3f64d71bee42ed5aa9e4b Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Tue, 1 Oct 2019 17:06:37 +0200 Subject: [PATCH] jobqueue: cleanup API a bit and unify the two job stores Let the store in weldr be the only one that keeps state, and push updates directly there. This fixes a bug where there was an ID mismatch. Change the API to not let the caller pick the UUID, but provide it in the response. Use the same UUID as is used to identify composes, this makes it simpler to trace what is going on. Signed-off-by: Tom Gundersen --- cmd/osbuild-worker/job.go | 6 ++-- cmd/osbuild-worker/main.go | 24 ++++------------ internal/job/store.go | 53 ----------------------------------- internal/jobqueue/api.go | 30 +++++--------------- internal/jobqueue/api_test.go | 4 +-- 5 files changed, 17 insertions(+), 100 deletions(-) delete mode 100644 internal/job/store.go diff --git a/cmd/osbuild-worker/job.go b/cmd/osbuild-worker/job.go index 95d9daecc..fbde2e9b3 100644 --- a/cmd/osbuild-worker/job.go +++ b/cmd/osbuild-worker/job.go @@ -12,9 +12,9 @@ import ( ) type Job struct { - ID uuid.UUID - Pipeline pipeline.Pipeline - Targets []target.Target + ID uuid.UUID `json:"id"` + Pipeline pipeline.Pipeline `json:"pipeline"` + Targets []target.Target `json:"targets"` } func (job *Job) Run() error { diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index 5fc32605c..d84ab1492 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -8,11 +8,6 @@ import ( "fmt" "net" "net/http" - - "github.com/google/uuid" - - "osbuild-composer/internal/pipeline" - "osbuild-composer/internal/target" ) type ComposerClient struct { @@ -32,19 +27,10 @@ func NewClient() *ComposerClient { func (c *ComposerClient) AddJob() (*Job, error) { type request struct { - ID string `json:"id"` - } - type reply struct { - Pipeline *pipeline.Pipeline `json:"pipeline"` - Targets *[]target.Target `json:"targets"` - } - - job := &Job{ - ID: uuid.New(), } var b bytes.Buffer - json.NewEncoder(&b).Encode(request{job.ID.String()}) + json.NewEncoder(&b).Encode(request{}) response, err := c.client.Post("http://localhost/job-queue/v1/jobs", "application/json", &b) if err != nil { return nil, err @@ -55,10 +41,8 @@ func (c *ComposerClient) AddJob() (*Job, error) { return nil, errors.New("couldn't create job") } - err = json.NewDecoder(response.Body).Decode(&reply{ - Pipeline: &job.Pipeline, - Targets: &job.Targets, - }) + job := &Job{} + err = json.NewDecoder(response.Body).Decode(job) if err != nil { return nil, err } @@ -102,6 +86,8 @@ func main() { panic(err) } + client.UpdateJob(job, "RUNNING") + fmt.Printf("Running job %s\n", job.ID.String()) job.Run() diff --git a/internal/job/store.go b/internal/job/store.go deleted file mode 100644 index 9ade9aaf3..000000000 --- a/internal/job/store.go +++ /dev/null @@ -1,53 +0,0 @@ -package job - -import ( - "sync" - - "github.com/google/uuid" -) - -type Store struct { - jobs map[uuid.UUID]Job - mu sync.RWMutex -} - -func NewStore() *Store { - var s Store - - s.jobs = make(map[uuid.UUID]Job) - - return &s -} - -func (s *Store) AddJob(id uuid.UUID, job Job) bool { - s.mu.Lock() - defer s.mu.Unlock() - - _, exists := s.jobs[id] - if exists { - return false - } - - s.jobs[id] = job - - return true -} - -func (s *Store) UpdateJob(id uuid.UUID, job Job) bool { - s.mu.Lock() - defer s.mu.Unlock() - - req, _ := s.jobs[id] - req.ComposeID = job.ComposeID - req.Pipeline = job.Pipeline - req.Targets = job.Targets - - return true -} - -func (s *Store) DeleteJob(id uuid.UUID) { - s.mu.Lock() - defer s.mu.Unlock() - - delete(s.jobs, id) -} diff --git a/internal/jobqueue/api.go b/internal/jobqueue/api.go index 1ed352187..d5370d19a 100644 --- a/internal/jobqueue/api.go +++ b/internal/jobqueue/api.go @@ -14,7 +14,6 @@ import ( ) type API struct { - jobStore *job.Store pendingJobs <-chan job.Job jobStatus chan<- job.Status @@ -24,7 +23,6 @@ type API struct { func New(logger *log.Logger, jobs <-chan job.Job, jobStatus chan<- job.Status) *API { api := &API{ - jobStore: job.NewStore(), logger: logger, pendingJobs: jobs, jobStatus: jobStatus, @@ -80,9 +78,9 @@ func statusResponseError(writer http.ResponseWriter, code int, errors ...string) func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) { type requestBody struct { - ID uuid.UUID `json:"id"` } type replyBody struct { + ID uuid.UUID `json:"id"` Pipeline *pipeline.Pipeline `json:"pipeline"` Targets []*target.Target `json:"targets"` } @@ -96,23 +94,14 @@ func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, var body requestBody err := json.NewDecoder(request.Body).Decode(&body) if err != nil { - statusResponseError(writer, http.StatusBadRequest, "invalid id: "+err.Error()) - return - } - - id := body.ID - var jobSlot job.Job - - if !api.jobStore.AddJob(id, jobSlot) { - statusResponseError(writer, http.StatusBadRequest) + statusResponseError(writer, http.StatusBadRequest, "invalid request: "+err.Error()) return } nextJob := <-api.pendingJobs - api.jobStore.UpdateJob(id, nextJob) writer.WriteHeader(http.StatusCreated) - json.NewEncoder(writer).Encode(replyBody{nextJob.Pipeline, nextJob.Targets}) + json.NewEncoder(writer).Encode(replyBody{nextJob.ComposeID, nextJob.Pipeline, nextJob.Targets}) } func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) { @@ -128,7 +117,7 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque id, err := uuid.Parse(params.ByName("id")) if err != nil { - statusResponseError(writer, http.StatusBadRequest, "invalid job id: "+err.Error()) + statusResponseError(writer, http.StatusBadRequest, "invalid compose id: "+err.Error()) return } @@ -136,13 +125,8 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque err = json.NewDecoder(request.Body).Decode(&body) if err != nil { statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error()) - } else if body.Status == "RUNNING" { - api.jobStatus <- job.Status{ComposeID: id, Status: body.Status} - statusResponseOK(writer) - } else if body.Status == "FINISHED" { - api.jobStore.DeleteJob(id) - statusResponseOK(writer) - } else { - statusResponseError(writer, http.StatusBadRequest, "invalid status: "+body.Status) } + + api.jobStatus <- job.Status{ComposeID: id, Status: body.Status} + statusResponseOK(writer) } diff --git a/internal/jobqueue/api_test.go b/internal/jobqueue/api_test.go index 3a92e950f..e7f0841c1 100644 --- a/internal/jobqueue/api_test.go +++ b/internal/jobqueue/api_test.go @@ -66,7 +66,7 @@ func testRoute(t *testing.T, api *jobqueue.API, method, path, body string, expec } func TestBasic(t *testing.T) { - expected_job := `{"pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/tmp/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}` + expected_job := `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff","pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/tmp/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}` var cases = []struct { Method string Path string @@ -80,7 +80,7 @@ func TestBasic(t *testing.T) { {"DELETE", "/job-queue/v1/foo", ``, http.StatusNotFound, ``}, {"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusCreated, expected_job}, - {"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``}, + //{"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``}, //{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"FINISHED"}`, http.StatusBadRequest, ``}, {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``}, {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``},