store: change semantics of queue states

A job is now in "WAITING" state exactly when it is in the channel,
once it is popped it enters "RUNNING" state. It is only possible
to update the state of a job that is in the running state.

This mean that updating to "RUNNING" is entirely optional, but in
the future we may want to use this as a watchdog logic, and require
the worker to update at regular intervals to avoid being restarted.

The job queue API is updated to require a POST followed by one
or several PATCH messages to the returned ID. If a patch is sent
to an ID before the POST it is as if the object does not exist
(regarldess of it being in the queue in WAITING state or not).

Once a job has been POSTed it can be PATCHed to update it zero or
more times with (still) RUNNING before exactly oncee with either
FINISHED or FAILED.

Signed-off-by: Tom Gundersen <teg@jklm.no>
This commit is contained in:
Tom Gundersen 2019-10-24 12:57:37 +02:00
parent 967b0e8ce0
commit f055ba2e07
3 changed files with 53 additions and 25 deletions

View file

@ -2,13 +2,14 @@ package jobqueue
import (
"encoding/json"
"github.com/osbuild/osbuild-composer/internal/pipeline"
"github.com/osbuild/osbuild-composer/internal/store"
"github.com/osbuild/osbuild-composer/internal/target"
"log"
"net"
"net/http"
"github.com/osbuild/osbuild-composer/internal/pipeline"
"github.com/osbuild/osbuild-composer/internal/store"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/google/uuid"
"github.com/julienschmidt/httprouter"
)
@ -131,6 +132,8 @@ func (api *API) updateJobHandler(writer http.ResponseWriter, request *http.Reque
case *store.NotFoundError:
statusResponseError(writer, http.StatusNotFound, err.Error())
case *store.NotPendingError:
statusResponseError(writer, http.StatusNotFound, err.Error())
case *store.NotRunningError:
statusResponseError(writer, http.StatusBadRequest, err.Error())
case *store.InvalidRequestError:
statusResponseError(writer, http.StatusBadRequest, err.Error())

View file

@ -112,12 +112,16 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) {
store := store.New(nil)
api := jobqueue.New(nil, store)
store.PushCompose(id, &blueprint.Blueprint{}, "tar")
sendHTTP(api, "POST", "/job-queue/v1/jobs", `{}`)
if from != "WAITING" {
sendHTTP(api, "PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"`+from+`"}`)
if from != "VOID" {
store.PushCompose(id, &blueprint.Blueprint{}, "tar")
if from != "WAITING" {
sendHTTP(api, "POST", "/job-queue/v1/jobs", `{}`)
if from != "RUNNING" {
sendHTTP(api, "PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"`+from+`"}`)
}
}
}
testRoute(t, api, "PATCH", "/job-queue/v1/jobs/ffffffff-ffff-ffff-ffff-ffffffffffff", `{"status":"`+to+`"}`, expectedStatus, ``)
}
@ -127,11 +131,15 @@ func TestUpdate(t *testing.T) {
To string
ExpectedStatus int
}{
{"WAITING", "WAITING", http.StatusBadRequest},
{"WAITING", "RUNNING", http.StatusOK},
{"WAITING", "FINISHED", http.StatusOK},
{"WAITING", "FAILED", http.StatusOK},
{"RUNNING", "RUNNING", http.StatusOK},
{"VOID", "WAITING", http.StatusNotFound},
{"VOID", "RUNNING", http.StatusNotFound},
{"VOID", "FINISHED", http.StatusNotFound},
{"VOID", "FAILED", http.StatusNotFound},
{"WAITING", "WAITING", http.StatusNotFound},
{"WAITING", "RUNNING", http.StatusNotFound},
{"WAITING", "FINISHED", http.StatusNotFound},
{"WAITING", "FAILED", http.StatusNotFound},
{"RUNNING", "WAITING", http.StatusBadRequest},
{"RUNNING", "RUNNING", http.StatusOK},
{"RUNNING", "FINISHED", http.StatusOK},
{"RUNNING", "FAILED", http.StatusOK},