dbjobqueue: put all SQL queries in dequeueMaybe into a transaction
This is needed to ensure atomicity of the whole dequeue operation. Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
parent
c3f6baad7f
commit
7edbaf6b43
1 changed files with 20 additions and 3 deletions
|
|
@ -351,10 +351,22 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes
|
|||
}
|
||||
defer conn.Release()
|
||||
|
||||
tx, err := conn.Begin(ctx)
|
||||
if err != nil {
|
||||
return uuid.Nil, nil, "", nil, fmt.Errorf("error starting a new transaction when dequeueing: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err = tx.Rollback(context.Background())
|
||||
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
|
||||
q.logger.Error(err, "Error rolling back dequeuing transaction")
|
||||
}
|
||||
}()
|
||||
|
||||
var id uuid.UUID
|
||||
var jobType string
|
||||
var args json.RawMessage
|
||||
err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &jobType, &args)
|
||||
err = tx.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &jobType, &args)
|
||||
|
||||
// skip the rest of the dequeueing operation if there are no rows
|
||||
if err != nil && errors.Is(err, pgx.ErrNoRows) {
|
||||
|
|
@ -362,16 +374,21 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes
|
|||
}
|
||||
|
||||
// insert heartbeat
|
||||
_, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id)
|
||||
_, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id)
|
||||
if err != nil {
|
||||
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
|
||||
}
|
||||
|
||||
dependencies, err := q.jobDependencies(ctx, conn, id)
|
||||
dependencies, err := q.jobDependencies(ctx, tx, id)
|
||||
if err != nil {
|
||||
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
|
||||
}
|
||||
|
||||
err = tx.Commit(ctx)
|
||||
if err != nil {
|
||||
return uuid.Nil, nil, "", nil, fmt.Errorf("error committing the transaction for dequeueing job %s: %w", id.String(), err)
|
||||
}
|
||||
|
||||
q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies))
|
||||
|
||||
return id, dependencies, jobType, args, nil
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue