dbjobqueue: put all SQL queries into dequeueMaybe

Let's move all SQL queries together. In the following commit, we will actually
put all of them into a transaction in order to ensure atomicity.

This isn't a functional change, just code shuffling.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2023-04-14 13:12:24 +02:00 committed by Sanne Raymaekers
parent 464ce568b2
commit c3f6baad7f

View file

@ -313,15 +313,12 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []
el := q.dequeuers.pushBack(c)
defer q.dequeuers.remove(el)
var id uuid.UUID
var jobType string
var args json.RawMessage
token := uuid.New()
for {
var err error
id, jobType, args, err = q.dequeueMaybe(ctx, token, jobTypes, channels)
id, dependencies, jobType, args, err := q.dequeueMaybe(ctx, token, jobTypes, channels)
if err == nil {
break
return id, token, dependencies, jobType, args, nil
}
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
@ -337,41 +334,47 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string, channels []
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
}
}
}
// dequeueMaybe tries to dequeue a job.
//
// Dequeuing a job means to run the sqlDequeue query, insert an initial
// heartbeat and retrieve all extra metadata from the database.
//
// This method returns pgx.ErrNoRows if the method didn't manage to dequeue
// anything
func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes []string, channels []string) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
var conn *pgxpool.Conn
conn, err := q.pool.Acquire(ctx)
if err != nil {
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error acquiring a new connection when dequeueing: %w", err)
}
defer conn.Release()
var id uuid.UUID
var jobType string
var args json.RawMessage
err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &jobType, &args)
// skip the rest of the dequeueing operation if there are no rows
if err != nil && errors.Is(err, pgx.ErrNoRows) {
return uuid.Nil, nil, "", nil, err
}
// insert heartbeat
_, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
}
dependencies, err := q.jobDependencies(ctx, conn, id)
if err != nil {
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
}
q.logger.Info("Dequeued job", "job_type", jobType, "job_id", id.String(), "job_dependencies", fmt.Sprintf("%+v", dependencies))
return id, token, dependencies, jobType, args, nil
}
// dequeueMaybe is just a smaller helper for acquiring a connection and
// running the sqlDequeue query
func (q *DBJobQueue) dequeueMaybe(ctx context.Context, token uuid.UUID, jobTypes []string, channels []string) (id uuid.UUID, jobType string, args json.RawMessage, err error) {
var conn *pgxpool.Conn
conn, err = q.pool.Acquire(ctx)
if err != nil {
return
}
defer conn.Release()
err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes, channels).Scan(&id, &jobType, &args)
return
return id, dependencies, jobType, args, nil
}
func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {