From 4385c39d66b7ed65514722110fa4fde399771555 Mon Sep 17 00:00:00 2001
From: sanne
Date: Mon, 5 Jul 2021 11:45:16 +0200
Subject: [PATCH] worker: Introduce heartbeats

An occupied worker checks about every 15 seconds whether its current job
was cancelled. Use this to introduce a heartbeat mechanism: if composer
hasn't heard from a worker in 2 minutes, the job times out and is set to
fail.
---
 docs/news/unreleased/worker-heartbeat.md   |   9 ++
 internal/jobqueue/fsjobqueue/fsjobqueue.go |  29 +++++
 .../jobqueue/fsjobqueue/fsjobqueue_test.go |  30 +++++
 internal/jobqueue/jobqueue.go              |   6 +
 internal/worker/server.go                  |  24 +++-
 test/cases/api.sh                          | 114 +++++++++++-------
 6 files changed, 166 insertions(+), 46 deletions(-)
 create mode 100644 docs/news/unreleased/worker-heartbeat.md

diff --git a/docs/news/unreleased/worker-heartbeat.md b/docs/news/unreleased/worker-heartbeat.md
new file mode 100644
index 000000000..2c91e1655
--- /dev/null
+++ b/docs/news/unreleased/worker-heartbeat.md
@@ -0,0 +1,9 @@
+# Workers: heartbeat
+
+Workers check in with composer every 15 seconds to see whether their job has
+been cancelled. We use this to introduce a heartbeat. If a worker fails to
+check in for over 2 minutes, composer assumes the worker crashed or was
+stopped, and marks the job as failed.
+
+This mitigates the issue where jobs whose worker crashed or was stopped
+would remain in a 'building' state forever.
diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go
index ba7f69646..5f52f06fe 100644
--- a/internal/jobqueue/fsjobqueue/fsjobqueue.go
+++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go
@@ -51,6 +51,7 @@ type fsJobQueue struct {
 	// and renamed to `$STATE_DIRECTORY/artifacts/$JOB_ID` once the job is
 	// reported as done.
 	jobIdByToken map[uuid.UUID]uuid.UUID
+	heartbeats   map[uuid.UUID]time.Time // token -> time of last heartbeat
 }
 
 // On-disk job struct. Contains all necessary (but non-redundant) information
@@ -84,6 +85,7 @@ func New(dir string) (*fsJobQueue, error) {
 		pending:      make(map[string]chan uuid.UUID),
 		dependants:   make(map[uuid.UUID][]uuid.UUID),
 		jobIdByToken: make(map[uuid.UUID]uuid.UUID),
+		heartbeats:   make(map[uuid.UUID]time.Time),
 	}
 
 	// Look for jobs that are still pending and build the dependant map.
@@ -112,6 +114,7 @@ func New(dir string) (*fsJobQueue, error) {
 			}
 		} else {
 			q.jobIdByToken[j.Token] = j.Id
+			q.heartbeats[j.Token] = time.Now()
 		}
 	}
 
@@ -226,6 +229,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
 
 	j.Token = uuid.New()
 	q.jobIdByToken[j.Token] = j.Id
+	q.heartbeats[j.Token] = time.Now()
 
 	err := q.db.Write(j.Id.String(), j)
 	if err != nil {
@@ -259,6 +263,7 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
 		return fmt.Errorf("error marshaling result: %v", err)
 	}
 
+	delete(q.heartbeats, j.Token)
 	delete(q.jobIdByToken, j.Token)
 	j.Token = uuid.Nil
 
@@ -298,6 +303,8 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
 
 	j.Canceled = true
 
+	delete(q.heartbeats, j.Token)
+
 	err = q.db.Write(id.String(), j)
 	if err != nil {
 		return fmt.Errorf("error writing job %s: %v", id, err)
@@ -345,6 +352,28 @@ func (q *fsJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) {
 	return id, nil
 }
 
+// Heartbeats returns the tokens of all jobs whose most recent heartbeat is older than olderThan.
+func (q *fsJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	now := time.Now()
+	for token, hb := range q.heartbeats {
+		if now.Sub(hb) > olderThan {
+			tokens = append(tokens, token)
+		}
+	}
+	return
+}
+
+// RefreshHeartbeat resets the last heartbeat for token to time.Now().
+func (q *fsJobQueue) RefreshHeartbeat(token uuid.UUID) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	if token != uuid.Nil {
+		q.heartbeats[token] = time.Now()
+	}
+}
+
 // Reads job with `id`. This is a thin wrapper around `q.db.Read`, which
 // returns the job directly, or and error if a job with `id` does not exist.
 func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {
diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go
index cf7dee86a..aaffa5ea7 100644
--- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go
+++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go
@@ -338,3 +338,33 @@ func TestCancel(t *testing.T) {
 	err = json.Unmarshal(result, &testResult{})
 	require.NoError(t, err)
 }
+
+func TestHeartbeats(t *testing.T) {
+	q, dir := newTemporaryQueue(t)
+	defer cleanupTempDir(t, dir)
+
+	id := pushTestJob(t, q, "octopus", nil, nil)
+	// No heartbeats for queued job
+	require.Empty(t, q.Heartbeats(time.Second*0))
+
+	r, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"})
+	require.NoError(t, err)
+	require.Equal(t, id, r)
+	require.NotEmpty(t, tok)
+
+	tokens := q.Heartbeats(time.Second * 0)
+	require.Contains(t, tokens, tok)
+	require.Empty(t, q.Heartbeats(time.Hour*24))
+
+	id2, err := q.IdFromToken(tok)
+	require.NoError(t, err)
+	require.Equal(t, id, id2)
+
+	err = q.FinishJob(id, &testResult{})
+	require.NoError(t, err)
+
+	// No heartbeats for finished job
+	require.Empty(t, q.Heartbeats(time.Second*0))
+	_, err = q.IdFromToken(tok)
+	require.Equal(t, jobqueue.ErrNotExist, err)
+}
diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go
index 7bc217034..a5a5f47a6 100644
--- a/internal/jobqueue/jobqueue.go
+++ b/internal/jobqueue/jobqueue.go
@@ -65,6 +65,12 @@ type JobQueue interface {
 	// Find job by token, this will return an error if the job hasn't been dequeued
 	IdFromToken(token uuid.UUID) (id uuid.UUID, err error)
+
+	// Get a list of tokens which haven't been updated in the specified time frame
+	Heartbeats(olderThan time.Duration) (tokens []uuid.UUID)
+
+	// Reset the last heartbeat time to time.Now()
+	RefreshHeartbeat(token uuid.UUID)
 }
 
 var (
diff --git a/internal/worker/server.go b/internal/worker/server.go
index d817af22a..acb6fcb99 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -40,13 +40,14 @@ var ErrInvalidToken = errors.New("token does not exist")
 var ErrJobNotRunning = errors.New("job isn't running")
 
 func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, identityFilter []string) *Server {
-
-	return &Server{
+	s := &Server{
 		jobs:           jobs,
 		logger:         logger,
 		artifactsDir:   artifactsDir,
 		identityFilter: identityFilter,
 	}
+	go s.WatchHeartbeats()
+	return s
 }
 
 func (s *Server) Handler() http.Handler {
@@ -111,6 +112,23 @@ func (s *Server) VerifyIdentityHeader(nextHandler echo.HandlerFunc) echo.Handler
 	}
 }
 
+// WatchHeartbeats must be started as a goroutine. Every 30 seconds it goes
+// through all running jobs and fails any unresponsive ones, i.e. jobs whose
+// worker hasn't checked for cancellation in more than 2 minutes.
+func (s *Server) WatchHeartbeats() {
+	//nolint:staticcheck // avoid SA1015, this is an endless function
+	for range time.Tick(time.Second * 30) {
+		for _, token := range s.jobs.Heartbeats(time.Second * 120) {
+			id, _ := s.jobs.IdFromToken(token)
+			log.Printf("Removing unresponsive job: %s\n", id)
+			err := s.FinishJob(token, nil)
+			if err != nil {
+				log.Printf("Error finishing unresponsive job: %v", err)
+			}
+		}
+	}
+}
+
 func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob) (uuid.UUID, error) {
 	return s.jobs.Enqueue("osbuild:"+arch, job, nil)
 }
@@ -355,6 +373,8 @@ func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error {
 		return ctx.JSON(http.StatusOK, getJobResponse{})
 	}
 
+	h.server.jobs.RefreshHeartbeat(token)
+
 	status, _, err := h.server.JobStatus(jobId, &json.RawMessage{})
 	if err != nil {
 		return err
diff --git a/test/cases/api.sh b/test/cases/api.sh
index 54f91dbdc..d7546f166 100755
--- a/test/cases/api.sh
+++ b/test/cases/api.sh
@@ -530,56 +530,82 @@ esac
 # the server's response in case of an error.
 #
-OUTPUT=$(curl \
-    --silent \
-    --show-error \
-    --cacert /etc/osbuild-composer/ca-crt.pem \
-    --key /etc/osbuild-composer/client-key.pem \
-    --cert /etc/osbuild-composer/client-crt.pem \
-    --header 'Content-Type: application/json' \
-    --request POST \
-    --data @"$REQUEST_FILE" \
-    https://localhost/api/composer/v1/compose)
+function sendCompose() {
+    OUTPUT=$(curl \
+        --silent \
+        --show-error \
+        --cacert /etc/osbuild-composer/ca-crt.pem \
+        --key /etc/osbuild-composer/client-key.pem \
+        --cert /etc/osbuild-composer/client-crt.pem \
+        --header 'Content-Type: application/json' \
+        --request POST \
+        --data @"$REQUEST_FILE" \
+        https://localhost/api/composer/v1/compose)
+    COMPOSE_ID=$(echo "$OUTPUT" | jq -r '.id')
+}
 
-COMPOSE_ID=$(echo "$OUTPUT" | jq -r '.id')
+function waitForState() {
+    local DESIRED_STATE="${1:-success}"
+    while true
+    do
+        OUTPUT=$(curl \
+            --silent \
+            --show-error \
+            --cacert /etc/osbuild-composer/ca-crt.pem \
+            --key /etc/osbuild-composer/client-key.pem \
+            --cert /etc/osbuild-composer/client-crt.pem \
+            https://localhost/api/composer/v1/compose/"$COMPOSE_ID")
 
-while true
-do
-    OUTPUT=$(curl \
-        --silent \
-        --show-error \
-        --cacert /etc/osbuild-composer/ca-crt.pem \
-        --key /etc/osbuild-composer/client-key.pem \
-        --cert /etc/osbuild-composer/client-crt.pem \
-        https://localhost/api/composer/v1/compose/"$COMPOSE_ID")
+        COMPOSE_STATUS=$(echo "$OUTPUT" | jq -r '.image_status.status')
+        UPLOAD_STATUS=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.status')
+        UPLOAD_TYPE=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.type')
+        UPLOAD_OPTIONS=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.options')
 
-    COMPOSE_STATUS=$(echo "$OUTPUT" | jq -r '.image_status.status')
-    UPLOAD_STATUS=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.status')
-    UPLOAD_TYPE=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.type')
-    UPLOAD_OPTIONS=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.options')
+        case "$COMPOSE_STATUS" in
+            "$DESIRED_STATE")
+                break
+                ;;
+            # all valid status values for a compose which hasn't finished yet
+            "pending"|"building"|"uploading"|"registering")
+                ;;
+            # default undesired state
+            "failure")
+                echo "Image compose failed"
+                exit 1
+                ;;
+            *)
+                echo "API returned unexpected image_status.status value: '$COMPOSE_STATUS'"
+                exit 1
+                ;;
+        esac
 
-    case "$COMPOSE_STATUS" in
-        # valid status values for compose which is not yet finished
-        "pending"|"building"|"uploading"|"registering")
-            ;;
-        "success")
-            test "$UPLOAD_STATUS" = "success"
-            test "$UPLOAD_TYPE" = "$CLOUD_PROVIDER"
-            break
-            ;;
-        "failure")
-            echo "Image compose failed"
-            exit 1
-            ;;
-        *)
-            echo "API returned unexpected image_status.status value: '$COMPOSE_STATUS'"
-            exit 1
-            ;;
-    esac
+        sleep 30
+    done
+}
+
+# a pending state shouldn't trip up the heartbeats
+sudo systemctl stop "osbuild-worker@*"
+sendCompose
+waitForState "pending"
+# jobs time out after 2 minutes, so 180 seconds gives ample time to make sure
+# pending jobs don't time out
+sleep 180
+waitForState "pending"
+
+# a crashed/stopped/killed worker should result in a failed state
+sudo systemctl start "osbuild-worker@1"
+waitForState "building"
+sudo systemctl stop "osbuild-worker@*"
+waitForState "failure"
+sudo systemctl start "osbuild-worker@1"
+
+# full integration case
+sendCompose
+waitForState
+test "$UPLOAD_STATUS" = "success"
+test "$UPLOAD_TYPE" = "$CLOUD_PROVIDER"
 
-    sleep 30
-done
 #
 # Verify the Cloud-provider specific upload_status options
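
For context beyond the patch itself: the heartbeat piggybacks on traffic the
worker already generates. While occupied, a worker asks composer roughly every
15 seconds whether its job was cancelled; on the server, that request lands in
GetJob, which now calls RefreshHeartbeat(token), and WatchHeartbeats fails any
job whose token hasn't been refreshed for over 2 minutes. The following Go
sketch illustrates that worker-side loop. It is not part of this patch, and
pollCancellation and watchJob are hypothetical names standing in for the
worker's real status request and job supervision.

```go
package main

import (
	"log"
	"time"
)

// pollCancellation is a hypothetical stand-in for the worker's real status
// request (GET /job/{token}). Server-side, every such request refreshes the
// job's heartbeat via RefreshHeartbeat(token).
func pollCancellation(token string) (bool, error) {
	return false, nil // a real worker would ask composer here
}

// watchJob mirrors the worker behaviour the heartbeat relies on: while a job
// is running, poll composer about every 15 seconds to ask whether it was
// cancelled. If these polls stop for over 2 minutes, WatchHeartbeats on the
// server fails the job.
func watchJob(token string, cancel func()) {
	for range time.Tick(15 * time.Second) {
		cancelled, err := pollCancellation(token)
		if err != nil {
			// A single failed poll isn't fatal: composer only times the job
			// out after 2 minutes without a successful check-in.
			log.Printf("error polling job %s: %v", token, err)
			continue
		}
		if cancelled {
			cancel()
			return
		}
	}
}

func main() {
	watchJob("example-token", func() { log.Println("job cancelled, stopping") })
}
```

Because the heartbeat is derived from requests the worker already makes, no
new endpoint or worker-side change is needed: a worker that crashes or is
stopped simply stops polling, and the job times out.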