weldr/store: keep the compose status up-to-date
This way it can be correctly exposed in the API. We listen on a channel from the job-queue, where status updates are pushed when the worker is running/finished (or, in the future, failed). Signed-off-by: Tom Gundersen <teg@jklm.no>
This commit is contained in:
parent
cad89c6650
commit
f880581a14
7 changed files with 36 additions and 21 deletions
|
|
@ -79,8 +79,9 @@ func main() {
|
|||
|
||||
stateChannel := make(chan []byte, 10)
|
||||
jobChannel := make(chan job.Job, 200)
|
||||
jobAPI := jobqueue.New(logger, jobChannel)
|
||||
weldrAPI := weldr.New(repo, packages, logger, state, stateChannel, jobChannel)
|
||||
jobUpdateChannel := make(chan job.Status, 200)
|
||||
jobAPI := jobqueue.New(logger, jobChannel, jobUpdateChannel)
|
||||
weldrAPI := weldr.New(repo, packages, logger, state, stateChannel, jobChannel, jobUpdateChannel)
|
||||
go func() {
|
||||
for {
|
||||
err := writeFileAtomically(StateFile, <-stateChannel, 0755)
|
||||
|
|
|
|||
|
|
@ -12,3 +12,8 @@ type Job struct {
|
|||
Pipeline pipeline.Pipeline
|
||||
Targets []target.Target
|
||||
}
|
||||
|
||||
type Status struct {
|
||||
ComposeID uuid.UUID
|
||||
Status string
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,16 +16,18 @@ import (
|
|||
type API struct {
|
||||
jobStore *job.Store
|
||||
pendingJobs <-chan job.Job
|
||||
jobStatus chan<- job.Status
|
||||
|
||||
logger *log.Logger
|
||||
router *httprouter.Router
|
||||
}
|
||||
|
||||
func New(logger *log.Logger, jobs <-chan job.Job) *API {
|
||||
func New(logger *log.Logger, jobs <-chan job.Job, jobStatus chan<- job.Status) *API {
|
||||
api := &API{
|
||||
jobStore: job.NewStore(),
|
||||
logger: logger,
|
||||
pendingJobs: jobs,
|
||||
jobStatus: jobStatus,
|
||||
}
|
||||
|
||||
api.router = httprouter.New()
|
||||
|
|
@ -135,6 +137,7 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque
|
|||
if err != nil {
|
||||
statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error())
|
||||
} else if body.Status == "running" {
|
||||
api.jobStatus <- job.Status{ComposeID: id, Status: body.Status}
|
||||
statusResponseOK(writer)
|
||||
} else if body.Status == "finished" {
|
||||
api.jobStore.DeleteJob(id)
|
||||
|
|
|
|||
|
|
@ -90,7 +90,8 @@ func TestBasic(t *testing.T) {
|
|||
}
|
||||
|
||||
jobChannel := make(chan job.Job, 100)
|
||||
api := jobqueue.New(nil, jobChannel)
|
||||
statusChannel := make(chan job.Status, 100)
|
||||
api := jobqueue.New(nil, jobChannel, statusChannel)
|
||||
for _, c := range cases {
|
||||
id, _ := uuid.Parse("ffffffff-ffff-ffff-ffff-ffffffffffff")
|
||||
jobChannel <- job.Job{
|
||||
|
|
|
|||
|
|
@ -25,11 +25,11 @@ type API struct {
|
|||
router *httprouter.Router
|
||||
}
|
||||
|
||||
func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte, jobs chan<- job.Job) *API {
|
||||
func New(repo rpmmd.RepoConfig, packages rpmmd.PackageList, logger *log.Logger, initialState []byte, stateChannel chan<- []byte, jobs chan<- job.Job, jobStatus <-chan job.Status) *API {
|
||||
// This needs to be shared with the worker API so that they can communicate with each other
|
||||
// builds := make(chan queue.Build, 200)
|
||||
api := &API{
|
||||
store: newStore(initialState, stateChannel, jobs),
|
||||
store: newStore(initialState, stateChannel, jobs, jobStatus),
|
||||
repo: repo,
|
||||
packages: packages,
|
||||
logger: logger,
|
||||
|
|
|
|||
|
|
@ -112,13 +112,13 @@ func TestBasic(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, c := range cases {
|
||||
api := weldr.New(repo, packages, nil, nil, nil, nil)
|
||||
api := weldr.New(repo, packages, nil, nil, nil, nil, nil)
|
||||
testRoute(t, api, "GET", c.Path, ``, c.ExpectedStatus, c.ExpectedJSON)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlueprints(t *testing.T) {
|
||||
api := weldr.New(repo, packages, nil, nil, nil, nil)
|
||||
api := weldr.New(repo, packages, nil, nil, nil, nil, nil)
|
||||
|
||||
testRoute(t, api, "POST", "/api/v0/blueprints/new",
|
||||
`{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0"}`,
|
||||
|
|
@ -142,7 +142,7 @@ func TestBlueprints(t *testing.T) {
|
|||
|
||||
func TestCompose(t *testing.T) {
|
||||
jobChannel := make(chan job.Job, 200)
|
||||
api := weldr.New(repo, packages, nil, nil, nil, jobChannel)
|
||||
api := weldr.New(repo, packages, nil, nil, nil, jobChannel, nil)
|
||||
|
||||
testRoute(t, api, "POST", "/api/v0/blueprints/new",
|
||||
`{"name":"test","description":"Test","packages":[{"name":"httpd","version":"2.4.*"}],"version":"0"}`,
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ type store struct {
|
|||
|
||||
mu sync.RWMutex // protects all fields
|
||||
pendingJobs chan<- job.Job
|
||||
jobUpdates <-chan job.Status
|
||||
stateChannel chan<- []byte
|
||||
}
|
||||
|
||||
|
|
@ -36,12 +37,12 @@ type blueprintPackage struct {
|
|||
}
|
||||
|
||||
type compose struct {
|
||||
State string `json:"state"`
|
||||
Status string `json:"status"`
|
||||
Pipeline pipeline.Pipeline `json:"pipeline"`
|
||||
Targets []target.Target `json:"targets"`
|
||||
}
|
||||
|
||||
func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job) *store {
|
||||
func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job, jobUpdates <-chan job.Status) *store {
|
||||
var s store
|
||||
|
||||
if initialState != nil {
|
||||
|
|
@ -63,6 +64,20 @@ func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<
|
|||
}
|
||||
s.stateChannel = stateChannel
|
||||
s.pendingJobs = pendingJobs
|
||||
s.jobUpdates = jobUpdates
|
||||
|
||||
go func() {
|
||||
for {
|
||||
update := <-s.jobUpdates
|
||||
s.change(func() {
|
||||
compose, exists := s.Composes[update.ComposeID]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
compose.Status = update.Status
|
||||
})
|
||||
}
|
||||
}()
|
||||
|
||||
return &s
|
||||
}
|
||||
|
|
@ -191,16 +206,6 @@ func (s *store) addCompose(composeID uuid.UUID, bp blueprint, composeType string
|
|||
}
|
||||
}
|
||||
|
||||
func (s *store) updateCompose(composeID uuid.UUID, state string) {
|
||||
s.change(func() {
|
||||
compose, exists := s.Composes[composeID]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
compose.State = state
|
||||
})
|
||||
}
|
||||
|
||||
func (b *blueprint) translateToPipeline(outputFormat string) pipeline.Pipeline {
|
||||
return pipeline.Pipeline{
|
||||
Assembler: pipeline.Assembler{
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue