dbjobqueue: correct error wrapping

Preserve context.DeadlineExceeded errors through correct error
wrapping. This will reduce error-level logging noise in the worker.
This commit is contained in:
Sanne Raymaekers 2024-07-31 12:05:31 +02:00
parent 769b04b862
commit 09445a1030

View file

@ -399,18 +399,18 @@ func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token, workerID uuid.UUID
if workerID != uuid.Nil {
_, err = tx.Exec(ctx, sqlInsertHeartbeatWithWorker, token, id, workerID)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %w", err)
}
} else {
_, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %w", err)
}
}
dependencies, err := q.jobDependencies(ctx, tx, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %w", err)
}
err = tx.Commit(ctx)
@ -431,7 +431,7 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (u
conn, err := q.pool.Acquire(ctx)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %w", err)
}
defer conn.Release()
@ -456,25 +456,25 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (u
if err == pgx.ErrNoRows {
return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
} else if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %w", err)
}
// insert heartbeat
if workerID != uuid.Nil {
_, err = tx.Exec(ctx, sqlInsertHeartbeatWithWorker, token, id, workerID)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %w", err)
}
} else {
_, err = tx.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %w", err)
}
}
dependencies, err := q.jobDependencies(ctx, tx, id)
if err != nil {
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %w", err)
}
err = tx.Commit(ctx)
@ -490,13 +490,13 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id, workerID uuid.UUID) (u
func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result interface{}) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return fmt.Errorf("error connecting to database: %v", err)
return fmt.Errorf("error connecting to database: %w", err)
}
defer conn.Release()
tx, err := conn.Begin(context.Background())
if err != nil {
return fmt.Errorf("error starting database transaction: %v", err)
return fmt.Errorf("error starting database transaction: %w", err)
}
defer func() {
err = tx.Rollback(context.Background())
@ -525,7 +525,7 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
// Remove from heartbeats if token is null
tag, err := tx.Exec(context.Background(), sqlDeleteHeartbeat, id)
if err != nil {
return fmt.Errorf("error removing job %s from heartbeats: %v", id, err)
return fmt.Errorf("error removing job %s from heartbeats: %w", id, err)
}
if tag.RowsAffected() != 1 {
@ -538,12 +538,12 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
return jobqueue.ErrNotExist
}
if err != nil {
return fmt.Errorf("error finishing job %s: %v", id, err)
return fmt.Errorf("error finishing job %s: %w", id, err)
}
} else {
tag, err = tx.Exec(context.Background(), sqlRequeue, id)
if err != nil {
return fmt.Errorf("error requeueing job %s: %v", id, err)
return fmt.Errorf("error requeueing job %s: %w", id, err)
}
if tag.RowsAffected() != 1 {
@ -553,12 +553,12 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
_, err = tx.Exec(context.Background(), sqlNotify)
if err != nil {
return fmt.Errorf("error notifying jobs channel: %v", err)
return fmt.Errorf("error notifying jobs channel: %w", err)
}
err = tx.Commit(context.Background())
if err != nil {
return fmt.Errorf("unable to commit database transaction: %v", err)
return fmt.Errorf("unable to commit database transaction: %w", err)
}
if retries >= maxRetries {
@ -572,7 +572,7 @@ func (q *DBJobQueue) RequeueOrFinishJob(id uuid.UUID, maxRetries uint64, result
func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return fmt.Errorf("error connecting to database: %v", err)
return fmt.Errorf("error connecting to database: %w", err)
}
defer conn.Release()
@ -583,7 +583,7 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
return jobqueue.ErrNotRunning
}
if err != nil {
return fmt.Errorf("error canceling job %s: %v", id, err)
return fmt.Errorf("error canceling job %s: %w", id, err)
}
q.logger.Info("Cancelled job", "job_type", jobType, "job_id", id.String())
@ -651,7 +651,7 @@ func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de
func (q *DBJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return uuid.Nil, fmt.Errorf("error establishing connection: %v", err)
return uuid.Nil, fmt.Errorf("error establishing connection: %w", err)
}
defer conn.Release()
@ -659,7 +659,7 @@ func (q *DBJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) {
if err == pgx.ErrNoRows {
return uuid.Nil, jobqueue.ErrNotExist
} else if err != nil {
return uuid.Nil, fmt.Errorf("Error retrieving id: %v", err)
return uuid.Nil, fmt.Errorf("Error retrieving id: %w", err)
}
return