diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index 7fd527a79..5fc32605c 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -57,7 +57,7 @@ func (c *ComposerClient) AddJob() (*Job, error) { err = json.NewDecoder(response.Body).Decode(&reply{ Pipeline: &job.Pipeline, - Targets: &job.Targets, + Targets: &job.Targets, }) if err != nil { return nil, err @@ -73,7 +73,7 @@ func (c *ComposerClient) UpdateJob(job *Job, status string) error { var b bytes.Buffer json.NewEncoder(&b).Encode(&request{status}) - req, err:= http.NewRequest("PATCH", "http://localhost/job-queue/v1/jobs/" + job.ID.String(), &b) + req, err := http.NewRequest("PATCH", "http://localhost/job-queue/v1/jobs/"+job.ID.String(), &b) if err != nil { return err } @@ -105,6 +105,6 @@ func main() { fmt.Printf("Running job %s\n", job.ID.String()) job.Run() - client.UpdateJob(job, "finished") + client.UpdateJob(job, "FINISHED") } } diff --git a/internal/jobqueue/api.go b/internal/jobqueue/api.go index 1e0f97551..1ed352187 100644 --- a/internal/jobqueue/api.go +++ b/internal/jobqueue/api.go @@ -136,10 +136,10 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque err = json.NewDecoder(request.Body).Decode(&body) if err != nil { statusResponseError(writer, http.StatusBadRequest, "invalid status: "+err.Error()) - } else if body.Status == "running" { + } else if body.Status == "RUNNING" { api.jobStatus <- job.Status{ComposeID: id, Status: body.Status} statusResponseOK(writer) - } else if body.Status == "finished" { + } else if body.Status == "FINISHED" { api.jobStore.DeleteJob(id) statusResponseOK(writer) } else { diff --git a/internal/jobqueue/api_test.go b/internal/jobqueue/api_test.go index 76fafa2d2..3a92e950f 100644 --- a/internal/jobqueue/api_test.go +++ b/internal/jobqueue/api_test.go @@ -81,12 +81,12 @@ func TestBasic(t *testing.T) { {"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusCreated, expected_job}, {"POST", "/job-queue/v1/jobs", `{"id":"ffffffff-ffff-ffff-ffff-ffffffffffff"}`, http.StatusBadRequest, ``}, - //{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"finished"}`, http.StatusBadRequest, ``}, - {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusOK, ``}, - {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusOK, ``}, - {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"finished"}`, http.StatusOK, ``}, - //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"running"}`, http.StatusNotAllowed, ``}, - //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"finished"}`, http.StatusNotAllowed, ``}, + //{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"FINISHED"}`, http.StatusBadRequest, ``}, + {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``}, + {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusOK, ``}, + {"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"FINISHED"}`, http.StatusOK, ``}, + //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"RUNNING"}`, http.StatusNotAllowed, ``}, + //{"PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"FINISHED"}`, http.StatusNotAllowed, ``}, } jobChannel := make(chan job.Job, 100) diff --git a/internal/weldr/store.go b/internal/weldr/store.go index 9c1a1290a..f30cd42c2 100644 --- a/internal/weldr/store.go +++ b/internal/weldr/store.go @@ -8,6 +8,7 @@ import ( "osbuild-composer/internal/target" "sort" "sync" + "time" "github.com/google/uuid" ) @@ -37,9 +38,13 @@ type blueprintPackage struct { } type compose struct { - Status string `json:"status"` - Blueprint *blueprint `json:"blueprint"` - Targets []*target.Target `json:"targets"` + QueueStatus string `json:"queue_status"` + Blueprint *blueprint `json:"blueprint"` + OutputType string `json:"output-type"` + Targets []*target.Target `json:"targets"` + JobCreated time.Time `json:"job_created"` + JobStarted time.Time `json:"job_started"` + JobFinished time.Time `json:"job_finished"` } func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan<- job.Job, jobUpdates <-chan job.Status) *store { @@ -59,7 +64,7 @@ func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan< s.Workspace = make(map[string]blueprint) } if s.Composes == nil { - // TODO: push pending/running composes to workers again + // TODO: push waiting/running composes to workers again s.Composes = make(map[uuid.UUID]compose) } s.stateChannel = stateChannel @@ -74,7 +79,17 @@ func newStore(initialState []byte, stateChannel chan<- []byte, pendingJobs chan< if !exists { return } - compose.Status = update.Status + if compose.QueueStatus != update.Status { + switch update.Status { + case "RUNNING": + compose.JobStarted = time.Now() + case "FINISHED": + fallthrough + case "FAILED": + compose.JobFinished = time.Now() + } + compose.QueueStatus = update.Status + } }) } }() @@ -193,7 +208,13 @@ func (s *store) addCompose(composeID uuid.UUID, bp *blueprint, composeType strin target.NewLocalTarget(target.NewLocalTargetOptions("/var/lib/osbuild-composer/outputs/" + composeID.String())), } s.change(func() { - s.Composes[composeID] = compose{"pending", bp, targets} + s.Composes[composeID] = compose{ + QueueStatus: "WAITING", + Blueprint: bp, + OutputType: composeType, + Targets: targets, + JobCreated: time.Now(), + } }) s.pendingJobs <- job.Job{ ComposeID: composeID,