dbjobqueue: put all DequeueByID queries into a transaction

If inserting a heartbeat or querying dependencies fail, we don't want to
actually dequeue the job from the database.

The failures may be:
- context timeout/cancellation
- network issues

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2023-04-14 13:07:15 +02:00 committed by Sanne Raymaekers
parent 571b959cc1
commit 464ce568b2

View file

@ -386,12 +386,24 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
}
defer conn.Release()
tx, err := conn.Begin(ctx)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error starting a new transaction: %w", err)
}
defer func() {
err = tx.Rollback(context.Background())
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
q.logger.Error(err, "Error rolling back dequeuing by id transaction", "job_id", id.String())
}
}()
var jobType string
var args json.RawMessage
var started, queued *time.Time
token := uuid.New()
err = conn.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args, &queued, &started)
err = tx.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args, &queued, &started)
if err == pgx.ErrNoRows {
return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
} else if err != nil {
@ -399,16 +411,21 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
}
// insert heartbeat
_, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id)
_, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
}
dependencies, err := q.jobDependencies(ctx, conn, id)
dependencies, err := q.jobDependencies(ctx, tx, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
}
err = tx.Commit(ctx)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error committing a transaction: %w", err)
}
q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies))
return token, dependencies, jobType, args, nil