store: pass the store to the jobqueue API
Drop the jobUpdates channel, and instead add an UpdateCompose method to the store, which updates the status of a compose directly. This allows us to report back errors directly, rather than having to mirror the staet in the jobqueue API. Signed-off-by: Tom Gundersen <teg@jklm.no>
This commit is contained in:
parent
89fd2e6037
commit
3ff4f59fc7
5 changed files with 34 additions and 39 deletions
|
|
@ -7,6 +7,7 @@ import (
|
|||
"net/http"
|
||||
"osbuild-composer/internal/job"
|
||||
"osbuild-composer/internal/pipeline"
|
||||
"osbuild-composer/internal/store"
|
||||
"osbuild-composer/internal/target"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
|
@ -15,17 +16,17 @@ import (
|
|||
|
||||
type API struct {
|
||||
pendingJobs <-chan job.Job
|
||||
jobStatus chan<- job.Status
|
||||
|
||||
logger *log.Logger
|
||||
store *store.Store
|
||||
router *httprouter.Router
|
||||
}
|
||||
|
||||
func New(logger *log.Logger, jobs <-chan job.Job, jobStatus chan<- job.Status) *API {
|
||||
func New(logger *log.Logger, store *store.Store, jobs <-chan job.Job) *API {
|
||||
api := &API{
|
||||
logger: logger,
|
||||
store: store,
|
||||
pendingJobs: jobs,
|
||||
jobStatus: jobStatus,
|
||||
}
|
||||
|
||||
api.router = httprouter.New()
|
||||
|
|
@ -127,6 +128,6 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque
|
|||
statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error())
|
||||
}
|
||||
|
||||
api.jobStatus <- job.Status{ComposeID: id, Status: body.Status}
|
||||
api.store.UpdateCompose(id, body.Status)
|
||||
statusResponseOK(writer)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import (
|
|||
"osbuild-composer/internal/job"
|
||||
"osbuild-composer/internal/jobqueue"
|
||||
"osbuild-composer/internal/pipeline"
|
||||
"osbuild-composer/internal/store"
|
||||
"osbuild-composer/internal/target"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
|
@ -90,8 +91,7 @@ func TestBasic(t *testing.T) {
|
|||
}
|
||||
|
||||
jobChannel := make(chan job.Job, 100)
|
||||
statusChannel := make(chan job.Status, 100)
|
||||
api := jobqueue.New(nil, jobChannel, statusChannel)
|
||||
api := jobqueue.New(nil, store.New(nil, nil, jobChannel), jobChannel)
|
||||
for _, c := range cases {
|
||||
id, _ := uuid.Parse("ffffffff-ffff-ffff-ffff-ffffffffffff")
|
||||
p := &pipeline.Pipeline{}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,6 @@ type Store struct {
|
|||
|
||||
mu sync.RWMutex // protects all fields
|
||||
pendingJobs chan<- job.Job
|
||||
jobUpdates <-chan job.Status
|
||||
stateChannel chan<- []byte
|
||||
}
|
||||
|
||||
|
|
@ -49,7 +48,7 @@ type Image struct {
|
|||
Mime string
|
||||
}
|
||||
|
||||
func New(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job, jobUpdates <-chan job.Status) *Store {
|
||||
func New(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job) *Store {
|
||||
var s Store
|
||||
|
||||
if initialState != nil {
|
||||
|
|
@ -71,31 +70,6 @@ func New(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job
|
|||
}
|
||||
s.stateChannel = stateChannel
|
||||
s.pendingJobs = pendingJobs
|
||||
s.jobUpdates = jobUpdates
|
||||
|
||||
go func() {
|
||||
for {
|
||||
update := <-s.jobUpdates
|
||||
s.change(func() {
|
||||
compose, exists := s.Composes[update.ComposeID]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
if compose.QueueStatus != update.Status {
|
||||
switch update.Status {
|
||||
case "RUNNING":
|
||||
compose.JobStarted = time.Now()
|
||||
case "FINISHED":
|
||||
fallthrough
|
||||
case "FAILED":
|
||||
compose.JobFinished = time.Now()
|
||||
}
|
||||
compose.QueueStatus = update.Status
|
||||
s.Composes[update.ComposeID] = compose
|
||||
}
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
return &s
|
||||
}
|
||||
|
|
@ -300,6 +274,27 @@ func (s *Store) AddCompose(composeID uuid.UUID, bp *blueprint.Blueprint, compose
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Store) UpdateCompose(composeID uuid.UUID, status string) {
|
||||
s.change(func() {
|
||||
compose, exists := s.Composes[composeID]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
if compose.QueueStatus != status {
|
||||
switch status {
|
||||
case "RUNNING":
|
||||
compose.JobStarted = time.Now()
|
||||
case "FINISHED":
|
||||
fallthrough
|
||||
case "FAILED":
|
||||
compose.JobFinished = time.Now()
|
||||
}
|
||||
compose.QueueStatus = status
|
||||
s.Composes[composeID] = compose
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) GetImage(composeID uuid.UUID) (*Image, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
|
|
|||
|
|
@ -154,13 +154,13 @@ func TestBasic(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, c := range cases {
|
||||
api := weldr.New(repo, packages, nil, store.New(nil, nil, nil, nil))
|
||||
api := weldr.New(repo, packages, nil, store.New(nil, nil, nil))
|
||||
testRoute(t, api, "GET", c.Path, ``, c.ExpectedStatus, c.ExpectedJSON)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlueprints(t *testing.T) {
|
||||
api := weldr.New(repo, packages, nil, store.New(nil, nil, nil, nil))
|
||||
api := weldr.New(repo, packages, nil, store.New(nil, nil, nil))
|
||||
|
||||
testRoute(t, api, "POST", "/api/v0/blueprints/new",
|
||||
`{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0.0.0"}`,
|
||||
|
|
@ -184,7 +184,7 @@ func TestBlueprints(t *testing.T) {
|
|||
|
||||
func TestCompose(t *testing.T) {
|
||||
jobChannel := make(chan job.Job, 200)
|
||||
api := weldr.New(repo, packages, nil, store.New(nil, nil, jobChannel, nil))
|
||||
api := weldr.New(repo, packages, nil, store.New(nil, nil, jobChannel))
|
||||
|
||||
testRoute(t, api, "POST", "/api/v0/blueprints/new",
|
||||
`{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0.0.0"}`,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue