From 7edbaf6b4333922bbdad9a4c0922d6352599b7a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Budai?= Date: Fri, 14 Apr 2023 13:15:08 +0200 Subject: [PATCH] dbjobqueue: put all SQL queries in dequeueMaybe into a transaction MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This is needed to ensure atomicity of the whole dequeue operation. Signed-off-by: Ondřej Budai --- pkg/jobqueue/dbjobqueue/dbjobqueue.go | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 004658354..8437301a9 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -351,10 +351,22 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes } defer conn.Release() + tx, err := conn.Begin(ctx) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error starting a new transaction when dequeueing: %w", err) + } + + defer func() { + err = tx.Rollback(context.Background()) + if err != nil && !errors.Is(err, pgx.ErrTxClosed) { + q.logger.Error(err, "Error rolling back dequeuing transaction") + } + }() + var id uuid.UUID var jobType string var args json.RawMessage - err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &jobType, &args) + err = tx.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &jobType, &args) // skip the rest of the dequeueing operation if there are no rows if err != nil && errors.Is(err, pgx.ErrNoRows) { @@ -362,16 +374,21 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes } // insert heartbeat - _, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id) + _, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id) if err != nil { return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) } - dependencies, err := q.jobDependencies(ctx, conn, id) + dependencies, err := q.jobDependencies(ctx, tx, id) if err != nil { return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err) } + err = tx.Commit(ctx) + if err != nil { + return uuid.Nil, nil, "", nil, fmt.Errorf("error committing the transaction for dequeueing job %s: %w", id.String(), err) + } + q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies)) return id, dependencies, jobType, args, nil