dbjobqueue: make jobDependencies and dependants accept transactions
We will need this in following commits in order to make dequeuing atomic. Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
parent
9dc0881247
commit
571b959cc1
1 changed files with 11 additions and 3 deletions
|
|
@ -18,9 +18,10 @@ import (
|
|||
"github.com/jackc/pgtype"
|
||||
"github.com/jackc/pgx/v4"
|
||||
"github.com/jackc/pgx/v4/pgxpool"
|
||||
"github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/osbuild/osbuild-composer/internal/common/slogger"
|
||||
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
@ -639,7 +640,14 @@ func (q *DBJobQueue) RefreshHeartbeat(token uuid.UUID) {
|
|||
}
|
||||
}
|
||||
|
||||
func (q *DBJobQueue) jobDependencies(ctx context.Context, conn *pgxpool.Conn, id uuid.UUID) ([]uuid.UUID, error) {
|
||||
// connection unifies pgxpool.Conn and pgx.Tx interfaces
|
||||
// Some methods don't care whether they run queries on a raw connection,
|
||||
// or in a transaction. This interface thus abstracts this concept.
|
||||
type connection interface {
|
||||
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
|
||||
}
|
||||
|
||||
func (q *DBJobQueue) jobDependencies(ctx context.Context, conn connection, id uuid.UUID) ([]uuid.UUID, error) {
|
||||
rows, err := conn.Query(ctx, sqlQueryDependencies, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -663,7 +671,7 @@ func (q *DBJobQueue) jobDependencies(ctx context.Context, conn *pgxpool.Conn, id
|
|||
return dependencies, nil
|
||||
}
|
||||
|
||||
func (q *DBJobQueue) jobDependents(ctx context.Context, conn *pgxpool.Conn, id uuid.UUID) ([]uuid.UUID, error) {
|
||||
func (q *DBJobQueue) jobDependents(ctx context.Context, conn connection, id uuid.UUID) ([]uuid.UUID, error) {
|
||||
rows, err := conn.Query(ctx, sqlQueryDependents, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue