dbjobqueue: check context errors after trying to dequeue

This fixes a race condition where the context might have been canceled
or timed out in between the preliminary check and trying to dequeue, and
consequently returning the wrong error.

Instead of doing the preliminary check, just check for the context
errors when trying to dequeue.
This commit is contained in:
Sanne Raymaekers 2022-11-03 12:27:21 +01:00 committed by Tom Gundersen
parent 26b8e2ff6e
commit 0a9cf9b6a7

View file

@ -307,11 +307,6 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
}
func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
// Return early if the context is already canceled.
if err := ctx.Err(); err != nil {
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
}
// add ourselves as a dequeuer
c := make(chan struct{}, 1)
el := q.dequeuers.pushBack(c)
@ -328,6 +323,9 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []
break
}
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
}
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %v", err)
}