diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index d40368eb4..004658354 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -313,15 +313,12 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels [] el := q.dequeuers.pushBack(c) defer q.dequeuers.remove(el) - var id uuid.UUID - var jobType string - var args json.RawMessage token := uuid.New() for { var err error - id, jobType, args, err = q.dequeueMaybe(ctx, token, jobTypes, channels) + id, dependencies, jobType, args, err := q.dequeueMaybe(ctx, token, jobTypes, channels) if err == nil { - break + return id, token, dependencies, jobType, args, nil } if err != nil && !errors.Is(err, pgx.ErrNoRows) { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { @@ -337,41 +334,47 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels [] return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout } } +} +// dequeueMaybe tries to dequeue a job. +// +// Dequeuing a job means to run the sqlDequeue query, insert an initial +// heartbeat and retrieve all extra metadata from the database. +// +// This method returns pgx.ErrNoRows if the method didn't manage to dequeue +// anything +func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes []string, channels []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { + var conn *pgxpool.Conn conn, err := q.pool.Acquire(ctx) if err != nil { - return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error acquiring a new connection when dequeueing: %w", err) } defer conn.Release() + var id uuid.UUID + var jobType string + var args json.RawMessage + err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &jobType, &args) + + // skip the rest of the dequeueing operation if there are no rows + if err != nil && errors.Is(err, pgx.ErrNoRows) { + return uuid.Nil, nil, "", nil, err + } + // insert heartbeat _, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id) if err != nil { - return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err) } dependencies, err := q.jobDependencies(ctx, conn, id) if err != nil { - return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err) + return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err) } q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies)) - return id, token, dependencies, jobType, args, nil -} - -// dequeueMaybe is just a smaller helper for acquiring a connection and -// running the sqlDequeue query -func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes []string, channels []string) (id uuid.UUID, jobType string, args json.RawMessage, err error) { - var conn *pgxpool.Conn - conn, err = q.pool.Acquire(ctx) - if err != nil { - return - } - defer conn.Release() - - err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &jobType, &args) - return + return id, dependencies, jobType, args, nil } func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {