From 0fe3f1b2aea8e4e6d5de618d96494d16b76e6e68 Mon Sep 17 00:00:00 2001 From: Sanne Raymaekers Date: Tue, 26 Jul 2022 14:13:58 +0200 Subject: [PATCH] jobqueue: Query job dependents --- cmd/osbuild-service-maintenance/db_test.go | 4 +-- internal/jobqueue/fsjobqueue/fsjobqueue.go | 13 +++++-- .../jobqueue/jobqueuetest/jobqueuetest.go | 22 +++++++----- internal/worker/server.go | 4 +-- pkg/jobqueue/dbjobqueue/dbjobqueue.go | 35 ++++++++++++++++++- pkg/jobqueue/jobqueue.go | 2 +- 6 files changed, 64 insertions(+), 16 deletions(-) diff --git a/cmd/osbuild-service-maintenance/db_test.go b/cmd/osbuild-service-maintenance/db_test.go index 762d4d7d4..e1861111c 100644 --- a/cmd/osbuild-service-maintenance/db_test.go +++ b/cmd/osbuild-service-maintenance/db_test.go @@ -58,7 +58,7 @@ func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { err = q.FinishJob(id, res) require.NoError(t, err) - _, _, r, _, _, _, _, _, err := q.JobStatus(id) + _, _, r, _, _, _, _, _, _, err := q.JobStatus(id) require.NoError(t, err) var r1 Result @@ -78,7 +78,7 @@ func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) { require.NoError(t, err) require.Equal(t, int64(1), rows) - _, _, _, _, _, _, _, _, err = q.JobStatus(id) + _, _, _, _, _, _, _, _, _, err = q.JobStatus(id) require.Error(t, err) } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index b2c3fa464..da187b8c6 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -66,6 +66,7 @@ type job struct { Type string `json:"type"` Args json.RawMessage `json:"args,omitempty"` Dependencies []uuid.UUID `json:"dependencies"` + Dependents []uuid.UUID `json:"dependents"` Result json.RawMessage `json:"result,omitempty"` Channel string `json:"channel"` @@ -151,13 +152,20 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu // Verify dependendencies early, so that the job doesn't get written // when one of them doesn't exist. for _, d := range j.Dependencies { - exists, err := q.db.Read(d.String(), nil) + var dep job + exists, err := q.db.Read(d.String(), &dep) if err != nil { return uuid.Nil, err } if !exists { return uuid.Nil, jobqueue.ErrNotExist } + + dep.Dependents = append(dep.Dependents, j.Id) + err = q.db.Write(d.String(), dep) + if err != nil { + return uuid.Nil, err + } } // Write the job before updating in-memory state, so that the latter @@ -344,7 +352,7 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error { return nil } -func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) { +func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) { j, err := q.readJob(id) if err != nil { return @@ -358,6 +366,7 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, re finished = j.FinishedAt canceled = j.Canceled deps = j.Dependencies + dependents = j.Dependents return } diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index b589aff6d..8fa6d775d 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -211,7 +211,11 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.ElementsMatch(t, []uuid.UUID{one, two}, r) j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "") - jobType, _, _, queued, started, finished, canceled, deps, err := q.JobStatus(j) + _, _, _, _, _, _, _, _, dependents, err := q.JobStatus(one) + require.NoError(t, err) + require.ElementsMatch(t, dependents, []uuid.UUID{j}) + + jobType, _, _, queued, started, finished, canceled, deps, dependents, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") require.True(t, !queued.IsZero()) @@ -219,10 +223,11 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.True(t, finished.IsZero()) require.False(t, canceled) require.ElementsMatch(t, deps, []uuid.UUID{one, two}) + require.Empty(t, dependents) require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) - jobType, _, result, queued, started, finished, canceled, deps, err := q.JobStatus(j) + jobType, _, result, queued, started, finished, canceled, deps, dependents, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") require.True(t, !queued.IsZero()) @@ -230,6 +235,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.True(t, !finished.IsZero()) require.False(t, canceled) require.ElementsMatch(t, deps, []uuid.UUID{one, two}) + require.Empty(t, dependents) err = json.Unmarshal(result, &testResult{}) require.NoError(t, err) @@ -240,7 +246,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { two := pushTestJob(t, q, "test", nil, nil, "") j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "") - jobType, _, _, queued, started, finished, canceled, deps, err := q.JobStatus(j) + jobType, _, _, queued, started, finished, canceled, deps, _, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") require.True(t, !queued.IsZero()) @@ -256,7 +262,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) - jobType, _, result, queued, started, finished, canceled, deps, err := q.JobStatus(j) + jobType, _, result, queued, started, finished, canceled, deps, _, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") require.True(t, !queued.IsZero()) @@ -352,7 +358,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.NotEmpty(t, id) err = q.CancelJob(id) require.NoError(t, err) - jobType, _, result, _, _, _, canceled, _, err := q.JobStatus(id) + jobType, _, result, _, _, _, canceled, _, _, err := q.JobStatus(id) require.NoError(t, err) require.Equal(t, jobType, "clownfish") require.True(t, canceled) @@ -372,7 +378,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, json.RawMessage("null"), args) err = q.CancelJob(id) require.NoError(t, err) - jobType, _, result, _, _, _, canceled, _, err = q.JobStatus(id) + jobType, _, result, _, _, _, canceled, _, _, err = q.JobStatus(id) require.NoError(t, err) require.Equal(t, jobType, "clownfish") require.True(t, canceled) @@ -395,7 +401,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { err = q.CancelJob(id) require.Error(t, err) require.Equal(t, jobqueue.ErrNotRunning, err) - jobType, _, result, _, _, _, canceled, _, err = q.JobStatus(id) + jobType, _, result, _, _, _, canceled, _, _, err = q.JobStatus(id) require.NoError(t, err) require.Equal(t, jobType, "clownfish") require.False(t, canceled) @@ -612,7 +618,7 @@ func test100dequeuers(t *testing.T, q jobqueue.JobQueue) { // try to do some other operations on the jobqueue id := pushTestJob(t, q, "clownfish", nil, nil, "") - _, _, _, _, _, _, _, _, err := q.JobStatus(id) + _, _, _, _, _, _, _, _, _, err := q.JobStatus(id) require.NoError(t, err) finishNextTestJob(t, q, "clownfish", testResult{}, nil) diff --git a/internal/worker/server.go b/internal/worker/server.go index 2ac477511..d08e17a01 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -396,7 +396,7 @@ func (s *Server) AWSEC2ShareJobInfo(id uuid.UUID, result *AWSEC2ShareJobResult) } func (s *Server) jobInfo(id uuid.UUID, result interface{}) (*JobInfo, error) { - jobType, channel, rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id) + jobType, channel, rawResult, queued, started, finished, canceled, deps, _, err := s.jobs.JobStatus(id) if err != nil { return nil, err } @@ -568,7 +568,7 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, // TODO: include type of arguments var result json.RawMessage var finished time.Time - _, _, result, _, _, finished, _, _, err = s.jobs.JobStatus(depID) + _, _, result, _, _, finished, _, _, _, err = s.jobs.JobStatus(depID) if err != nil { return } diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 25231224f..e59d4f629 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -61,6 +61,10 @@ const ( SELECT dependency_id FROM job_dependencies WHERE job_id = $1` + sqlQueryDependents = ` + SELECT job_id + FROM job_dependencies + WHERE dependency_id = $1` sqlQueryJob = ` SELECT type, args, channel, started_at, finished_at, canceled @@ -462,7 +466,7 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error { return nil } -func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) { +func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { return @@ -490,6 +494,11 @@ func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, re if err != nil { return } + + dependents, err = q.jobDependents(context.Background(), conn, id) + if err != nil { + return + } return } @@ -602,3 +611,27 @@ func (q *DBJobQueue) jobDependencies(ctx context.Context, conn *pgxpool.Conn, id return dependencies, nil } + +func (q *DBJobQueue) jobDependents(ctx context.Context, conn *pgxpool.Conn, id uuid.UUID) ([]uuid.UUID, error) { + rows, err := conn.Query(ctx, sqlQueryDependents, id) + if err != nil { + return nil, err + } + defer rows.Close() + + dependents := []uuid.UUID{} + for rows.Next() { + var d uuid.UUID + err = rows.Scan(&d) + if err != nil { + return nil, err + } + + dependents = append(dependents, d) + } + if rows.Err() != nil { + return nil, err + } + + return dependents, nil +} diff --git a/pkg/jobqueue/jobqueue.go b/pkg/jobqueue/jobqueue.go index d23133728..ae8fad879 100644 --- a/pkg/jobqueue/jobqueue.go +++ b/pkg/jobqueue/jobqueue.go @@ -64,7 +64,7 @@ type JobQueue interface { // finished, respectively. // // Lastly, the IDs of the jobs dependencies are returned. - JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) + JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) // Job returns all the parameters that define a job (everything provided during Enqueue). Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error)