pkg/jobqueue: add arch to worker

This commit is contained in:
Sanne Raymaekers 2023-12-08 12:20:36 +01:00
parent 8a8b1406fb
commit ac854b7cc8
4 changed files with 28 additions and 16 deletions

View file

@ -493,18 +493,21 @@ func (q *fsJobQueue) UpdateWorkerStatus(wID uuid.UUID) error {
return nil
}
func (q *fsJobQueue) Workers(olderThan time.Duration) ([]uuid.UUID, error) {
func (q *fsJobQueue) Workers(olderThan time.Duration) ([]jobqueue.Worker, error) {
q.mu.Lock()
defer q.mu.Unlock()
now := time.Now()
wIDs := []uuid.UUID{}
for wID, worker := range q.workers {
if now.Sub(worker.Heartbeat) > olderThan {
wIDs = append(wIDs, wID)
workers := []jobqueue.Worker{}
for wID, w := range q.workers {
if now.Sub(w.Heartbeat) > olderThan {
workers = append(workers, jobqueue.Worker{
ID: wID,
Arch: w.Arch,
})
}
}
return wIDs, nil
return workers, nil
}
func (q *fsJobQueue) DeleteWorker(wID uuid.UUID) error {

View file

@ -163,9 +163,9 @@ func (s *Server) WatchWorkers() {
logrus.Warningf("Unable to query workers: %v", err)
continue
}
for _, wID := range workers {
logrus.Infof("Removing inactive worker: %s", wID)
err = s.jobs.DeleteWorker(wID)
for _, w := range workers {
logrus.Infof("Removing inactive worker: %s", w.ID)
err = s.jobs.DeleteWorker(w.ID)
if err != nil {
logrus.Warningf("Unable to remove worker: %v", err)
}

View file

@ -124,7 +124,7 @@ const (
SET heartbeat = now()
WHERE worker_id = $1`
sqlQueryWorkers = `
SELECT worker_id
SELECT worker_id, arch
FROM workers
WHERE age(now(), heartbeat) > $1`
sqlDeleteWorker = `
@ -746,7 +746,7 @@ func (q *DBJobQueue) UpdateWorkerStatus(workerID uuid.UUID) error {
}
return nil
}
func (q *DBJobQueue) Workers(olderThan time.Duration) ([]uuid.UUID, error) {
func (q *DBJobQueue) Workers(olderThan time.Duration) ([]jobqueue.Worker, error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return nil, err
@ -759,23 +759,27 @@ func (q *DBJobQueue) Workers(olderThan time.Duration) ([]uuid.UUID, error) {
}
defer rows.Close()
workerIDs := make([]uuid.UUID, 0)
workers := make([]jobqueue.Worker, 0)
for rows.Next() {
var w uuid.UUID
err = rows.Scan(&w)
var a string
err = rows.Scan(&w, &a)
if err != nil {
// Log the error and try to continue with the next row
q.logger.Error(err, "Unable to read token from heartbeats")
continue
}
workerIDs = append(workerIDs, w)
workers = append(workers, jobqueue.Worker{
ID: w,
Arch: a,
})
}
if rows.Err() != nil {
q.logger.Error(rows.Err(), "Error reading tokens from heartbeats")
return nil, rows.Err()
}
return workerIDs, nil
return workers, nil
}
func (q *DBJobQueue) DeleteWorker(workerID uuid.UUID) error {

View file

@ -88,7 +88,7 @@ type JobQueue interface {
UpdateWorkerStatus(workerID uuid.UUID) error
// Get a list of workers which haven't been updated in the specified time frame
Workers(olderThan time.Duration) ([]uuid.UUID, error)
Workers(olderThan time.Duration) ([]Worker, error)
// Deletes the worker
DeleteWorker(workerID uuid.UUID) error
@ -115,3 +115,8 @@ var (
ErrActiveJobs = errors.New("worker has active jobs associated with it")
ErrWorkerNotExist = errors.New("worker does not exist")
)
type Worker struct {
ID uuid.UUID
Arch string
}