diff --git a/internal/cloudapi/v2/handler.go b/internal/cloudapi/v2/handler.go index 7da0bf6d0..3232c001e 100644 --- a/internal/cloudapi/v2/handler.go +++ b/internal/cloudapi/v2/handler.go @@ -307,7 +307,7 @@ func (h *apiHandlers) targetResultToUploadStatus(jobId uuid.UUID, t *target.Targ // GetComposeList returns a list of the root job UUIDs func (h *apiHandlers) GetComposeList(ctx echo.Context) error { - jobs, err := h.server.workers.AllRootJobIDs() + jobs, err := h.server.workers.AllRootJobIDs(ctx.Request().Context()) if err != nil { return HTTPErrorWithInternal(ErrorGettingComposeList, err) } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 733fb649d..f10e82e74 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -705,7 +705,7 @@ func jobMatchesCriteria(j *job, acceptedJobTypes []string, acceptedChannels []st // AllRootJobIDs Return a list of all the top level(root) job uuids // This only includes jobs without any Dependents set -func (q *fsJobQueue) AllRootJobIDs() ([]uuid.UUID, error) { +func (q *fsJobQueue) AllRootJobIDs(_ context.Context) ([]uuid.UUID, error) { ids, err := q.db.List() if err != nil { return nil, err diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index e94f4e817..ee0f68e0c 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -1,6 +1,7 @@ package fsjobqueue_test import ( + "context" "os" "path" "sort" @@ -82,7 +83,7 @@ func TestAllRootJobIDs(t *testing.T) { rootJobs = append(rootJobs, jidRoot3) sortUUIDs(rootJobs) - roots, err := q.AllRootJobIDs() + roots, err := q.AllRootJobIDs(context.TODO()) require.Nil(t, err) require.Greater(t, len(roots), 0) sortUUIDs(roots) @@ -101,7 +102,7 @@ func TestDeleteJob(t *testing.T) { err = q.DeleteJob(context.TODO(), jidRoot1) require.Nil(t, err) - jobs, err := q.AllRootJobIDs() + jobs, err := q.AllRootJobIDs(context.TODO()) require.Nil(t, err) require.Equal(t, 0, len(jobs)) @@ -122,7 +123,7 @@ func TestDeleteJob(t *testing.T) { // This should only remove jidRoot2 and jid2, leaving jidRoot3, jid1, jid3 err = q.DeleteJob(context.TODO(), jidRoot2) require.Nil(t, err) - jobs, err = q.AllRootJobIDs() + jobs, err = q.AllRootJobIDs(context.TODO()) require.Nil(t, err) require.Equal(t, 1, len(jobs)) assert.Equal(t, []uuid.UUID{jidRoot3}, jobs) @@ -130,7 +131,7 @@ func TestDeleteJob(t *testing.T) { // This should remove the rest err = q.DeleteJob(context.TODO(), jidRoot3) require.Nil(t, err) - jobs, err = q.AllRootJobIDs() + jobs, err = q.AllRootJobIDs(context.TODO()) require.Nil(t, err) require.Equal(t, 0, len(jobs)) @@ -156,7 +157,7 @@ func TestDeleteJob(t *testing.T) { // Delete the koji job err = q.DeleteJob(context.TODO(), kojiRoot) require.Nil(t, err) - jobs, err = q.AllRootJobIDs() + jobs, err = q.AllRootJobIDs(context.TODO()) require.Nil(t, err) require.Equal(t, 0, len(jobs)) diff --git a/internal/worker/server.go b/internal/worker/server.go index 134c470f6..eb4ac04b3 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -349,8 +349,8 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er } // AllRootJobIDs returns a list of top level job UUIDs that the worker knows about -func (s *Server) AllRootJobIDs() ([]uuid.UUID, error) { - return s.jobs.AllRootJobIDs() +func (s *Server) AllRootJobIDs(ctx context.Context) ([]uuid.UUID, error) { + return s.jobs.AllRootJobIDs(ctx) } func (s *Server) OSBuildJobInfo(id uuid.UUID, result *OSBuildJobResult) (*JobInfo, error) { diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 742ee5a55..2ff0a3e2d 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -71,6 +71,8 @@ const ( FROM job_dependencies WHERE dependency_id = $1` + sqlQueryListJobs = ` + SELECT id from jobs` sqlQueryJob = ` SELECT type, args, channel, started_at, finished_at, retries, canceled FROM jobs @@ -904,11 +906,58 @@ func (q *DBJobQueue) jobDependents(ctx context.Context, conn connection, id uuid return dependents, nil } -// AllRootJobIDs returns a list of top level job UUIDs that the worker knows about -func (q *DBJobQueue) AllRootJobIDs() ([]uuid.UUID, error) { - // TODO write this +// listJobs returns a list of all of the job UUIDs +func (q *DBJobQueue) listJobs(ctx context.Context, conn connection) (jobs []uuid.UUID, err error) { + rows, err := conn.Query(ctx, sqlQueryListJobs) + if err != nil { + return + } + defer rows.Close() - return nil, nil + for rows.Next() { + var t uuid.UUID + err = rows.Scan(&t) + if err != nil { + // Log the error and try to continue with the next row + q.logger.Error(err, "Unable to read job uuid from jobs") + continue + } + jobs = append(jobs, t) + } + if rows.Err() != nil { + q.logger.Error(rows.Err(), "Error reading job uuids from jobs") + } + + return +} + +// AllRootJobIDs returns a list of top level job UUIDs that the worker knows about +func (q *DBJobQueue) AllRootJobIDs(ctx context.Context) (rootJobs []uuid.UUID, err error) { + conn, err := q.pool.Acquire(ctx) + if err != nil { + return + } + defer conn.Release() + + var jobs []uuid.UUID + jobs, err = q.listJobs(ctx, conn) + if err != nil { + return + } + + for _, id := range jobs { + var dependents []uuid.UUID + dependents, err = q.jobDependents(ctx, conn, id) + if err != nil { + return + } + + if len(dependents) == 0 { + rootJobs = append(rootJobs, id) + } + } + + return } // DeleteJob deletes a job from the database diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go index 549274613..323b9bf05 100644 --- a/pkg/jobqueue/jobqueue.go +++ b/pkg/jobqueue/jobqueue.go @@ -97,7 +97,7 @@ type JobQueue interface { DeleteWorker(workerID uuid.UUID) error // AllRootJobIDs returns a list of top level job UUIDs that the worker knows about - AllRootJobIDs() ([]uuid.UUID, error) + AllRootJobIDs(context.Context) ([]uuid.UUID, error) // DeleteJob deletes a job and all of its dependencies DeleteJob(context.Context, uuid.UUID) error