diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index e443c2b01..4e3c7fd95 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -8,7 +8,6 @@ import ( "path/filepath" "osbuild-composer/internal/blueprint" - "osbuild-composer/internal/job" "osbuild-composer/internal/jobqueue" "osbuild-composer/internal/rpmmd" "osbuild-composer/internal/store" @@ -63,9 +62,8 @@ func main() { } stateChannel := make(chan []byte, 10) - jobChannel := make(chan job.Job, 200) - store := store.New(state, stateChannel, jobChannel) + store := store.New(state, stateChannel) // sample blueprint on first run if state == nil { store.PushBlueprint(blueprint.Blueprint{ @@ -77,7 +75,7 @@ func main() { }) } - jobAPI := jobqueue.New(logger, store, jobChannel) + jobAPI := jobqueue.New(logger, store) weldrAPI := weldr.New(repo, packages, logger, store) go func() { for { diff --git a/internal/jobqueue/api.go b/internal/jobqueue/api.go index f34f06ad5..64fd973db 100644 --- a/internal/jobqueue/api.go +++ b/internal/jobqueue/api.go @@ -5,7 +5,6 @@ import ( "log" "net" "net/http" - "osbuild-composer/internal/job" "osbuild-composer/internal/pipeline" "osbuild-composer/internal/store" "osbuild-composer/internal/target" @@ -15,18 +14,15 @@ import ( ) type API struct { - pendingJobs <-chan job.Job - logger *log.Logger store *store.Store router *httprouter.Router } -func New(logger *log.Logger, store *store.Store, jobs <-chan job.Job) *API { +func New(logger *log.Logger, store *store.Store) *API { api := &API{ - logger: logger, - store: store, - pendingJobs: jobs, + logger: logger, + store: store, } api.router = httprouter.New() @@ -99,7 +95,7 @@ func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, return } - nextJob := <-api.pendingJobs + nextJob := api.store.PopCompose() writer.WriteHeader(http.StatusCreated) json.NewEncoder(writer).Encode(replyBody{nextJob.ComposeID, nextJob.Pipeline, nextJob.Targets}) diff --git a/internal/jobqueue/api_test.go b/internal/jobqueue/api_test.go index b86d71cf6..254ac9fd2 100644 --- a/internal/jobqueue/api_test.go +++ b/internal/jobqueue/api_test.go @@ -10,11 +10,9 @@ import ( "strings" "testing" - "osbuild-composer/internal/job" + "osbuild-composer/internal/blueprint" "osbuild-composer/internal/jobqueue" - "osbuild-composer/internal/pipeline" "osbuild-composer/internal/store" - "osbuild-composer/internal/target" "github.com/google/uuid" ) @@ -67,7 +65,7 @@ func testRoute(t *testing.T, api *jobqueue.API, method, path, body string, expec } func TestBasic(t *testing.T) { - expected_job := `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff","pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/tmp/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}` + expected_job := `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff","pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/var/lib/osbuild-composer/outputs/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}` var cases = []struct { Method string Path string @@ -90,19 +88,11 @@ func TestBasic(t *testing.T) { //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"FINISHED"}`, http.StatusNotAllowed, ``}, } - jobChannel := make(chan job.Job, 100) - api := jobqueue.New(nil, store.New(nil, nil, jobChannel), jobChannel) + store := store.New(nil, nil) + api := jobqueue.New(nil, store) for _, c := range cases { id, _ := uuid.Parse("ffffffff-ffff-ffff-ffff-ffffffffffff") - p := &pipeline.Pipeline{} - p.SetAssembler(pipeline.NewTarAssembler(pipeline.NewTarAssemblerOptions("image.tar"))) - jobChannel <- job.Job{ - ComposeID: id, - Pipeline: p, - Targets: []*target.Target{ - target.NewLocalTarget(target.NewLocalTargetOptions("/tmp/" + id.String())), - }, - } + store.PushCompose(id, &blueprint.Blueprint{}, "tar") testRoute(t, api, c.Method, c.Path, c.Body, c.ExpectedStatus, c.ExpectedJSON) } diff --git a/internal/store/store.go b/internal/store/store.go index b91e8937b..4c7debbd7 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -25,7 +25,7 @@ type Store struct { Composes map[uuid.UUID]Compose `json:"composes"` mu sync.RWMutex // protects all fields - pendingJobs chan<- job.Job + pendingJobs chan job.Job stateChannel chan<- []byte } @@ -48,7 +48,7 @@ type Image struct { Mime string } -func New(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job) *Store { +func New(initialState []byte, stateChannel chan<- []byte) *Store { var s Store if initialState != nil { @@ -69,7 +69,7 @@ func New(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job s.Composes = make(map[uuid.UUID]Compose) } s.stateChannel = stateChannel - s.pendingJobs = pendingJobs + s.pendingJobs = make(chan job.Job, 200) return &s } @@ -254,7 +254,7 @@ func (s *Store) DeleteBlueprintFromWorkspace(name string) { }) } -func (s *Store) AddCompose(composeID uuid.UUID, bp *blueprint.Blueprint, composeType string) { +func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, composeType string) { targets := []*target.Target{ target.NewLocalTarget(target.NewLocalTargetOptions("/var/lib/osbuild-composer/outputs/" + composeID.String())), } @@ -274,6 +274,10 @@ func (s *Store) AddCompose(composeID uuid.UUID, bp *blueprint.Blueprint, compose } } +func (s *Store) PopCompose() job.Job { + return <-s.pendingJobs +} + func (s *Store) UpdateCompose(composeID uuid.UUID, status string) { s.change(func() { compose, exists := s.Composes[composeID] diff --git a/internal/weldr/api.go b/internal/weldr/api.go index e5f5c1195..7fd7e9af7 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -623,7 +623,7 @@ func (api *API) composeHandler(writer http.ResponseWriter, httpRequest *http.Req found := api.store.GetBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed? if found { - api.store.AddCompose(reply.BuildID, &bp, cr.ComposeType) + api.store.PushCompose(reply.BuildID, &bp, cr.ComposeType) } else { statusResponseError(writer, http.StatusBadRequest, "blueprint does not exist") return diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index 593e6da83..bfd88932d 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -13,7 +13,6 @@ import ( "strings" "testing" - "osbuild-composer/internal/job" "osbuild-composer/internal/rpmmd" "osbuild-composer/internal/store" "osbuild-composer/internal/weldr" @@ -154,13 +153,13 @@ func TestBasic(t *testing.T) { } for _, c := range cases { - api := weldr.New(repo, packages, nil, store.New(nil, nil, nil)) + api := weldr.New(repo, packages, nil, store.New(nil, nil)) testRoute(t, api, "GET", c.Path, ``, c.ExpectedStatus, c.ExpectedJSON) } } func TestBlueprints(t *testing.T) { - api := weldr.New(repo, packages, nil, store.New(nil, nil, nil)) + api := weldr.New(repo, packages, nil, store.New(nil, nil)) testRoute(t, api, "POST", "/api/v0/blueprints/new", `{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0.0.0"}`, @@ -183,8 +182,7 @@ func TestBlueprints(t *testing.T) { } func TestCompose(t *testing.T) { - jobChannel := make(chan job.Job, 200) - api := weldr.New(repo, packages, nil, store.New(nil, nil, jobChannel)) + api := weldr.New(repo, packages, nil, store.New(nil, nil)) testRoute(t, api, "POST", "/api/v0/blueprints/new", `{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0.0.0"}`,