From f055ba2e07a74818f97bc69264923bd454025c03 Mon Sep 17 00:00:00 2001 From: Tom Gundersen Date: Thu, 24 Oct 2019 12:57:37 +0200 Subject: [PATCH] store: change semantics of queue states A job is now in "WAITING" state exactly when it is in the channel, once it is popped it enters "RUNNING" state. It is only possible to update the state of a job that is in the running state. This mean that updating to "RUNNING" is entirely optional, but in the future we may want to use this as a watchdog logic, and require the worker to update at regular intervals to avoid being restarted. The job queue API is updated to require a POST followed by one or several PATCH messages to the returned ID. If a patch is sent to an ID before the POST it is as if the object does not exist (regarldess of it being in the queue in WAITING state or not). Once a job has been POSTed it can be PATCHed to update it zero or more times with (still) RUNNING before exactly oncee with either FINISHED or FAILED. Signed-off-by: Tom Gundersen --- internal/jobqueue/api.go | 9 +++++--- internal/jobqueue/api_test.go | 28 +++++++++++++++--------- internal/store/store.go | 41 +++++++++++++++++++++++++---------- 3 files changed, 53 insertions(+), 25 deletions(-) diff --git a/internal/jobqueue/api.go b/internal/jobqueue/api.go index bfaa4f1e0..87489b0da 100644 --- a/internal/jobqueue/api.go +++ b/internal/jobqueue/api.go @@ -2,13 +2,14 @@ package jobqueue import ( "encoding/json" - "github.com/osbuild/osbuild-composer/internal/pipeline" - "github.com/osbuild/osbuild-composer/internal/store" - "github.com/osbuild/osbuild-composer/internal/target" "log" "net" "net/http" + "github.com/osbuild/osbuild-composer/internal/pipeline" + "github.com/osbuild/osbuild-composer/internal/store" + "github.com/osbuild/osbuild-composer/internal/target" + "github.com/google/uuid" "github.com/julienschmidt/httprouter" ) @@ -131,6 +132,8 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque case *store.NotFoundError: statusResponseError(writer, http.StatusNotFound, err.Error()) case *store.NotPendingError: + statusResponseError(writer, http.StatusNotFound, err.Error()) + case *store.NotRunningError: statusResponseError(writer, http.StatusBadRequest, err.Error()) case *store.InvalidRequestError: statusResponseError(writer, http.StatusBadRequest, err.Error()) diff --git a/internal/jobqueue/api_test.go b/internal/jobqueue/api_test.go index a5a486af5..21cedb28d 100644 --- a/internal/jobqueue/api_test.go +++ b/internal/jobqueue/api_test.go @@ -112,12 +112,16 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { store := store.New(nil) api := jobqueue.New(nil, store) - store.PushCompose(id, &blueprint.Blueprint{}, "tar") - - sendHTTP(api, "POST", "/job-queue/v1/jobs", `{}`) - if from != "WAITING" { - sendHTTP(api, "PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"`+from+`"}`) + if from != "VOID" { + store.PushCompose(id, &blueprint.Blueprint{}, "tar") + if from != "WAITING" { + sendHTTP(api, "POST", "/job-queue/v1/jobs", `{}`) + if from != "RUNNING" { + sendHTTP(api, "PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"`+from+`"}`) + } + } } + testRoute(t, api, "PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"`+to+`"}`, expectedStatus, ``) } @@ -127,11 +131,15 @@ func TestUpdate(t *testing.T) { To string ExpectedStatus int }{ - {"WAITING", "WAITING", http.StatusBadRequest}, - {"WAITING", "RUNNING", http.StatusOK}, - {"WAITING", "FINISHED", http.StatusOK}, - {"WAITING", "FAILED", http.StatusOK}, - {"RUNNING", "RUNNING", http.StatusOK}, + {"VOID", "WAITING", http.StatusNotFound}, + {"VOID", "RUNNING", http.StatusNotFound}, + {"VOID", "FINISHED", http.StatusNotFound}, + {"VOID", "FAILED", http.StatusNotFound}, + {"WAITING", "WAITING", http.StatusNotFound}, + {"WAITING", "RUNNING", http.StatusNotFound}, + {"WAITING", "FINISHED", http.StatusNotFound}, + {"WAITING", "FAILED", http.StatusNotFound}, + {"RUNNING", "WAITING", http.StatusBadRequest}, {"RUNNING", "RUNNING", http.StatusOK}, {"RUNNING", "FINISHED", http.StatusOK}, {"RUNNING", "FAILED", http.StatusOK}, diff --git a/internal/store/store.go b/internal/store/store.go index 94d02c7ea..603fb9764 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -4,9 +4,6 @@ package store import ( "encoding/json" - "github.com/osbuild/osbuild-composer/internal/blueprint" - "github.com/osbuild/osbuild-composer/internal/pipeline" - "github.com/osbuild/osbuild-composer/internal/target" "io/ioutil" "log" "os" @@ -15,6 +12,10 @@ import ( "sync" "time" + "github.com/osbuild/osbuild-composer/internal/blueprint" + "github.com/osbuild/osbuild-composer/internal/pipeline" + "github.com/osbuild/osbuild-composer/internal/target" + "github.com/google/uuid" ) @@ -72,6 +73,14 @@ func (e *NotPendingError) Error() string { return e.message } +type NotRunningError struct { + message string +} + +func (e *NotRunningError) Error() string { + return e.message +} + type InvalidRequestError struct { message string } @@ -368,7 +377,18 @@ func (s *Store) PushCompose(composeID uuid.UUID, bp *blueprint.Blueprint, compos } func (s *Store) PopCompose() Job { - return <-s.pendingJobs + job := <-s.pendingJobs + s.change(func() error { + compose, exists := s.Composes[job.ComposeID] + if !exists || compose.QueueStatus != "WAITING" { + panic("Invalid job in queue.") + } + compose.JobStarted = time.Now() + compose.QueueStatus = "RUNNING" + s.Composes[job.ComposeID] = compose + return nil + }) + return job } func (s *Store) UpdateCompose(composeID uuid.UUID, status string) error { @@ -377,25 +397,22 @@ func (s *Store) UpdateCompose(composeID uuid.UUID, status string) error { if !exists { return &NotFoundError{"compose does not exist"} } + if compose.QueueStatus == "WAITING" { + return &NotPendingError{"compose has not been popped"} + } switch status { case "RUNNING": switch compose.QueueStatus { - case "WAITING": - compose.JobStarted = time.Now() case "RUNNING": default: - return &NotPendingError{"compose was not pending"} + return &NotRunningError{"compose was not running"} } case "FINISHED", "FAILED": switch compose.QueueStatus { - case "WAITING": - now := time.Now() - compose.JobStarted = now - compose.JobFinished = now case "RUNNING": compose.JobFinished = time.Now() default: - return &NotPendingError{"compose was not pending"} + return &NotRunningError{"compose was not running"} } compose.QueueStatus = status s.Composes[composeID] = compose