jobqueue: cleanup API a bit and unify the two job stores

Let the store in weldr be the only one that keeps state, and push
updates directly there. This fixes a bug where there was an ID mismatch.

Change the API to not let the caller pick the UUID, but provide it
in the response. Use the same UUID as is used to identify composes,
this makes it simpler to trace what is going on.

Signed-off-by: Tom Gundersen <teg@jklm.no>
This commit is contained in:
Tom Gundersen 2019-10-01 17:06:37 +02:00 committed by Lars Karlitski
parent 7f82ef4043
commit 0880014edf
5 changed files with 17 additions and 100 deletions

View file

@ -12,9 +12,9 @@ import (
)
type Job struct {
ID uuid.UUID
Pipeline pipeline.Pipeline
Targets []target.Target
ID uuid.UUID `json:"id"`
Pipeline pipeline.Pipeline `json:"pipeline"`
Targets []target.Target `json:"targets"`
}
func (job *Job) Run() error {

View file

@ -8,11 +8,6 @@ import (
"fmt"
"net"
"net/http"
"github.com/google/uuid"
"osbuild-composer/internal/pipeline"
"osbuild-composer/internal/target"
)
type ComposerClient struct {
@ -32,19 +27,10 @@ func NewClient() *ComposerClient {
func (c *ComposerClient) AddJob() (*Job, error) {
type request struct {
ID string `json:"id"`
}
type reply struct {
Pipeline *pipeline.Pipeline `json:"pipeline"`
Targets *[]target.Target `json:"targets"`
}
job := &Job{
ID: uuid.New(),
}
var b bytes.Buffer
json.NewEncoder(&b).Encode(request{job.ID.String()})
json.NewEncoder(&b).Encode(request{})
response, err := c.client.Post("http://localhost/job-queue/v1/jobs", "application/json", &b)
if err != nil {
return nil, err
@ -55,10 +41,8 @@ func (c *ComposerClient) AddJob() (*Job, error) {
return nil, errors.New("couldn't create job")
}
err = json.NewDecoder(response.Body).Decode(&reply{
Pipeline: &job.Pipeline,
Targets: &job.Targets,
})
job := &Job{}
err = json.NewDecoder(response.Body).Decode(job)
if err != nil {
return nil, err
}
@ -102,6 +86,8 @@ func main() {
panic(err)
}
client.UpdateJob(job, "RUNNING")
fmt.Printf("Running job %s\n", job.ID.String())
job.Run()

View file

@ -1,53 +0,0 @@
package job
import (
"sync"
"github.com/google/uuid"
)
type Store struct {
jobs map[uuid.UUID]Job
mu sync.RWMutex
}
func NewStore() *Store {
var s Store
s.jobs = make(map[uuid.UUID]Job)
return &s
}
func (s *Store) AddJob(id uuid.UUID, job Job) bool {
s.mu.Lock()
defer s.mu.Unlock()
_, exists := s.jobs[id]
if exists {
return false
}
s.jobs[id] = job
return true
}
func (s *Store) UpdateJob(id uuid.UUID, job Job) bool {
s.mu.Lock()
defer s.mu.Unlock()
req, _ := s.jobs[id]
req.ComposeID = job.ComposeID
req.Pipeline = job.Pipeline
req.Targets = job.Targets
return true
}
func (s *Store) DeleteJob(id uuid.UUID) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.jobs, id)
}

View file

@ -14,7 +14,6 @@ import (
)
type API struct {
jobStore *job.Store
pendingJobs <-chan job.Job
jobStatus chan<- job.Status
@ -24,7 +23,6 @@ type API struct {
func New(logger *log.Logger, jobs <-chan job.Job, jobStatus chan<- job.Status) *API {
api := &API{
jobStore: job.NewStore(),
logger: logger,
pendingJobs: jobs,
jobStatus: jobStatus,
@ -80,9 +78,9 @@ func statusResponseError(writer http.ResponseWriter, code int, errors ...string)
func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, _ httprouter.Params) {
type requestBody struct {
ID uuid.UUID `json:"id"`
}
type replyBody struct {
ID uuid.UUID `json:"id"`
Pipeline *pipeline.Pipeline `json:"pipeline"`
Targets []*target.Target `json:"targets"`
}
@ -96,23 +94,14 @@ func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request,
var body requestBody
err := json.NewDecoder(request.Body).Decode(&body)
if err != nil {
statusResponseError(writer, http.StatusBadRequest, "invalid id: "+err.Error())
return
}
id := body.ID
var jobSlot job.Job
if !api.jobStore.AddJob(id, jobSlot) {
statusResponseError(writer, http.StatusBadRequest)
statusResponseError(writer, http.StatusBadRequest, "invalid request: "+err.Error())
return
}
nextJob := <-api.pendingJobs
api.jobStore.UpdateJob(id, nextJob)
writer.WriteHeader(http.StatusCreated)
json.NewEncoder(writer).Encode(replyBody{nextJob.Pipeline, nextJob.Targets})
json.NewEncoder(writer).Encode(replyBody{nextJob.ComposeID, nextJob.Pipeline, nextJob.Targets})
}
func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Request, params httprouter.Params) {
@ -128,7 +117,7 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque
id, err := uuid.Parse(params.ByName("id"))
if err != nil {
statusResponseError(writer, http.StatusBadRequest, "invalid job id: "+err.Error())
statusResponseError(writer, http.StatusBadRequest, "invalid compose id: "+err.Error())
return
}
@ -136,13 +125,8 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque
err = json.NewDecoder(request.Body).Decode(&body)
if err != nil {
statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error())
} else if body.Status == "RUNNING" {
api.jobStatus <- job.Status{ComposeID: id, Status: body.Status}
statusResponseOK(writer)
} else if body.Status == "FINISHED" {
api.jobStore.DeleteJob(id)
statusResponseOK(writer)
} else {
statusResponseError(writer, http.StatusBadRequest, "invalid status: "+body.Status)
}
api.jobStatus <- job.Status{ComposeID: id, Status: body.Status}
statusResponseOK(writer)
}

View file

@ -66,7 +66,7 @@ func testRoute(t *testing.T, api *jobqueue.API, method, path, body string, expec
}
func TestBasic(t *testing.T) {
expected_job := `{"pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/tmp/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}`
expected_job := `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff","pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/tmp/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}`
var cases = []struct {
Method string
Path string
@ -80,7 +80,7 @@ func TestBasic(t *testing.T) {
{"DELETE", "/job-queue/v1/foo", ``, http.StatusNotFound, ``},
{"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusCreated, expected_job},
{"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``},
//{"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``},
//{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"FINISHED"}`, http.StatusBadRequest, ``},
{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``},
{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``},