From 9cb140795e5668e0edace0bcadfa20d1e7bf9fc5 Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Sat, 5 Oct 2019 17:51:30 +0200 Subject: [PATCH] store: add {Push,Pop}Compose methods and hide channel Wrap the channel in Pop and Push methods, so it is not exposed to the callers. PushCompose replaces the old AddCompose for consistency, and PopCompose simply reads from the other end of the channel. Signed-off-by: Tom Gundersen --- cmd/osbuild-composer/main.go | 6 ++---- internal/jobqueue/api.go | 12 ++++-------- internal/jobqueue/api_test.go | 20 +++++--------------- internal/store/store.go | 12 ++++++++---- internal/weldr/api.go | 2 +- internal/weldr/api_test.go | 8 +++----- 6 files changed, 23 insertions(+), 37 deletions(-) diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index e443c2b01..4e3c7fd95 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -8,7 +8,6 @@ import ( "path/filepath" "osbuild-composer/internal/blueprint" - "osbuild-composer/internal/job" "osbuild-composer/internal/jobqueue" "osbuild-composer/internal/rpmmd" "osbuild-composer/internal/store" @@ -63,9 +62,8 @@ func main() { } stateChannel := make(chan []byte, 10) - jobChannel := make(chan job.Job, 200) - store := store.New(state, stateChannel, jobChannel) + store := store.New(state, stateChannel) // sample blueprint on first run if state == nil { store.PushBlueprint(blueprint.Blueprint{ @@ -77,7 +75,7 @@ func main() { }) } - jobAPI := jobqueue.New(logger, store, jobChannel) + jobAPI := jobqueue.New(logger, store) weldrAPI := weldr.New(repo, packages, logger, store) go func() { for { diff --git a/internal/jobqueue/api.go b/internal/jobqueue/api.go index f34f06ad5..64fd973db 100644 --- a/internal/jobqueue/api.go +++ b/internal/jobqueue/api.go @@ -5,7 +5,6 @@ import ( "log" "net" "net/http" - "osbuild-composer/internal/job" "osbuild-composer/internal/pipeline" "osbuild-composer/internal/store" "osbuild-composer/internal/target" @@ -15,18 +14,15 @@ import ( ) type API struct { - pendingJobs <-chan job.Job - logger *log.Logger store *store.Store router *httprouter.Router } -func New(logger *log.Logger, store *store.Store, jobs <-chan job.Job) *API { +func New(logger *log.Logger, store *store.Store) *API { api := &API{ - logger: logger, - store: store, - pendingJobs: jobs, + logger: logger, + store: store, } api.router = httprouter.New() @@ -99,7 +95,7 @@ func (api *API) addJobHandler(writer http.ResponseWriter, request *http.Request, return } - nextJob := <-api.pendingJobs + nextJob := api.store.PopCompose() writer.WriteHeader(http.StatusCreated) json.NewEncoder(writer).Encode(replyBody{nextJob.ComposeID, nextJob.Pipeline, nextJob.Targets}) diff --git a/internal/jobqueue/api_test.go b/internal/jobqueue/api_test.go index b86d71cf6..254ac9fd2 100644 --- a/internal/jobqueue/api_test.go +++ b/internal/jobqueue/api_test.go @@ -10,11 +10,9 @@ import ( "strings" "testing" - "osbuild-composer/internal/job" + "osbuild-composer/internal/blueprint" "osbuild-composer/internal/jobqueue" - "osbuild-composer/internal/pipeline" "osbuild-composer/internal/store" - "osbuild-composer/internal/target" "github.com/google/uuid" ) @@ -67,7 +65,7 @@ func testRoute(t *testing.T, api *jobqueue.API, method, path, body string, expec } func TestBasic(t *testing.T) { - expected_job := `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff","pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/tmp/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}` + expected_job := `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff","pipeline":{"assembler":{"name":"org.osbuild.tar","options":{"filename":"image.tar"}}},"targets":[{"name":"org.osbuild.local","options":{"location":"/var/lib/osbuild-composer/outputs/ffffffff-ffff-ffff-ffff-ffffffffffff"}}]}` var cases = []struct { Method string Path string @@ -90,19 +88,11 @@ func TestBasic(t *testing.T) { //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"FINISHED"}`, http.StatusNotAllowed, ``}, } - jobChannel := make(chan job.Job, 100) - api := jobqueue.New(nil, store.New(nil, nil, jobChannel), jobChannel) + store := store.New(nil, nil) + api := jobqueue.New(nil, store) for _, c := range cases { id, _ := uuid.Parse("ffffffff-ffff-ffff-ffff-ffffffffffff") - p := &pipeline.Pipeline{} - p.SetAssembler(pipeline.NewTarAssembler(pipeline.NewTarAssemblerOptions("image.tar"))) - jobChannel <- job.Job{ - ComposeID: id, - Pipeline: p, - Targets: []*target.Target{ - target.NewLocalTarget(target.NewLocalTargetOptions("/tmp/" + id.String())), - }, - } + store.PushCompose(id, &blueprint.Blueprint{}, "tar") testRoute(t, api, c.Method, c.Path, c.Body, c.ExpectedStatus, c.ExpectedJSON) } diff --git a/internal/store/store.go b/internal/store/store.go index b91e8937b..4c7debbd7 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -25,7 +25,7 @@ type Store struct { Composes map[uuid.UUID]Compose `json:"composes"` mu sync.RWMutex // protects all fields - pendingJobs chan<- job.Job + pendingJobs chan job.Job stateChannel chan<- []byte } @@ -48,7 +48,7 @@ type Image struct { Mime string } -func New(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job) *Store { +func New(initialState []byte, stateChannel chan<- []byte) *Store { var s Store if initialState != nil { @@ -69,7 +69,7 @@ func New(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job s.Composes = make(map[uuid.UUID]Compose) } s.stateChannel = stateChannel - s.pendingJobs = pendingJobs + s.pendingJobs = make(chan job.Job, 200) return &s } @@ -254,7 +254,7 @@ func (s *Store) DeleteBlueprintFromWorkspace(name string) { }) } -func (s *Store) AddCompose(composeID uuid.UUID, bp *blueprint.Blueprint, composeType string) { +func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, composeType string) { targets := []*target.Target{ target.NewLocalTarget(target.NewLocalTargetOptions("/var/lib/osbuild-composer/outputs/" + composeID.String())), } @@ -274,6 +274,10 @@ func (s *Store) AddCompose(composeID uuid.UUID, bp *blueprint.Blueprint, compose } } +func (s *Store) PopCompose() job.Job { + return <-s.pendingJobs +} + func (s *Store) UpdateCompose(composeID uuid.UUID, status string) { s.change(func() { compose, exists := s.Composes[composeID] diff --git a/internal/weldr/api.go b/internal/weldr/api.go index e5f5c1195..7fd7e9af7 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -623,7 +623,7 @@ func (api *API) composeHandler(writer http.ResponseWriter, httpRequest *http.Req found := api.store.GetBlueprint(cr.BlueprintName, &bp, &changed) // TODO: what to do with changed? if found { - api.store.AddCompose(reply.BuildID, &bp, cr.ComposeType) + api.store.PushCompose(reply.BuildID, &bp, cr.ComposeType) } else { statusResponseError(writer, http.StatusBadRequest, "blueprint does not exist") return diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index 593e6da83..bfd88932d 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -13,7 +13,6 @@ import ( "strings" "testing" - "osbuild-composer/internal/job" "osbuild-composer/internal/rpmmd" "osbuild-composer/internal/store" "osbuild-composer/internal/weldr" @@ -154,13 +153,13 @@ func TestBasic(t *testing.T) { } for _, c := range cases { - api := weldr.New(repo, packages, nil, store.New(nil, nil, nil)) + api := weldr.New(repo, packages, nil, store.New(nil, nil)) testRoute(t, api, "GET", c.Path, ``, c.ExpectedStatus, c.ExpectedJSON) } } func TestBlueprints(t *testing.T) { - api := weldr.New(repo, packages, nil, store.New(nil, nil, nil)) + api := weldr.New(repo, packages, nil, store.New(nil, nil)) testRoute(t, api, "POST", "/api/v0/blueprints/new", `{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0.0.0"}`, @@ -183,8 +182,7 @@ func TestBlueprints(t *testing.T) { } func TestCompose(t *testing.T) { - jobChannel := make(chan job.Job, 200) - api := weldr.New(repo, packages, nil, store.New(nil, nil, jobChannel)) + api := weldr.New(repo, packages, nil, store.New(nil, nil)) testRoute(t, api, "POST", "/api/v0/blueprints/new", `{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0.0.0"}`,