From 464ce568b249622be0f497bcdcf82d046f2cbc64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Budai?= Date: Fri, 14 Apr 2023 13:07:15 +0200 Subject: [PATCH] dbjobqueue: put all DequeueByID queries into a transaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If inserting a heartbeat or querying dependencies fail, we don't want to actually dequeue the job from the database. The failures may be: - context timeout/cancellation - network issues Signed-off-by: Ondřej Budai --- pkg/jobqueue/dbjobqueue/dbjobqueue.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 8ed2b9fcb..d40368eb4 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -386,12 +386,24 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, } defer conn.Release() + tx, err := conn.Begin(ctx) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error starting a new transaction: %w", err) + } + + defer func() { + err = tx.Rollback(context.Background()) + if err != nil && !errors.Is(err, pgx.ErrTxClosed) { + q.logger.Error(err, "Error rolling back dequeuing by id transaction", "job_id", id.String()) + } + }() + var jobType string var args json.RawMessage var started, queued *time.Time token := uuid.New() - err = conn.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args, &queued, &started) + err = tx.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args, &queued, &started) if err == pgx.ErrNoRows { return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending } else if err != nil { @@ -399,16 +411,21 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, } // insert heartbeat - _, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id) + _, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id) if err != nil { return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) } - dependencies, err := q.jobDependencies(ctx, conn, id) + dependencies, err := q.jobDependencies(ctx, tx, id) if err != nil { return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err) } + err = tx.Commit(ctx) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error committing a transaction: %w", err) + } + q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies)) return token, dependencies, jobType, args, nil