diff --git a/cmd/osbuild-worker/job.go b/cmd/osbuild-worker/job.go index 95d9daecc..fbde2e9b3 100644 --- a/cmd/osbuild-worker/job.go +++ b/cmd/osbuild-worker/job.go @@ -12,9 +12,9 @@ import ( ) type Job struct { - ID uuid.UUID - Pipeline pipeline.Pipeline - Targets []target.Target + ID uuid.UUID `json:"id"` + Pipeline pipeline.Pipeline `json:"pipeline"` + Targets []target.Target `json:"targets"` } func (job *Job) Run() error { diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index 5fc32605c..d84ab1492 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -8,11 +8,6 @@ import ( "fmt" "net" "net/http" - - "github.com/google/uuid" - - "osbuild-composer/internal/pipeline" - "osbuild-composer/internal/target" ) type ComposerClient struct { @@ -32,19 +27,10 @@ func NewClient() *ComposerClient { func (c *ComposerClient) AddJob() (*Job, error) { type request struct { - ID string `json:"id"` - } - type reply struct { - Pipeline *pipeline.Pipeline `json:"pipeline"` - Targets *[]target.Target `json:"targets"` - } - - job := &Job{ - ID: uuid.New(), } var b bytes.Buffer - json.NewEncoder(&b).Encode(request{job.ID.String()}) + json.NewEncoder(&b).Encode(request{}) response, err := c.client.Post("http://localhost/job-queue/v1/jobs", "application/json", &b) if err != nil { return nil, err @@ -55,10 +41,8 @@ func (c *ComposerClient) AddJob() (*Job, error) { return nil, errors.New("couldn't create job") } - err = json.NewDecoder(response.Body).Decode(&reply{ - Pipeline: &job.Pipeline, - Targets: &job.Targets, - }) + job := &Job{} + err = json.NewDecoder(response.Body).Decode(job) if err != nil { return nil, err } @@ -102,6 +86,8 @@ func main() { panic(err) } + client.UpdateJob(job, "RUNNING") + fmt.Printf("Running job %s\n", job.ID.String()) job.Run() diff --git a/internal/job/store.go b/internal/job/store.go deleted file mode 100644 index 9ade9aaf3..000000000 --- a/internal/job/store.go +++ /dev/null @@ -1,53 +0,0 @@ -package job - -import ( - "sync" - - "github.com/google/uuid" -) - -type Store struct { - jobs map[uuid.UUID]Job - mu sync.RWMutex -} - -func NewStore() *Store { - var s Store - - s.jobs = make(map[uuid.UUID]Job) - - return &s -} - -func (s *Store) AddJob(id uuid.UUID, job Job) bool { - s.mu.Lock() - defer s.mu.Unlock() - - _, exists := s.jobs[id] - if exists { - return false - } - - s.jobs[id] = job - - return true -} - -func (s *Store) UpdateJob(id uuid.UUID, job Job) bool { - s.mu.Lock() - defer s.mu.Unlock() - - req, _ := s.jobs[id] - req.ComposeID = job.ComposeID - req.Pipeline = job.Pipeline - req.Targets = job.Targets - - return true -} - -func (s *Store) DeleteJob(id uuid.UUID) { - s.mu.Lock() - defer s.mu.Unlock() - - delete(s.jobs, id) -} diff --git a/internal/jobqueue/api.go b/internal/jobqueue/api.go index 1ed352187..d5370d19a 100644 --- a/internal/jobqueue/api.go +++ b/internal/jobqueue/api.go @@ -14,7 +14,6 @@ import ( ) type API struct { - jobStore *job.Store pendingJobs <-chan job.Job jobStatus chan<- job.Status @@ -24,7 +23,6 @@ type API struct { func New(logger *log.Logger, jobs <-chan job.Job, jobStatus chan<- job.Status) *API { api := &API{ - jobStore: job.NewStore(), logger: logger, pendingJobs: jobs, jobStatus: jobStatus, @@ -80,9 +78,9 @@ func statusResponseError(writer http.ResponseWriter, code int, errors ...string) func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) { type requestBody struct { - ID uuid.UUID `json:"id"` } type replyBody struct { + ID uuid.UUID `json:"id"` Pipeline *pipeline.Pipeline `json:"pipeline"` Targets []*target.Target `json:"targets"` } @@ -96,23 +94,14 @@ func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, var body requestBody err := json.NewDecoder(request.Body).Decode(&body) if err != nil { - statusResponseError(writer, http.StatusBadRequest, "invalid id: "+err.Error()) - return - } - - id := body.ID - var jobSlot job.Job - - if !api.jobStore.AddJob(id, jobSlot) { - statusResponseError(writer, http.StatusBadRequest) + statusResponseError(writer, http.StatusBadRequest, "invalid request: "+err.Error()) return } nextJob := <-api.pendingJobs - api.jobStore.UpdateJob(id, nextJob) writer.WriteHeader(http.StatusCreated) - json.NewEncoder(writer).Encode(replyBody{nextJob.Pipeline, nextJob.Targets}) + json.NewEncoder(writer).Encode(replyBody{nextJob.ComposeID, nextJob.Pipeline, nextJob.Targets}) } func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) { @@ -128,7 +117,7 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque id, err := uuid.Parse(params.ByName("id")) if err != nil { - statusResponseError(writer, http.StatusBadRequest, "invalid job id: "+err.Error()) + statusResponseError(writer, http.StatusBadRequest, "invalid compose id: "+err.Error()) return } @@ -136,13 +125,8 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque err = json.NewDecoder(request.Body).Decode(&body) if err != nil { statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error()) - } else if body.Status == "RUNNING" { - api.jobStatus <- job.Status{ComposeID: id, Status: body.Status} - statusResponseOK(writer) - } else if body.Status == "FINISHED" { - api.jobStore.DeleteJob(id) - statusResponseOK(writer) - } else { - statusResponseError(writer, http.StatusBadRequest, "invalid status: "+body.Status) } + + api.jobStatus <- job.Status{ComposeID: id, Status: body.Status} + statusResponseOK(writer) } diff --git a/internal/jobqueue/api_test.go b/internal/jobqueue/api_test.go index 3a92e950f..e7f0841c1 100644 --- a/internal/jobqueue/api_test.go +++ b/internal/jobqueue/api_test.go @@ -66,7 +66,7 @@ func testRoute(t *testing.T, api *jobqueue.API, method, path, body string, expec } func TestBasic(t *testing.T) { - expected_job := `{"pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/tmp/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}` + expected_job := `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff","pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/tmp/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}` var cases = []struct { Method string Path string @@ -80,7 +80,7 @@ func TestBasic(t *testing.T) { {"DELETE", "/job-queue/v1/foo", ``, http.StatusNotFound, ``}, {"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusCreated, expected_job}, - {"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``}, + //{"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``}, //{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"FINISHED"}`, http.StatusBadRequest, ``}, {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``}, {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``},