worker: Introduce heartbeats

An occupied worker checks roughly every 15 seconds whether its current
job has been cancelled. Use this to introduce a heartbeat mechanism:
if composer hasn't heard from the worker for 2 minutes, the job times
out and is marked as failed.
sanne 2021-07-05 11:45:16 +02:00 committed by Tom Gundersen
parent 0fcb44e617
commit 4385c39d66
6 changed files with 166 additions and 46 deletions

@@ -0,0 +1,9 @@
# Workers: heartbeat
Workers check in with composer every 15 seconds to see whether their job has
been cancelled. We can use this to introduce a heartbeat: if a worker fails to
check in for more than 2 minutes, composer assumes it crashed or was stopped
and marks the job as failed.
This mitigates the issue where jobs whose worker crashed or was stopped would
otherwise remain in the 'building' state forever.
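Only the composer side of the mechanism appears in the hunks below; the worker half is just its existing ~15-second cancellation poll. A rough sketch of that side, with hypothetical helper names (this is not the actual worker code):

package worker

import (
	"context"
	"errors"
	"time"

	"github.com/google/uuid"
)

// checkJobCanceled is a hypothetical stand-in for the worker's status request
// to composer; on the composer side that request refreshes the job's heartbeat.
var checkJobCanceled = func(token uuid.UUID) (bool, error) { return false, nil }

// pollWhileBuilding sketches the ~15-second cancellation poll. If the worker
// dies, the polls stop and composer fails the job after 2 minutes of silence.
func pollWhileBuilding(ctx context.Context, token uuid.UUID) error {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			canceled, err := checkJobCanceled(token)
			if err != nil {
				return err
			}
			if canceled {
				return errors.New("job was canceled")
			}
		}
	}
}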

@@ -51,6 +51,7 @@ type fsJobQueue struct {
// and renamed to `$STATE_DIRECTORY/artifacts/$JOB_ID` once the job is
// reported as done.
jobIdByToken map[uuid.UUID]uuid.UUID
heartbeats map[uuid.UUID]time.Time // token -> heartbeat
}
// On-disk job struct. Contains all necessary (but non-redundant) information
@@ -84,6 +85,7 @@ func New(dir string) (*fsJobQueue, error) {
pending: make(map[string]chan uuid.UUID),
dependants: make(map[uuid.UUID][]uuid.UUID),
jobIdByToken: make(map[uuid.UUID]uuid.UUID),
heartbeats: make(map[uuid.UUID]time.Time),
}
// Look for jobs that are still pending and build the dependant map.
@@ -112,6 +114,7 @@ func New(dir string) (*fsJobQueue, error) {
}
} else {
q.jobIdByToken[j.Token] = j.Id
q.heartbeats[j.Token] = time.Now()
}
}
@@ -226,6 +229,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
j.Token = uuid.New()
q.jobIdByToken[j.Token] = j.Id
q.heartbeats[j.Token] = time.Now()
err := q.db.Write(j.Id.String(), j)
if err != nil {
@@ -259,6 +263,7 @@ func (q *fsJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return fmt.Errorf("error marshaling result: %v", err)
}
delete(q.heartbeats, j.Token)
delete(q.jobIdByToken, j.Token)
j.Token = uuid.Nil
@@ -298,6 +303,8 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
j.Canceled = true
delete(q.heartbeats, j.Token)
err = q.db.Write(id.String(), j)
if err != nil {
return fmt.Errorf("error writing job %s: %v", id, err)
@@ -345,6 +352,28 @@ func (q *fsJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) {
return id, nil
}
// Retrieve a list of tokens tied to running jobs whose most recent heartbeat
// is older than olderThan.
func (q *fsJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) {
q.mu.Lock()
defer q.mu.Unlock()
now := time.Now()
for token, hb := range q.heartbeats {
if now.Sub(hb) > olderThan {
tokens = append(tokens, token)
}
}
return
}
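// Reset the last heartbeat of the job identified by token to time.Now().
// A zero (uuid.Nil) token is ignored.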
func (q *fsJobQueue) RefreshHeartbeat(token uuid.UUID) {
q.mu.Lock()
defer q.mu.Unlock()
if token != uuid.Nil {
q.heartbeats[token] = time.Now()
}
}
// Reads job with `id`. This is a thin wrapper around `q.db.Read`, which
// returns the job directly, or an error if a job with `id` does not exist.
func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) {

@@ -338,3 +338,33 @@ func TestCancel(t *testing.T) {
err = json.Unmarshal(result, &testResult{})
require.NoError(t, err)
}
func TestHeartbeats(t *testing.T) {
q, dir := newTemporaryQueue(t)
defer cleanupTempDir(t, dir)
id := pushTestJob(t, q, "octopus", nil, nil)
// No heartbeats for queued job
require.Empty(t, q.Heartbeats(time.Second*0))
r, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"})
require.NoError(t, err)
require.Equal(t, id, r)
require.NotEmpty(t, tok)
tokens := q.Heartbeats(time.Second * 0)
require.Contains(t, tokens, tok)
require.Empty(t, q.Heartbeats(time.Hour*24))
id2, err := q.IdFromToken(tok)
require.NoError(t, err)
require.Equal(t, id, id2)
err = q.FinishJob(id, &testResult{})
require.NoError(t, err)
// No heartbeats for finished job
require.Empty(t, q.Heartbeats(time.Second*0))
_, err = q.IdFromToken(tok)
require.Equal(t, jobqueue.ErrNotExist, err)
}

@@ -65,6 +65,12 @@ type JobQueue interface {
// Find job by token; this will return an error if the job hasn't been dequeued
IdFromToken(token uuid.UUID) (id uuid.UUID, err error)
// Get a list of tokens which haven't been updated in the specified time frame
Heartbeats(olderThan time.Duration) (tokens []uuid.UUID)
// Reset the last heartbeat time to time.Now()
RefreshHeartbeat(token uuid.UUID)
}
var (

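For orientation, a minimal sketch of how a caller is expected to combine the two new methods (the real consumer is the worker server's WatchHeartbeats below; the helper names here are illustrative only):

package jobqueue

import (
	"time"

	"github.com/google/uuid"
)

// refreshOnCheckin is called whenever a worker checks in with its token,
// keeping the heartbeat fresh.
func refreshOnCheckin(q JobQueue, token uuid.UUID) {
	q.RefreshHeartbeat(token)
}

// staleTokens returns tokens whose last check-in is more than two minutes
// old; the caller is expected to fail the corresponding jobs.
func staleTokens(q JobQueue) []uuid.UUID {
	return q.Heartbeats(2 * time.Minute)
}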
@@ -40,13 +40,14 @@ var ErrInvalidToken = errors.New("token does not exist")
 var ErrJobNotRunning = errors.New("job isn't running")
 
 func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, identityFilter []string) *Server {
-	return &Server{
+	s := &Server{
 		jobs:           jobs,
 		logger:         logger,
 		artifactsDir:   artifactsDir,
 		identityFilter: identityFilter,
 	}
+	go s.WatchHeartbeats()
+	return s
 }
func (s *Server) Handler() http.Handler {
@@ -111,6 +112,23 @@ func (s *Server) VerifyIdentityHeader(nextHandler echo.HandlerFunc) echo.Handler
}
}
// This function should be started as a goroutine.
// Every 30 seconds it goes through all running jobs and removes any unresponsive ones.
// It fails jobs whose worker hasn't checked in for more than 2 minutes.
func (s *Server) WatchHeartbeats() {
//nolint:staticcheck // avoid SA1015, this is an endless function
for range time.Tick(time.Second * 30) {
for _, token := range s.jobs.Heartbeats(time.Second * 120) {
id, _ := s.jobs.IdFromToken(token)
log.Printf("Removing unresponsive job: %s\n", id)
err := s.FinishJob(token, nil)
if err != nil {
log.Printf("Error finishing unresponsive job: %v", err)
}
}
}
}
func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob) (uuid.UUID, error) {
return s.jobs.Enqueue("osbuild:"+arch, job, nil)
}
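The nolint directive above silences staticcheck's SA1015, which warns that time.Tick leaks its underlying ticker; because this loop never returns, the leak is harmless. A drop-in variant using time.NewTicker, which staticcheck accepts without the directive, would look roughly like this (a sketch using the same imports as the file above, not part of the commit):

func (s *Server) WatchHeartbeats() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for range ticker.C {
		for _, token := range s.jobs.Heartbeats(time.Second * 120) {
			id, _ := s.jobs.IdFromToken(token)
			log.Printf("Removing unresponsive job: %s\n", id)
			if err := s.FinishJob(token, nil); err != nil {
				log.Printf("Error finishing unresponsive job: %v", err)
			}
		}
	}
}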
@@ -355,6 +373,8 @@ func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error {
return ctx.JSON(http.StatusOK, getJobResponse{})
}
h.server.jobs.RefreshHeartbeat(token)
status, _, err := h.server.JobStatus(jobId, &json.RawMessage{})
if err != nil {
return err

@@ -530,56 +530,82 @@ esac
 # the server's response in case of an error.
 #
-OUTPUT=$(curl \
-    --silent \
-    --show-error \
-    --cacert /etc/osbuild-composer/ca-crt.pem \
-    --key /etc/osbuild-composer/client-key.pem \
-    --cert /etc/osbuild-composer/client-crt.pem \
-    --header 'Content-Type: application/json' \
-    --request POST \
-    --data @"$REQUEST_FILE" \
-    https://localhost/api/composer/v1/compose)
-
-COMPOSE_ID=$(echo "$OUTPUT" | jq -r '.id')
-
-while true
-do
-    OUTPUT=$(curl \
-        --silent \
-        --show-error \
-        --cacert /etc/osbuild-composer/ca-crt.pem \
-        --key /etc/osbuild-composer/client-key.pem \
-        --cert /etc/osbuild-composer/client-crt.pem \
-        https://localhost/api/composer/v1/compose/"$COMPOSE_ID")
-
-    COMPOSE_STATUS=$(echo "$OUTPUT" | jq -r '.image_status.status')
-    UPLOAD_STATUS=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.status')
-    UPLOAD_TYPE=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.type')
-    UPLOAD_OPTIONS=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.options')
-
-    case "$COMPOSE_STATUS" in
-        # valid status values for compose which is not yet finished
-        "pending"|"building"|"uploading"|"registering")
-            ;;
-        "success")
-            test "$UPLOAD_STATUS" = "success"
-            test "$UPLOAD_TYPE" = "$CLOUD_PROVIDER"
-            break
-            ;;
-        "failure")
-            echo "Image compose failed"
-            exit 1
-            ;;
-        *)
-            echo "API returned unexpected image_status.status value: '$COMPOSE_STATUS'"
-            exit 1
-            ;;
-    esac
-    sleep 30
-done
+function sendCompose() {
+    OUTPUT=$(curl \
+        --silent \
+        --show-error \
+        --cacert /etc/osbuild-composer/ca-crt.pem \
+        --key /etc/osbuild-composer/client-key.pem \
+        --cert /etc/osbuild-composer/client-crt.pem \
+        --header 'Content-Type: application/json' \
+        --request POST \
+        --data @"$REQUEST_FILE" \
+        https://localhost/api/composer/v1/compose)
+
+    COMPOSE_ID=$(echo "$OUTPUT" | jq -r '.id')
+}
+
+function waitForState() {
+    local DESIRED_STATE="${1:-success}"
+
+    while true
+    do
+        OUTPUT=$(curl \
+            --silent \
+            --show-error \
+            --cacert /etc/osbuild-composer/ca-crt.pem \
+            --key /etc/osbuild-composer/client-key.pem \
+            --cert /etc/osbuild-composer/client-crt.pem \
+            https://localhost/api/composer/v1/compose/"$COMPOSE_ID")
+
+        COMPOSE_STATUS=$(echo "$OUTPUT" | jq -r '.image_status.status')
+        UPLOAD_STATUS=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.status')
+        UPLOAD_TYPE=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.type')
+        UPLOAD_OPTIONS=$(echo "$OUTPUT" | jq -r '.image_status.upload_status.options')
+
+        case "$COMPOSE_STATUS" in
+            "$DESIRED_STATE")
+                break
+                ;;
+            # all valid status values for a compose which hasn't finished yet
+            "pending"|"building"|"uploading"|"registering")
+                ;;
+            # default undesired state
+            "failure")
+                echo "Image compose failed"
+                exit 1
+                ;;
+            *)
+                echo "API returned unexpected image_status.status value: '$COMPOSE_STATUS'"
+                exit 1
+                ;;
+        esac
+        sleep 30
+    done
+}
+
+# a pending state shouldn't trip up the heartbeats
+sudo systemctl stop "osbuild-worker@*"
+sendCompose
+waitForState "pending"
+# jobs time out after 2 minutes, so sleeping 180 seconds gives ample time to
+# verify that pending jobs don't time out
+sleep 180
+waitForState "pending"
+
+# crashed/stopped/killed worker should result in a failed state
+sudo systemctl start "osbuild-worker@1"
+waitForState "building"
+sudo systemctl stop "osbuild-worker@*"
+waitForState "failure"
+sudo systemctl start "osbuild-worker@1"
+
+# full integration case
+sendCompose
+waitForState
+test "$UPLOAD_STATUS" = "success"
+test "$UPLOAD_TYPE" = "$CLOUD_PROVIDER"
 
 #
 # Verify the Cloud-provider specific upload_status options