diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 0af916077..3aaf382aa 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -307,11 +307,6 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu } func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { - // Return early if the context is already canceled. - if err := ctx.Err(); err != nil { - return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout - } - // add ourselves as a dequeuer c := make(chan struct{}, 1) el := q.dequeuers.pushBack(c) @@ -328,6 +323,9 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels [] break } if err != nil && !errors.Is(err, pgx.ErrNoRows) { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout + } return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %v", err) }