From 0a9cf9b6a7be5ae4312447eea75a49bc69ed2c48 Mon Sep 17 00:00:00 2001 From: Sanne Raymaekers Date: Thu, 3 Nov 2022 12:27:21 +0100 Subject: [PATCH] dbjobqueue: check context errors after trying to dequeue This fixes a race condition where the context might have been canceled or timed out in between the preliminary check and trying to dequeue, and consequently returning the wrong error. Instead of doing the preliminary check, just check for the context errors when trying to dequeue. --- pkg/jobqueue/dbjobqueue/dbjobqueue.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 0af916077..3aaf382aa 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -307,11 +307,6 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu } func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { - // Return early if the context is already canceled. - if err := ctx.Err(); err != nil { - return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout - } - // add ourselves as a dequeuer c := make(chan struct{}, 1) el := q.dequeuers.pushBack(c) @@ -328,6 +323,9 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels [] break } if err != nil && !errors.Is(err, pgx.ErrNoRows) { + if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout + } return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %v", err) }