diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 69b7ca8ea..348c2e723 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -493,18 +493,21 @@ func (q *fsJobQueue) UpdateWorkerStatus(wID uuid.UUID) error { return nil } -func (q *fsJobQueue) Workers(olderThan time.Duration) ([]uuid.UUID, error) { +func (q *fsJobQueue) Workers(olderThan time.Duration) ([]jobqueue.Worker, error) { q.mu.Lock() defer q.mu.Unlock() now := time.Now() - wIDs := []uuid.UUID{} - for wID, worker := range q.workers { - if now.Sub(worker.Heartbeat) > olderThan { - wIDs = append(wIDs, wID) + workers := []jobqueue.Worker{} + for wID, w := range q.workers { + if now.Sub(w.Heartbeat) > olderThan { + workers = append(workers, jobqueue.Worker{ + ID: wID, + Arch: w.Arch, + }) } } - return wIDs, nil + return workers, nil } func (q *fsJobQueue) DeleteWorker(wID uuid.UUID) error { diff --git a/internal/worker/server.go b/internal/worker/server.go index 254c1421c..da33cc2a5 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -163,9 +163,9 @@ func (s *Server) WatchWorkers() { logrus.Warningf("Unable to query workers: %v", err) continue } - for _, wID := range workers { - logrus.Infof("Removing inactive worker: %s", wID) - err = s.jobs.DeleteWorker(wID) + for _, w := range workers { + logrus.Infof("Removing inactive worker: %s", w.ID) + err = s.jobs.DeleteWorker(w.ID) if err != nil { logrus.Warningf("Unable to remove worker: %v", err) } diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index fc56dbb46..be1bda07b 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -124,7 +124,7 @@ const ( SET heartbeat = now() WHERE worker_id = $1` sqlQueryWorkers = ` - SELECT worker_id + SELECT worker_id, arch FROM workers WHERE age(now(), heartbeat) > $1` sqlDeleteWorker = ` @@ -746,7 +746,7 @@ func (q *DBJobQueue) UpdateWorkerStatus(workerID uuid.UUID) error { } return nil } -func (q *DBJobQueue) Workers(olderThan time.Duration) ([]uuid.UUID, error) { +func (q *DBJobQueue) Workers(olderThan time.Duration) ([]jobqueue.Worker, error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { return nil, err @@ -759,23 +759,27 @@ func (q *DBJobQueue) Workers(olderThan time.Duration) ([]uuid.UUID, error) { } defer rows.Close() - workerIDs := make([]uuid.UUID, 0) + workers := make([]jobqueue.Worker, 0) for rows.Next() { var w uuid.UUID - err = rows.Scan(&w) + var a string + err = rows.Scan(&w, &a) if err != nil { // Log the error and try to continue with the next row q.logger.Error(err, "Unable to read token from heartbeats") continue } - workerIDs = append(workerIDs, w) + workers = append(workers, jobqueue.Worker{ + ID: w, + Arch: a, + }) } if rows.Err() != nil { q.logger.Error(rows.Err(), "Error reading tokens from heartbeats") return nil, rows.Err() } - return workerIDs, nil + return workers, nil } func (q *DBJobQueue) DeleteWorker(workerID uuid.UUID) error { diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go index 55c357cab..5c50e874b 100644 --- a/pkg/jobqueue/jobqueue.go +++ b/pkg/jobqueue/jobqueue.go @@ -88,7 +88,7 @@ type JobQueue interface { UpdateWorkerStatus(workerID uuid.UUID) error // Get a list of workers which haven't been updated in the specified time frame - Workers(olderThan time.Duration) ([]uuid.UUID, error) + Workers(olderThan time.Duration) ([]Worker, error) // Deletes the worker DeleteWorker(workerID uuid.UUID) error @@ -115,3 +115,8 @@ var ( ErrActiveJobs = errors.New("worker has active jobs associated with it") ErrWorkerNotExist = errors.New("worker does not exist") ) + +type Worker struct { + ID uuid.UUID + Arch string +}