jobqueue: Query job dependents
This commit is contained in:
parent
099b34b301
commit
0fe3f1b2ae
6 changed files with 64 additions and 16 deletions
|
|
@ -58,7 +58,7 @@ func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
|
|||
err = q.FinishJob(id, res)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, _, r, _, _, _, _, _, err := q.JobStatus(id)
|
||||
_, _, r, _, _, _, _, _, _, err := q.JobStatus(id)
|
||||
require.NoError(t, err)
|
||||
|
||||
var r1 Result
|
||||
|
|
@ -78,7 +78,7 @@ func testDeleteJob(t *testing.T, d db, q *dbjobqueue.DBJobQueue) {
|
|||
require.NoError(t, err)
|
||||
require.Equal(t, int64(1), rows)
|
||||
|
||||
_, _, _, _, _, _, _, _, err = q.JobStatus(id)
|
||||
_, _, _, _, _, _, _, _, _, err = q.JobStatus(id)
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -66,6 +66,7 @@ type job struct {
|
|||
Type string `json:"type"`
|
||||
Args json.RawMessage `json:"args,omitempty"`
|
||||
Dependencies []uuid.UUID `json:"dependencies"`
|
||||
Dependents []uuid.UUID `json:"dependents"`
|
||||
Result json.RawMessage `json:"result,omitempty"`
|
||||
Channel string `json:"channel"`
|
||||
|
||||
|
|
@ -151,13 +152,20 @@ func (q *fsJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
|
|||
// Verify dependendencies early, so that the job doesn't get written
|
||||
// when one of them doesn't exist.
|
||||
for _, d := range j.Dependencies {
|
||||
exists, err := q.db.Read(d.String(), nil)
|
||||
var dep job
|
||||
exists, err := q.db.Read(d.String(), &dep)
|
||||
if err != nil {
|
||||
return uuid.Nil, err
|
||||
}
|
||||
if !exists {
|
||||
return uuid.Nil, jobqueue.ErrNotExist
|
||||
}
|
||||
|
||||
dep.Dependents = append(dep.Dependents, j.Id)
|
||||
err = q.db.Write(d.String(), dep)
|
||||
if err != nil {
|
||||
return uuid.Nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Write the job before updating in-memory state, so that the latter
|
||||
|
|
@ -344,7 +352,7 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
|
||||
func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
|
||||
j, err := q.readJob(id)
|
||||
if err != nil {
|
||||
return
|
||||
|
|
@ -358,6 +366,7 @@ func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, re
|
|||
finished = j.FinishedAt
|
||||
canceled = j.Canceled
|
||||
deps = j.Dependencies
|
||||
dependents = j.Dependents
|
||||
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -211,7 +211,11 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
|
|||
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
|
||||
|
||||
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "")
|
||||
jobType, _, _, queued, started, finished, canceled, deps, err := q.JobStatus(j)
|
||||
_, _, _, _, _, _, _, _, dependents, err := q.JobStatus(one)
|
||||
require.NoError(t, err)
|
||||
require.ElementsMatch(t, dependents, []uuid.UUID{j})
|
||||
|
||||
jobType, _, _, queued, started, finished, canceled, deps, dependents, err := q.JobStatus(j)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, jobType, "test")
|
||||
require.True(t, !queued.IsZero())
|
||||
|
|
@ -219,10 +223,11 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
|
|||
require.True(t, finished.IsZero())
|
||||
require.False(t, canceled)
|
||||
require.ElementsMatch(t, deps, []uuid.UUID{one, two})
|
||||
require.Empty(t, dependents)
|
||||
|
||||
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
|
||||
|
||||
jobType, _, result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
|
||||
jobType, _, result, queued, started, finished, canceled, deps, dependents, err := q.JobStatus(j)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, jobType, "test")
|
||||
require.True(t, !queued.IsZero())
|
||||
|
|
@ -230,6 +235,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
|
|||
require.True(t, !finished.IsZero())
|
||||
require.False(t, canceled)
|
||||
require.ElementsMatch(t, deps, []uuid.UUID{one, two})
|
||||
require.Empty(t, dependents)
|
||||
|
||||
err = json.Unmarshal(result, &testResult{})
|
||||
require.NoError(t, err)
|
||||
|
|
@ -240,7 +246,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
|
|||
two := pushTestJob(t, q, "test", nil, nil, "")
|
||||
|
||||
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "")
|
||||
jobType, _, _, queued, started, finished, canceled, deps, err := q.JobStatus(j)
|
||||
jobType, _, _, queued, started, finished, canceled, deps, _, err := q.JobStatus(j)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, jobType, "test")
|
||||
require.True(t, !queued.IsZero())
|
||||
|
|
@ -256,7 +262,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
|
|||
|
||||
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
|
||||
|
||||
jobType, _, result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
|
||||
jobType, _, result, queued, started, finished, canceled, deps, _, err := q.JobStatus(j)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, jobType, "test")
|
||||
require.True(t, !queued.IsZero())
|
||||
|
|
@ -352,7 +358,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
|
|||
require.NotEmpty(t, id)
|
||||
err = q.CancelJob(id)
|
||||
require.NoError(t, err)
|
||||
jobType, _, result, _, _, _, canceled, _, err := q.JobStatus(id)
|
||||
jobType, _, result, _, _, _, canceled, _, _, err := q.JobStatus(id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, jobType, "clownfish")
|
||||
require.True(t, canceled)
|
||||
|
|
@ -372,7 +378,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
|
|||
require.Equal(t, json.RawMessage("null"), args)
|
||||
err = q.CancelJob(id)
|
||||
require.NoError(t, err)
|
||||
jobType, _, result, _, _, _, canceled, _, err = q.JobStatus(id)
|
||||
jobType, _, result, _, _, _, canceled, _, _, err = q.JobStatus(id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, jobType, "clownfish")
|
||||
require.True(t, canceled)
|
||||
|
|
@ -395,7 +401,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
|
|||
err = q.CancelJob(id)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, jobqueue.ErrNotRunning, err)
|
||||
jobType, _, result, _, _, _, canceled, _, err = q.JobStatus(id)
|
||||
jobType, _, result, _, _, _, canceled, _, _, err = q.JobStatus(id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, jobType, "clownfish")
|
||||
require.False(t, canceled)
|
||||
|
|
@ -612,7 +618,7 @@ func test100dequeuers(t *testing.T, q jobqueue.JobQueue) {
|
|||
// try to do some other operations on the jobqueue
|
||||
id := pushTestJob(t, q, "clownfish", nil, nil, "")
|
||||
|
||||
_, _, _, _, _, _, _, _, err := q.JobStatus(id)
|
||||
_, _, _, _, _, _, _, _, _, err := q.JobStatus(id)
|
||||
require.NoError(t, err)
|
||||
|
||||
finishNextTestJob(t, q, "clownfish", testResult{}, nil)
|
||||
|
|
|
|||
|
|
@ -396,7 +396,7 @@ func (s *Server) AWSEC2ShareJobInfo(id uuid.UUID, result *AWSEC2ShareJobResult)
|
|||
}
|
||||
|
||||
func (s *Server) jobInfo(id uuid.UUID, result interface{}) (*JobInfo, error) {
|
||||
jobType, channel, rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id)
|
||||
jobType, channel, rawResult, queued, started, finished, canceled, deps, _, err := s.jobs.JobStatus(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -568,7 +568,7 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string,
|
|||
// TODO: include type of arguments
|
||||
var result json.RawMessage
|
||||
var finished time.Time
|
||||
_, _, result, _, _, finished, _, _, err = s.jobs.JobStatus(depID)
|
||||
_, _, result, _, _, finished, _, _, _, err = s.jobs.JobStatus(depID)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,6 +61,10 @@ const (
|
|||
SELECT dependency_id
|
||||
FROM job_dependencies
|
||||
WHERE job_id = $1`
|
||||
sqlQueryDependents = `
|
||||
SELECT job_id
|
||||
FROM job_dependencies
|
||||
WHERE dependency_id = $1`
|
||||
|
||||
sqlQueryJob = `
|
||||
SELECT type, args, channel, started_at, finished_at, canceled
|
||||
|
|
@ -462,7 +466,7 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
|
||||
func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error) {
|
||||
conn, err := q.pool.Acquire(context.Background())
|
||||
if err != nil {
|
||||
return
|
||||
|
|
@ -490,6 +494,11 @@ func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, re
|
|||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
dependents, err = q.jobDependents(context.Background(), conn, id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
@ -602,3 +611,27 @@ func (q *DBJobQueue) jobDependencies(ctx context.Context, conn *pgxpool.Conn, id
|
|||
|
||||
return dependencies, nil
|
||||
}
|
||||
|
||||
func (q *DBJobQueue) jobDependents(ctx context.Context, conn *pgxpool.Conn, id uuid.UUID) ([]uuid.UUID, error) {
|
||||
rows, err := conn.Query(ctx, sqlQueryDependents, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
dependents := []uuid.UUID{}
|
||||
for rows.Next() {
|
||||
var d uuid.UUID
|
||||
err = rows.Scan(&d)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dependents = append(dependents, d)
|
||||
}
|
||||
if rows.Err() != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return dependents, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -64,7 +64,7 @@ type JobQueue interface {
|
|||
// finished, respectively.
|
||||
//
|
||||
// Lastly, the IDs of the jobs dependencies are returned.
|
||||
JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error)
|
||||
JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, dependents []uuid.UUID, err error)
|
||||
|
||||
// Job returns all the parameters that define a job (everything provided during Enqueue).
|
||||
Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue