jobqueue: add ability to track workers

This commit is contained in:
Sanne Raymaekers 2023-10-25 13:52:51 +02:00 committed by Achilleas Koutsou
parent 2410b00eb9
commit d784075d31
6 changed files with 336 additions and 63 deletions

View file

@ -95,19 +95,41 @@ const (
RETURNING type, started_at`
sqlInsertHeartbeat = `
INSERT INTO heartbeats(token, id, heartbeat)
VALUES ($1, $2, now())`
INSERT INTO heartbeats(token, id, heartbeat)
VALUES ($1, $2, now())`
sqlInsertHeartbeatWithWorker = `
INSERT INTO heartbeats(token, id, worker_id, heartbeat)
VALUES ($1, $2, $3, now())`
sqlQueryHeartbeats = `
SELECT token
FROM heartbeats
WHERE age(now(), heartbeat) > $1`
SELECT token
FROM heartbeats
WHERE age(now(), heartbeat) > $1`
sqlRefreshHeartbeat = `
UPDATE heartbeats
SET heartbeat = now()
WHERE token = $1`
UPDATE heartbeats
SET heartbeat = now()
WHERE token = $1`
sqlDeleteHeartbeat = `
DELETE FROM heartbeats
WHERE id = $1`
DELETE FROM heartbeats
WHERE id = $1`
sqlQueryHeartbeatsForWorker = `
SELECT count(token)
FROM heartbeats
WHERE worker_id = $1`
sqlInsertWorker = `
INSERT INTO workers(worker_id, arch, heartbeat)
VALUES($1, $2, now())`
sqlUpdateWorkerStatus = `
UPDATE heartbeats
SET heartbeat = now()
WHERE worker_id = $1`
sqlQueryWorkers = `
SELECT worker_id
FROM workers
WHERE age(now(), heartbeat) > $1`
sqlDeleteWorker = `
DELETE FROM workers
WHERE worker_id = $1`
)
type DBJobQueue struct {
@ -307,7 +329,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
return id, nil
}
func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
func (q *DBJobQueue) Dequeue(ctx context.Context, workerID uuid.UUID, jobTypes, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
// add ourselves as a dequeuer
c := make(chan struct{}, 1)
el := q.dequeuers.pushBack(c)
@ -316,7 +338,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []
token := uuid.New()
for {
var err error
id, dependencies, jobType, args, err := q.dequeueMaybe(ctx, token, jobTypes, channels)
id, dependencies, jobType, args, err := q.dequeueMaybe(ctx, token, workerID, jobTypes, channels)
if err == nil {
return id, token, dependencies, jobType, args, nil
}
@ -343,7 +365,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []
//
// This method returns pgx.ErrNoRows if the method didn't manage to dequeue
// anything
func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes []string, channels []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token, workerID uuid.UUID, jobTypes, channels []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
var conn *pgxpool.Conn
conn, err := q.pool.Acquire(ctx)
if err != nil {
@ -374,9 +396,16 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes
}
// insert heartbeat
_, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
if workerID != uuid.Nil {
_, err = tx.Exec(ctx, sqlInsertHeartbeatWithWorker, token, id, workerID)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
}
} else {
_, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
}
}
dependencies, err := q.jobDependencies(ctx, tx, id)
@ -394,7 +423,7 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes
return id, dependencies, jobType, args, nil
}
func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
// Return early if the context is already canceled.
if err := ctx.Err(); err != nil {
return uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
@ -431,9 +460,16 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
}
// insert heartbeat
_, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
if workerID != uuid.Nil {
_, err = tx.Exec(ctx, sqlInsertHeartbeatWithWorker, token, id, workerID)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
}
} else {
_, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
}
}
dependencies, err := q.jobDependencies(ctx, tx, id)
@ -486,7 +522,7 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
return jobqueue.ErrNotRunning
}
// Remove from heartbeats
// Remove from heartbeats if token is null
tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id)
if err != nil {
return fmt.Errorf("error removing job %s from heartbeats: %v", id, err)
@ -677,6 +713,102 @@ func (q *DBJobQueue) RefreshHeartbeat(token uuid.UUID) {
}
}
func (q *DBJobQueue) InsertWorker(arch string) (uuid.UUID, error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return uuid.Nil, err
}
defer conn.Release()
id := uuid.New()
_, err = conn.Exec(context.Background(), sqlInsertWorker, id, arch)
if err != nil {
q.logger.Error(err, "Error inserting worker")
return uuid.Nil, err
}
return id, nil
}
func (q *DBJobQueue) UpdateWorkerStatus(workerID uuid.UUID) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return err
}
defer conn.Release()
tag, err := conn.Exec(context.Background(), sqlUpdateWorkerStatus, workerID)
if err != nil {
q.logger.Error(err, "Error updating worker status")
}
if tag.RowsAffected() != 1 {
q.logger.Error(nil, "No rows affected when refreshing updating status")
return jobqueue.ErrWorkerNotExist
}
return nil
}
func (q *DBJobQueue) Workers(olderThan time.Duration) ([]uuid.UUID, error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return nil, err
}
defer conn.Release()
rows, err := conn.Query(context.Background(), sqlQueryWorkers, olderThan.String())
if err != nil {
return nil, err
}
defer rows.Close()
workerIDs := make([]uuid.UUID, 0)
for rows.Next() {
var w uuid.UUID
err = rows.Scan(&w)
if err != nil {
// Log the error and try to continue with the next row
q.logger.Error(err, "Unable to read token from heartbeats")
continue
}
workerIDs = append(workerIDs, w)
}
if rows.Err() != nil {
q.logger.Error(rows.Err(), "Error reading tokens from heartbeats")
return nil, rows.Err()
}
return workerIDs, nil
}
func (q *DBJobQueue) DeleteWorker(workerID uuid.UUID) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return err
}
defer conn.Release()
var count int
err = conn.QueryRow(context.Background(), sqlQueryHeartbeatsForWorker, workerID).Scan(&count)
if err != nil {
return err
}
// If worker has any active jobs, refuse to remove the worker
if count != 0 {
return jobqueue.ErrActiveJobs
}
tag, err := conn.Exec(context.Background(), sqlDeleteWorker, workerID)
if err != nil {
q.logger.Error(err, "Error deleting worker")
return err
}
if tag.RowsAffected() != 1 {
q.logger.Error(nil, "No rows affected when deleting worker")
return jobqueue.ErrActiveJobs
}
return nil
}
// connection unifies pgxpool.Conn and pgx.Tx interfaces
// Some methods don't care whether they run queries on a raw connection,
// or in a transaction. This interface thus abstracts this concept.

View file

@ -0,0 +1,8 @@
CREATE TABLE workers(
worker_id uuid PRIMARY KEY,
arch varchar NOT NULL,
heartbeat timestamp NOT NULL
);
ALTER TABLE heartbeats
ADD COLUMN worker_id uuid REFERENCES workers(worker_id) ON DELETE CASCADE;

View file

@ -41,13 +41,13 @@ type JobQueue interface {
//
// Returns the job's id, token, dependencies, type, and arguments, or an error. Arguments
// can be unmarshaled to the type given in Enqueue().
Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
Dequeue(ctx context.Context, workerID uuid.UUID, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
// Dequeues a pending job by its ID in a non-blocking way.
//
// Returns the job's token, dependencies, type, and arguments, or an error. Arguments
// can be unmarshaled to the type given in Enqueue().
DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
DequeueByID(ctx context.Context, id, workerID uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error)
// Tries to requeue a running job by its ID
//
@ -78,8 +78,20 @@ type JobQueue interface {
// Get a list of tokens which haven't been updated in the specified time frame
Heartbeats(olderThan time.Duration) (tokens []uuid.UUID)
// Reset the last heartbeat time to time.Now()
// Reset the last job heartbeat time to time.Now()
RefreshHeartbeat(token uuid.UUID)
// Inserts the worker and creates a UUID for it
InsertWorker(arch string) (uuid.UUID, error)
// Reset the last worker's heartbeat time to time.Now()
UpdateWorkerStatus(workerID uuid.UUID) error
// Get a list of workers which haven't been updated in the specified time frame
Workers(olderThan time.Duration) ([]uuid.UUID, error)
// Deletes the worker
DeleteWorker(workerID uuid.UUID) error
}
// SimpleLogger provides a structured logging methods for the jobqueue library.
@ -100,4 +112,6 @@ var (
ErrNotRunning = errors.New("job is not running")
ErrCanceled = errors.New("job was canceled")
ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled")
ErrActiveJobs = errors.New("worker has active jobs associated with it")
ErrWorkerNotExist = errors.New("worker does not exist")
)