From 09445a1030488107ba72b4349b8738597f1f7a85 Mon Sep 17 00:00:00 2001 From: Sanne Raymaekers Date: Wed, 31 Jul 2024 12:05:31 +0200 Subject: [PATCH] dbjobqueue: correct error wrapping Preserve context.DeadlineExceeded errors through correct error wrapping. This will reduce error-level logging noise in the worker. --- pkg/jobqueue/dbjobqueue/dbjobqueue.go | 38 +++++++++++++-------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index af33d5660..81135e66b 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -399,18 +399,18 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token, workerID uuid.UUID if workerID != uuid.Nil { _, err = tx.Exec(ctx, sqlInsertHeartbeatWithWorker, token, id, workerID) if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %w", err) } } else { _, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id) if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %w", err) } } dependencies, err := q.jobDependencies(ctx, tx, id) if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %w", err) } err = tx.Commit(ctx) @@ -431,7 +431,7 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (u conn, err := q.pool.Acquire(ctx) if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %w", err) } defer conn.Release() @@ -456,25 +456,25 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (u if err == pgx.ErrNoRows { return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending } else if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %w", err) } // insert heartbeat if workerID != uuid.Nil { _, err = tx.Exec(ctx, sqlInsertHeartbeatWithWorker, token, id, workerID) if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %w", err) } } else { _, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id) if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %w", err) } } dependencies, err := q.jobDependencies(ctx, tx, id) if err != nil { - return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %w", err) } err = tx.Commit(ctx) @@ -490,13 +490,13 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (u func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error { conn, err := q.pool.Acquire(context.Background()) if err != nil { - return fmt.Errorf("error connecting to database: %v", err) + return fmt.Errorf("error connecting to database: %w", err) } defer conn.Release() tx, err := conn.Begin(context.Background()) if err != nil { - return fmt.Errorf("error starting database transaction: %v", err) + return fmt.Errorf("error starting database transaction: %w", err) } defer func() { err = tx.Rollback(context.Background()) @@ -525,7 +525,7 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result // Remove from heartbeats if token is null tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id) if err != nil { - return fmt.Errorf("error removing job %s from heartbeats: %v", id, err) + return fmt.Errorf("error removing job %s from heartbeats: %w", id, err) } if tag.RowsAffected() != 1 { @@ -538,12 +538,12 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result return jobqueue.ErrNotExist } if err != nil { - return fmt.Errorf("error finishing job %s: %v", id, err) + return fmt.Errorf("error finishing job %s: %w", id, err) } } else { tag, err = tx.Exec(context.Background(), sqlRequeue, id) if err != nil { - return fmt.Errorf("error requeueing job %s: %v", id, err) + return fmt.Errorf("error requeueing job %s: %w", id, err) } if tag.RowsAffected() != 1 { @@ -553,12 +553,12 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result _, err = tx.Exec(context.Background(), sqlNotify) if err != nil { - return fmt.Errorf("error notifying jobs channel: %v", err) + return fmt.Errorf("error notifying jobs channel: %w", err) } err = tx.Commit(context.Background()) if err != nil { - return fmt.Errorf("unable to commit database transaction: %v", err) + return fmt.Errorf("unable to commit database transaction: %w", err) } if retries >= maxRetries { @@ -572,7 +572,7 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result func (q *DBJobQueue) CancelJob(id uuid.UUID) error { conn, err := q.pool.Acquire(context.Background()) if err != nil { - return fmt.Errorf("error connecting to database: %v", err) + return fmt.Errorf("error connecting to database: %w", err) } defer conn.Release() @@ -583,7 +583,7 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error { return jobqueue.ErrNotRunning } if err != nil { - return fmt.Errorf("error canceling job %s: %v", id, err) + return fmt.Errorf("error canceling job %s: %w", id, err) } q.logger.Info("Cancelled job", "job_type", jobType, "job_id", id.String()) @@ -651,7 +651,7 @@ func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de func (q *DBJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { - return uuid.Nil, fmt.Errorf("error establishing connection: %v", err) + return uuid.Nil, fmt.Errorf("error establishing connection: %w", err) } defer conn.Release() @@ -659,7 +659,7 @@ func (q *DBJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) { if err == pgx.ErrNoRows { return uuid.Nil, jobqueue.ErrNotExist } else if err != nil { - return uuid.Nil, fmt.Errorf("Error retrieving id: %v", err) + return uuid.Nil, fmt.Errorf("Error retrieving id: %w", err) } return