diff --git a/cmd/osbuild-service-maintenance/db_test.go b/cmd/osbuild-service-maintenance/db_test.go index ec45a7112..517645320 100644 --- a/cmd/osbuild-service-maintenance/db_test.go +++ b/cmd/osbuild-service-maintenance/db_test.go @@ -44,7 +44,7 @@ func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { id, err := q.Enqueue("octopus", nil, nil, "") require.NoError(t, err) require.NotEqual(t, uuid.Nil, id) - _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) + _, _, _, _, _, err = q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{""}) require.NoError(t, err) type Result struct { diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 57252cb61..69b7ca8ea 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -56,6 +56,15 @@ type fsJobQueue struct { // reported as done. jobIdByToken map[uuid.UUID]uuid.UUID heartbeats map[uuid.UUID]time.Time // token -> heartbeat + + workerIDByToken map[uuid.UUID]uuid.UUID // token -> workerID + workers map[uuid.UUID]worker +} + +type worker struct { + Arch string `json:"arch"` + Heartbeat time.Time `json:"heartbeat"` + Tokens map[uuid.UUID]struct{} } // On-disk job struct. Contains all necessary (but non-redundant) information @@ -85,12 +94,14 @@ type job struct { // loaded and rescheduled to run if necessary. 
func New(dir string) (*fsJobQueue, error) { q := &fsJobQueue{ - db: jsondb.New(dir, 0600), - pending: list.New(), - dependants: make(map[uuid.UUID][]uuid.UUID), - jobIdByToken: make(map[uuid.UUID]uuid.UUID), - heartbeats: make(map[uuid.UUID]time.Time), - listeners: make(map[chan struct{}]struct{}), + db: jsondb.New(dir, 0600), + pending: list.New(), + dependants: make(map[uuid.UUID][]uuid.UUID), + jobIdByToken: make(map[uuid.UUID]uuid.UUID), + heartbeats: make(map[uuid.UUID]time.Time), + listeners: make(map[chan struct{}]struct{}), + workers: make(map[uuid.UUID]worker), + workerIDByToken: make(map[uuid.UUID]uuid.UUID), } // Look for jobs that are still pending and build the dependant map. @@ -186,7 +197,7 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return j.Id, nil } -func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { +func (q *fsJobQueue) Dequeue(ctx context.Context, wID uuid.UUID, jobTypes, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { q.mu.Lock() defer q.mu.Unlock() @@ -231,6 +242,10 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels [] j.Token = uuid.New() q.jobIdByToken[j.Token] = j.Id q.heartbeats[j.Token] = time.Now() + if _, ok := q.workers[wID]; ok { + q.workers[wID].Tokens[j.Token] = struct{}{} + q.workerIDByToken[j.Token] = wID + } err := q.db.Write(j.Id.String(), j) if err != nil { @@ -240,7 +255,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels [] return j.Id, j.Token, j.Dependencies, j.Type, j.Args, nil } -func (q *fsJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { +func (q *fsJobQueue) DequeueByID(ctx context.Context, id, wID uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { q.mu.Lock() defer q.mu.Unlock() @@ -268,6 
+283,10 @@ func (q *fsJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, j.Token = uuid.New() q.jobIdByToken[j.Token] = j.Id q.heartbeats[j.Token] = time.Now() + if _, ok := q.workers[wID]; ok { + q.workers[wID].Tokens[j.Token] = struct{}{} + q.workerIDByToken[j.Token] = wID + } err = q.db.Write(j.Id.String(), j) if err != nil { @@ -296,6 +315,10 @@ func (q *fsJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result delete(q.jobIdByToken, j.Token) delete(q.heartbeats, j.Token) + if wID, ok := q.workerIDByToken[j.Token]; ok { + delete(q.workers[wID].Tokens, j.Token) + delete(q.workerIDByToken, j.Token) + } if j.Retries >= maxRetries { j.FinishedAt = time.Now() @@ -444,6 +467,64 @@ func (q *fsJobQueue) RefreshHeartbeat(token uuid.UUID) { } } +func (q *fsJobQueue) InsertWorker(arch string) (uuid.UUID, error) { + q.mu.Lock() + defer q.mu.Unlock() + + wID := uuid.New() + q.workers[wID] = worker{ + Arch: arch, + Heartbeat: time.Now(), + Tokens: make(map[uuid.UUID]struct{}), + } + return wID, nil +} + +func (q *fsJobQueue) UpdateWorkerStatus(wID uuid.UUID) error { + q.mu.Lock() + defer q.mu.Unlock() + + worker, ok := q.workers[wID] + if !ok { + return jobqueue.ErrWorkerNotExist + } + + // q.workers stores worker values, so the refreshed copy must be written back. + worker.Heartbeat = time.Now() + q.workers[wID] = worker + return nil +} + +func (q *fsJobQueue) Workers(olderThan time.Duration) ([]uuid.UUID, error) { + q.mu.Lock() + defer q.mu.Unlock() + + now := time.Now() + wIDs := []uuid.UUID{} + for wID, worker := range q.workers { + if now.Sub(worker.Heartbeat) > olderThan { + wIDs = append(wIDs, wID) + } + } + return wIDs, nil +} + +func (q *fsJobQueue) DeleteWorker(wID uuid.UUID) error { + q.mu.Lock() + defer q.mu.Unlock() + + worker, ok := q.workers[wID] + if !ok { + return jobqueue.ErrWorkerNotExist + } + + if len(worker.Tokens) != 0 { + return jobqueue.ErrActiveJobs + } + delete(q.workers, wID) + return nil +} + // Reads job with `id`. 
This is a thin wrapper around `q.db.Read`, which // returns the job directly, or and error if a job with `id` does not exist. func (q *fsJobQueue) readJob(id uuid.UUID) (*job, error) { diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index fa2d688e8..a558bd960 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -44,6 +44,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { t.Run("dequeue-by-id", wrap(testDequeueByID)) t.Run("multiple-channels", wrap(testMultipleChannels)) t.Run("100-dequeuers", wrap(test100dequeuers)) + t.Run("workers", wrap(testWorkers)) } func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID, channel string) uuid.UUID { @@ -55,7 +56,7 @@ func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interfa } func finishNextTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, result interface{}, deps []uuid.UUID) uuid.UUID { - id, tok, d, typ, args, err := q.Dequeue(context.Background(), []string{jobType}, []string{""}) + id, tok, d, typ, args, err := q.Dequeue(context.Background(), uuid.Nil, []string{jobType}, []string{""}) require.NoError(t, err) require.NotEmpty(t, id) require.NotEmpty(t, tok) @@ -82,7 +83,7 @@ func testErrors(t *testing.T, q jobqueue.JobQueue) { // token gets removed pushTestJob(t, q, "octopus", nil, nil, "") - id, tok, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) + id, tok, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, tok) @@ -133,7 +134,7 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, "kingfisher", jchan) require.Equal(t, "octopus", jtype) - id, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher"}) + id, tok, deps, 
typ, args, err := q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{"kingfisher"}) require.NoError(t, err) require.Equal(t, two, id) require.NotEmpty(t, tok) @@ -151,7 +152,7 @@ func testArgs(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, "kingfisher", jchan) require.Equal(t, typ, jtype) - id, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"fish"}, []string{"toucan"}) + id, tok, deps, typ, args, err = q.Dequeue(context.Background(), uuid.Nil, []string{"fish"}, []string{"toucan"}) require.NoError(t, err) require.Equal(t, one, id) require.NotEmpty(t, tok) @@ -181,7 +182,7 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { ctx, cancel := context.WithCancel(context.Background()) cancel() - id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}, []string{""}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, uuid.Nil, []string{"zebra"}, []string{""}) require.Equal(t, err, jobqueue.ErrDequeueTimeout) require.Equal(t, uuid.Nil, id) require.Equal(t, uuid.Nil, tok) @@ -193,12 +194,12 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { func testDequeueTimeout(t *testing.T, q jobqueue.JobQueue) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20) defer cancel() - _, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}, []string{""}) + _, _, _, _, _, err := q.Dequeue(ctx, uuid.Nil, []string{"octopus"}, []string{""}) require.Equal(t, jobqueue.ErrDequeueTimeout, err) ctx2, cancel2 := context.WithCancel(context.Background()) cancel2() - _, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}, []string{""}) + _, _, _, _, _, err = q.Dequeue(ctx2, uuid.Nil, []string{"octopus"}, []string{""}) require.Equal(t, jobqueue.ErrDequeueTimeout, err) } @@ -286,7 +287,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) { defer close(done) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, tok, deps, typ, args, err := q.Dequeue(ctx, 
[]string{"octopus"}, []string{""}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, uuid.Nil, []string{"octopus"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, id) require.NotEmpty(t, tok) @@ -301,7 +302,7 @@ func testMultipleWorkers(t *testing.T, q jobqueue.JobQueue) { // This call to Dequeue() should not block on the one in the goroutine. id := pushTestJob(t, q, "clownfish", nil, nil, "") - r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) + r, tok, deps, typ, args, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -324,7 +325,7 @@ func testMultipleWorkersSingleJobType(t *testing.T, q jobqueue.JobQueue) { defer wg.Add(-1) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"clownfish"}, []string{""}) + id, tok, deps, typ, args, err := q.Dequeue(ctx, uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) require.NotEmpty(t, id) require.NotEmpty(t, tok) @@ -371,7 +372,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { // Cancel a running job, which should not dequeue the canceled job from above id = pushTestJob(t, q, "clownfish", nil, nil, "") require.NotEmpty(t, id) - r, tok, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) + r, tok, deps, typ, args, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -391,7 +392,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { // Cancel a finished job, which is a no-op id = pushTestJob(t, q, "clownfish", nil, nil, "") require.NotEmpty(t, id) - r, tok, deps, typ, args, err = q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) + r, tok, deps, typ, args, err = 
q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -423,7 +424,7 @@ func testRequeue(t *testing.T, q jobqueue.JobQueue) { require.Error(t, err) // Requeue a running job - r, tok1, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) + r, tok1, deps, typ, args, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok1) @@ -432,7 +433,7 @@ func testRequeue(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, json.RawMessage("null"), args) err = q.RequeueOrFinishJob(id, 1, nil) require.NoError(t, err) - r, tok2, deps, typ, args, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) + r, tok2, deps, typ, args, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok2) @@ -457,13 +458,13 @@ func testRequeueLimit(t *testing.T, q jobqueue.JobQueue) { // Start a job id := pushTestJob(t, q, "clownfish", nil, nil, "") require.NotEmpty(t, id) - _, _, _, _, _, err := q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) + _, _, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) // Requeue once err = q.RequeueOrFinishJob(id, 1, nil) require.NoError(t, err) // Start again - _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"clownfish"}, []string{""}) + _, _, _, _, _, err = q.Dequeue(context.Background(), uuid.Nil, []string{"clownfish"}, []string{""}) require.NoError(t, err) _, _, result, _, _, finished, _, _, _, err := q.JobStatus(id) require.NoError(t, err) @@ -483,7 +484,7 @@ func testHeartbeats(t *testing.T, q jobqueue.JobQueue) { // No heartbeats for queued job require.Empty(t, q.Heartbeats(time.Second*0)) - r, tok, _, 
_, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) + r, tok, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{""}) require.NoError(t, err) require.Equal(t, id, r) require.NotEmpty(t, tok) @@ -518,7 +519,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { one := pushTestJob(t, q, "octopus", nil, nil, "") two := pushTestJob(t, q, "octopus", nil, nil, "") - tok, d, typ, args, err := q.DequeueByID(context.Background(), one) + tok, d, typ, args, err := q.DequeueByID(context.Background(), one, uuid.Nil) require.NoError(t, err) require.NotEmpty(t, tok) require.Empty(t, d) @@ -535,7 +536,7 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { one := pushTestJob(t, q, "octopus", nil, nil, "") two := pushTestJob(t, q, "octopus", nil, []uuid.UUID{one}, "") - _, _, _, _, err := q.DequeueByID(context.Background(), two) + _, _, _, _, err := q.DequeueByID(context.Background(), two, uuid.Nil) require.Equal(t, jobqueue.ErrNotPending, err) require.Equal(t, one, finishNextTestJob(t, q, "octopus", testResult{}, nil)) @@ -545,16 +546,16 @@ func testDequeueByID(t *testing.T, q jobqueue.JobQueue) { t.Run("cannot dequeue a non-pending job", func(t *testing.T) { one := pushTestJob(t, q, "octopus", nil, nil, "") - _, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) + _, _, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{""}) require.NoError(t, err) - _, _, _, _, err = q.DequeueByID(context.Background(), one) + _, _, _, _, err = q.DequeueByID(context.Background(), one, uuid.Nil) require.Equal(t, jobqueue.ErrNotPending, err) err = q.RequeueOrFinishJob(one, 0, nil) require.NoError(t, err) - _, _, _, _, err = q.DequeueByID(context.Background(), one) + _, _, _, _, err = q.DequeueByID(context.Background(), one, uuid.Nil) require.Equal(t, jobqueue.ErrNotPending, err) }) } @@ -571,7 +572,7 @@ func testMultipleChannels(t *testing.T, q 
jobqueue.JobQueue) { go func() { defer wg.Done() - id, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher"}) + id, _, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{"kingfisher"}) require.NoError(t, err) expectedID := <-twoChan @@ -583,7 +584,7 @@ func testMultipleChannels(t *testing.T, q jobqueue.JobQueue) { go func() { defer wg.Done() - id, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"toucan"}) + id, _, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{"toucan"}) require.NoError(t, err) expectedID := <-oneChan @@ -610,13 +611,13 @@ func testMultipleChannels(t *testing.T, q jobqueue.JobQueue) { go func() { defer wg.Done() - id, _, _, _, _, err := q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher", "toucan"}) + id, _, _, _, _, err := q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{"kingfisher", "toucan"}) require.NoError(t, err) expectedID := <-oneChan require.Equal(t, expectedID, id) - id, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{"kingfisher", "toucan"}) + id, _, _, _, _, err = q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{"kingfisher", "toucan"}) require.NoError(t, err) expectedID = <-twoChan @@ -639,11 +640,11 @@ func testMultipleChannels(t *testing.T, q jobqueue.JobQueue) { // dequeue from an empty channel ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond*100)) defer cancel() - _, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}, []string{""}) + _, _, _, _, _, err := q.Dequeue(ctx, uuid.Nil, []string{"octopus"}, []string{""}) require.ErrorIs(t, err, jobqueue.ErrDequeueTimeout) // dequeue from toucan channel - _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{"toucan"}) + _, _, _, _, _, err = 
q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{"toucan"}) require.NoError(t, err) // enqueue into an empty channel @@ -652,11 +653,11 @@ func testMultipleChannels(t *testing.T, q jobqueue.JobQueue) { // dequeue from toucan channel ctx2, cancel2 := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond*100)) defer cancel2() - _, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}, []string{"toucan"}) + _, _, _, _, _, err = q.Dequeue(ctx2, uuid.Nil, []string{"octopus"}, []string{"toucan"}) require.ErrorIs(t, err, jobqueue.ErrDequeueTimeout) // dequeue from an empty channel - _, _, _, _, _, err = q.Dequeue(context.Background(), []string{"octopus"}, []string{""}) + _, _, _, _, _, err = q.Dequeue(context.Background(), uuid.Nil, []string{"octopus"}, []string{""}) require.NoError(t, err) }) } @@ -700,3 +701,42 @@ func test100dequeuers(t *testing.T, q jobqueue.JobQueue) { wg.Wait() } + +// Registers workers and runs jobs against them +func testWorkers(t *testing.T, q jobqueue.JobQueue) { + one := pushTestJob(t, q, "octopus", nil, nil, "") + + w1, err := q.InsertWorker("x86_64") + require.NoError(t, err) + w2, err := q.InsertWorker("aarch64") + require.NoError(t, err) + + workers, err := q.Workers(0) + require.NoError(t, err) + require.Len(t, workers, 2) + + workers, err = q.Workers(time.Hour * 24) + require.NoError(t, err) + require.Len(t, workers, 0) + + _, _, _, _, _, err = q.Dequeue(context.Background(), w1, []string{"octopus"}, []string{""}) + require.NoError(t, err) + + err = q.DeleteWorker(w1) + require.Equal(t, err, jobqueue.ErrActiveJobs) + + err = q.UpdateWorkerStatus(w1) + require.NoError(t, err) + + err = q.UpdateWorkerStatus(uuid.New()) + require.Equal(t, err, jobqueue.ErrWorkerNotExist) + + err = q.RequeueOrFinishJob(one, 0, &testResult{}) + require.NoError(t, err) + + err = q.DeleteWorker(w1) + require.NoError(t, err) + + err = q.DeleteWorker(w2) + require.NoError(t, err) +} diff --git 
a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 8437301a9..fc56dbb46 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -95,19 +95,41 @@ const ( RETURNING type, started_at` sqlInsertHeartbeat = ` - INSERT INTO heartbeats(token, id, heartbeat) - VALUES ($1, $2, now())` + INSERT INTO heartbeats(token, id, heartbeat) + VALUES ($1, $2, now())` + sqlInsertHeartbeatWithWorker = ` + INSERT INTO heartbeats(token, id, worker_id, heartbeat) + VALUES ($1, $2, $3, now())` sqlQueryHeartbeats = ` - SELECT token - FROM heartbeats - WHERE age(now(), heartbeat) > $1` + SELECT token + FROM heartbeats + WHERE age(now(), heartbeat) > $1` sqlRefreshHeartbeat = ` - UPDATE heartbeats - SET heartbeat = now() - WHERE token = $1` + UPDATE heartbeats + SET heartbeat = now() + WHERE token = $1` sqlDeleteHeartbeat = ` - DELETE FROM heartbeats - WHERE id = $1` + DELETE FROM heartbeats + WHERE id = $1` + sqlQueryHeartbeatsForWorker = ` + SELECT count(token) + FROM heartbeats + WHERE worker_id = $1` + + sqlInsertWorker = ` + INSERT INTO workers(worker_id, arch, heartbeat) + VALUES($1, $2, now())` + sqlUpdateWorkerStatus = ` + UPDATE workers + SET heartbeat = now() + WHERE worker_id = $1` + sqlQueryWorkers = ` + SELECT worker_id + FROM workers + WHERE age(now(), heartbeat) > $1` + sqlDeleteWorker = ` + DELETE FROM workers + WHERE worker_id = $1` ) type DBJobQueue struct { @@ -307,7 +329,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return id, nil } -func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { +func (q *DBJobQueue) Dequeue(ctx context.Context, workerID uuid.UUID, jobTypes, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { // add ourselves as a dequeuer c := make(chan struct{}, 1) el := q.dequeuers.pushBack(c) @@ -316,7 
+338,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels [] token := uuid.New() for { var err error - id, dependencies, jobType, args, err := q.dequeueMaybe(ctx, token, jobTypes, channels) + id, dependencies, jobType, args, err := q.dequeueMaybe(ctx, token, workerID, jobTypes, channels) if err == nil { return id, token, dependencies, jobType, args, nil } @@ -343,7 +365,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels [] // // This method returns pgx.ErrNoRows if the method didn't manage to dequeue // anything -func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes []string, channels []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { +func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token, workerID uuid.UUID, jobTypes, channels []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { var conn *pgxpool.Conn conn, err := q.pool.Acquire(ctx) if err != nil { @@ -374,9 +396,16 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes } // insert heartbeat - _, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id) - if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + if workerID != uuid.Nil { + _, err = tx.Exec(ctx, sqlInsertHeartbeatWithWorker, token, id, workerID) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + } + } else { + _, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + } } dependencies, err := q.jobDependencies(ctx, tx, id) @@ -394,7 +423,7 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes return id, dependencies, jobType, args, nil } -func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, 
error) { +func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { // Return early if the context is already canceled. if err := ctx.Err(); err != nil { return uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout @@ -431,9 +460,16 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, } // insert heartbeat - _, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id) - if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + if workerID != uuid.Nil { + _, err = tx.Exec(ctx, sqlInsertHeartbeatWithWorker, token, id, workerID) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + } + } else { + _, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + } } dependencies, err := q.jobDependencies(ctx, tx, id) @@ -486,7 +522,7 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result return jobqueue.ErrNotRunning } - // Remove from heartbeats + // Remove from heartbeats if token is null tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id) if err != nil { return fmt.Errorf("error removing job %s from heartbeats: %v", id, err) @@ -677,6 +713,104 @@ func (q *DBJobQueue) RefreshHeartbeat(token uuid.UUID) { } } +func (q *DBJobQueue) InsertWorker(arch string) (uuid.UUID, error) { + conn, err := q.pool.Acquire(context.Background()) + if err != nil { + return uuid.Nil, err + } + defer conn.Release() + + id := uuid.New() + _, err = conn.Exec(context.Background(), sqlInsertWorker, id, arch) + if err != nil { + q.logger.Error(err, "Error inserting worker") + return uuid.Nil, err + } + return id, nil +} + +func (q *DBJobQueue) UpdateWorkerStatus(workerID uuid.UUID) error { + conn, err := q.pool.Acquire(context.Background()) + if err != 
nil { + return err + } + defer conn.Release() + + tag, err := conn.Exec(context.Background(), sqlUpdateWorkerStatus, workerID) + if err != nil { + q.logger.Error(err, "Error updating worker status") + // Surface the database error instead of masking it as a missing worker. + return err + } + if tag.RowsAffected() != 1 { + q.logger.Error(nil, "No rows affected when updating worker status") + return jobqueue.ErrWorkerNotExist + } + return nil +} +func (q *DBJobQueue) Workers(olderThan time.Duration) ([]uuid.UUID, error) { + conn, err := q.pool.Acquire(context.Background()) + if err != nil { + return nil, err + } + defer conn.Release() + + rows, err := conn.Query(context.Background(), sqlQueryWorkers, olderThan.String()) + if err != nil { + return nil, err + } + defer rows.Close() + + workerIDs := make([]uuid.UUID, 0) + for rows.Next() { + var w uuid.UUID + err = rows.Scan(&w) + if err != nil { + // Log the error and try to continue with the next row + q.logger.Error(err, "Unable to read worker ID from workers") + continue + } + workerIDs = append(workerIDs, w) + } + if rows.Err() != nil { + q.logger.Error(rows.Err(), "Error reading worker IDs from workers") + return nil, rows.Err() + } + + return workerIDs, nil +} + +func (q *DBJobQueue) DeleteWorker(workerID uuid.UUID) error { + conn, err := q.pool.Acquire(context.Background()) + if err != nil { + return err + } + defer conn.Release() + + var count int + err = conn.QueryRow(context.Background(), sqlQueryHeartbeatsForWorker, workerID).Scan(&count) + if err != nil { + return err + } + + // If worker has any active jobs, refuse to remove the worker + if count != 0 { + return jobqueue.ErrActiveJobs + } + + tag, err := conn.Exec(context.Background(), sqlDeleteWorker, workerID) + if err != nil { + q.logger.Error(err, "Error deleting worker") + return err + } + + if tag.RowsAffected() != 1 { + q.logger.Error(nil, "No rows affected when deleting worker") + return jobqueue.ErrWorkerNotExist + } + return nil +} + // connection unifies pgxpool.Conn and pgx.Tx interfaces // Some methods don't care whether they run 
queries on a raw connection, // or in a transaction. This interface thus abstracts this concept. diff --git a/pkg/jobqueue/dbjobqueue/schemas/007_create_table_workers.sql b/pkg/jobqueue/dbjobqueue/schemas/007_create_table_workers.sql new file mode 100644 index 000000000..4b1028857 --- /dev/null +++ b/pkg/jobqueue/dbjobqueue/schemas/007_create_table_workers.sql @@ -0,0 +1,8 @@ +CREATE TABLE workers( + worker_id uuid PRIMARY KEY, + arch varchar NOT NULL, + heartbeat timestamp NOT NULL +); + +ALTER TABLE heartbeats +ADD COLUMN worker_id uuid REFERENCES workers(worker_id) ON DELETE CASCADE; diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go index 8b11f5f86..55c357cab 100644 --- a/pkg/jobqueue/jobqueue.go +++ b/pkg/jobqueue/jobqueue.go @@ -41,13 +41,13 @@ type JobQueue interface { // // Returns the job's id, token, dependencies, type, and arguments, or an error. Arguments // can be unmarshaled to the type given in Enqueue(). - Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) + Dequeue(ctx context.Context, workerID uuid.UUID, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) // Dequeues a pending job by its ID in a non-blocking way. // // Returns the job's token, dependencies, type, and arguments, or an error. Arguments // can be unmarshaled to the type given in Enqueue(). 
- DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) + DequeueByID(ctx context.Context, id, workerID uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) // Tries to requeue a running job by its ID // @@ -78,8 +78,20 @@ type JobQueue interface { // Get a list of tokens which haven't been updated in the specified time frame Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) - // Reset the last heartbeat time to time.Now() + // Reset the last job heartbeat time to time.Now() RefreshHeartbeat(token uuid.UUID) + + // Inserts the worker and creates a UUID for it + InsertWorker(arch string) (uuid.UUID, error) + + // Reset the last worker's heartbeat time to time.Now() + UpdateWorkerStatus(workerID uuid.UUID) error + + // Get a list of workers which haven't been updated in the specified time frame + Workers(olderThan time.Duration) ([]uuid.UUID, error) + + // Deletes the worker + DeleteWorker(workerID uuid.UUID) error } // SimpleLogger provides a structured logging methods for the jobqueue library. @@ -100,4 +112,6 @@ var ( ErrNotRunning = errors.New("job is not running") ErrCanceled = errors.New("job was canceled") ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled") + ErrActiveJobs = errors.New("worker has active jobs associated with it") + ErrWorkerNotExist = errors.New("worker does not exist") )