From 96f3cbf6553d644545beb1e2cd0d46e4261d20c8 Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Sun, 29 Sep 2019 01:33:50 +0200 Subject: [PATCH] weldr/store/compose: store information exposed in the API Use the exact same status strings as is used in the API, making it clearer that they are the same (and avoiding any translation). Remember the creation/start/finish timestamps. And store the output type. Signed-off-by: Tom Gundersen --- cmd/osbuild-worker/main.go | 6 +++--- internal/jobqueue/api.go | 4 ++-- internal/jobqueue/api_test.go | 12 ++++++------ internal/weldr/store.go | 33 +++++++++++++++++++++++++++------ 4 files changed, 38 insertions(+), 17 deletions(-) diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index 7fd527a79..5fc32605c 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -57,7 +57,7 @@ func (c *ComposerClient) AddJob() (*Job, error) { err = json.NewDecoder(response.Body).Decode(&reply{ Pipeline: &job.Pipeline, - Targets: &job.Targets, + Targets: &job.Targets, }) if err != nil { return nil, err @@ -73,7 +73,7 @@ func (c *ComposerClient) UpdateJob(job *Job, status string) error { var b bytes.Buffer json.NewEncoder(&b).Encode(&request{status}) - req, err:= http.NewRequest("PATCH", "http://localhost/job-queue/v1/jobs/" + job.ID.String(), &b) + req, err := http.NewRequest("PATCH", "http://localhost/job-queue/v1/jobs/"+job.ID.String(), &b) if err != nil { return err } @@ -105,6 +105,6 @@ func main() { fmt.Printf("Running job %s\n", job.ID.String()) job.Run() - client.UpdateJob(job, "finished") + client.UpdateJob(job, "FINISHED") } } diff --git a/internal/jobqueue/api.go b/internal/jobqueue/api.go index 1e0f97551..1ed352187 100644 --- a/internal/jobqueue/api.go +++ b/internal/jobqueue/api.go @@ -136,10 +136,10 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque err = json.NewDecoder(request.Body).Decode(&body) if err != nil { statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error()) - } else if body.Status == "running" { + } else if body.Status == "RUNNING" { api.jobStatus <- job.Status{ComposeID: id, Status: body.Status} statusResponseOK(writer) - } else if body.Status == "finished" { + } else if body.Status == "FINISHED" { api.jobStore.DeleteJob(id) statusResponseOK(writer) } else { diff --git a/internal/jobqueue/api_test.go b/internal/jobqueue/api_test.go index 76fafa2d2..3a92e950f 100644 --- a/internal/jobqueue/api_test.go +++ b/internal/jobqueue/api_test.go @@ -81,12 +81,12 @@ func TestBasic(t *testing.T) { {"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusCreated, expected_job}, {"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``}, - //{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"finished"}`, http.StatusBadRequest, ``}, - {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusOK, ``}, - {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusOK, ``}, - {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"finished"}`, http.StatusOK, ``}, - //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusNotAllowed, ``}, - //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"finished"}`, http.StatusNotAllowed, ``}, + //{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"FINISHED"}`, http.StatusBadRequest, ``}, + {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``}, + {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``}, + {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"FINISHED"}`, http.StatusOK, ``}, + //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusNotAllowed, ``}, + //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"FINISHED"}`, http.StatusNotAllowed, ``}, } jobChannel := make(chan job.Job, 100) diff --git a/internal/weldr/store.go b/internal/weldr/store.go index 9c1a1290a..f30cd42c2 100644 --- a/internal/weldr/store.go +++ b/internal/weldr/store.go @@ -8,6 +8,7 @@ import ( "osbuild-composer/internal/target" "sort" "sync" + "time" "github.com/google/uuid" ) @@ -37,9 +38,13 @@ type blueprintPackage struct { } type compose struct { - Status string `json:"status"` - Blueprint *blueprint `json:"blueprint"` - Targets []*target.Target `json:"targets"` + QueueStatus string `json:"queue_status"` + Blueprint *blueprint `json:"blueprint"` + OutputType string `json:"output-type"` + Targets []*target.Target `json:"targets"` + JobCreated time.Time `json:"job_created"` + JobStarted time.Time `json:"job_started"` + JobFinished time.Time `json:"job_finished"` } func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job, jobUpdates <-chan job.Status) *store { @@ -59,7 +64,7 @@ func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan< s.Workspace = make(map[string]blueprint) } if s.Composes == nil { - // TODO: push pending/running composes to workers again + // TODO: push waiting/running composes to workers again s.Composes = make(map[uuid.UUID]compose) } s.stateChannel = stateChannel @@ -74,7 +79,17 @@ func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan< if !exists { return } - compose.Status = update.Status + if compose.QueueStatus != update.Status { + switch update.Status { + case "RUNNING": + compose.JobStarted = time.Now() + case "FINISHED": + fallthrough + case "FAILED": + compose.JobFinished = time.Now() + } + compose.QueueStatus = update.Status + } }) } }() @@ -193,7 +208,13 @@ func (s *store) addCompose(composeID uuid.UUID, bp *blueprint, composeType strin target.NewLocalTarget(target.NewLocalTargetOptions("/var/lib/osbuild-composer/outputs/" + composeID.String())), } s.change(func() { - s.Composes[composeID] = compose{"pending", bp, targets} + s.Composes[composeID] = compose{ + QueueStatus: "WAITING", + Blueprint: bp, + OutputType: composeType, + Targets: targets, + JobCreated: time.Now(), + } }) s.pendingJobs <- job.Job{ ComposeID: composeID,