dbjoqbqueue: actually use the transaction object when a tx is created

Transactions are tied to a connection so this is actually not a functional
change. Nevertheless, I think it's nice to explicitly state that we are
using a transaction.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2022-03-18 10:46:04 +01:00 committed by Ondřej Budai
parent 187eb188da
commit e9ce9370c6

View file

@ -259,19 +259,19 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
}()
id := uuid.New()
_, err = conn.Exec(context.Background(), sqlEnqueue, id, jobType, args, channel)
_, err = tx.Exec(context.Background(), sqlEnqueue, id, jobType, args, channel)
if err != nil {
return uuid.Nil, fmt.Errorf("error enqueuing job: %v", err)
}
for _, d := range dependencies {
_, err = conn.Exec(context.Background(), sqlInsertDependency, id, d)
_, err = tx.Exec(context.Background(), sqlInsertDependency, id, d)
if err != nil {
return uuid.Nil, fmt.Errorf("error inserting dependency: %v", err)
}
}
_, err = conn.Exec(context.Background(), sqlNotify)
_, err = tx.Exec(context.Background(), sqlNotify)
if err != nil {
return uuid.Nil, fmt.Errorf("error notifying jobs channel: %v", err)
}
@ -418,7 +418,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
var started, finished *time.Time
var jobType string
canceled := false
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &canceled)
err = tx.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, nil, &started, &finished, &canceled)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
}
@ -430,7 +430,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
}
// Remove from heartbeats
tag, err := conn.Exec(context.Background(), sqlDeleteHeartbeat, id)
tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id)
if err != nil {
return fmt.Errorf("error finishing job %s: %v", id, err)
}
@ -439,7 +439,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return jobqueue.ErrNotExist
}
err = conn.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
err = tx.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
@ -677,7 +677,7 @@ func (q *DBJobQueue) DeleteJobIncludingDependencies(jobId uuid.UUID) error {
}
}()
rows, err := conn.Query(context.Background(), sqlQueryDepedenciesRecursively, jobId)
rows, err := tx.Query(context.Background(), sqlQueryDepedenciesRecursively, jobId)
if err != nil {
return fmt.Errorf("error querying the job's dependencies: %v", err)
}
@ -693,13 +693,13 @@ func (q *DBJobQueue) DeleteJobIncludingDependencies(jobId uuid.UUID) error {
dependencies = append(dependencies, dep)
}
depTag, err := conn.Exec(context.Background(), sqlDeleteJobDependencies, dependencies)
depTag, err := tx.Exec(context.Background(), sqlDeleteJobDependencies, dependencies)
if err != nil {
return fmt.Errorf("Error removing from dependencies recursively for job %v: %v", jobId, err)
}
jobAndDependencies := append(dependencies, jobId)
jobsTag, err := conn.Exec(context.Background(), sqlDeleteJobs, jobAndDependencies)
jobsTag, err := tx.Exec(context.Background(), sqlDeleteJobs, jobAndDependencies)
if err != nil {
return fmt.Errorf("Error removing from jobs recursively for job %v: %v", jobId, err)
}