dbjobqueue: Add DeleteJob to database job queue

This adds SQL to delete jobs and dependencies, and implements the
database version of the DeleteJob function.

Related: RHEL-60120
This commit is contained in:
Brian C. Lane 2025-05-22 09:37:21 -07:00 committed by Tomáš Hozza
parent 5cddc4223d
commit c06064c1e2
2 changed files with 110 additions and 10 deletions

2
go.mod
View file

@ -34,6 +34,7 @@ require (
github.com/google/uuid v1.6.0
github.com/gophercloud/gophercloud v1.14.0
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jackc/pgconn v1.14.3
github.com/jackc/pgtype v1.14.3
github.com/jackc/pgx/v4 v4.18.3
github.com/julienschmidt/httprouter v1.3.0
@ -165,7 +166,6 @@ require (
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect

View file

@ -15,6 +15,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/jackc/pgconn"
"github.com/jackc/pgtype"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
@ -61,6 +62,10 @@ const (
SET started_at = NULL, token = NULL, retries = retries + 1
WHERE id = $1 AND started_at IS NOT NULL AND finished_at IS NULL`
sqlDelete = `
DELETE FROM jobs
WHERE id = $1`
sqlInsertDependency = `INSERT INTO job_dependencies VALUES ($1, $2)`
sqlQueryDependencies = `
SELECT dependency_id
@ -70,6 +75,9 @@ const (
SELECT job_id
FROM job_dependencies
WHERE dependency_id = $1`
sqlDeleteDependencies = `
DELETE FROM job_dependencies
WHERE job_id = $1 AND dependency_id = $2`
sqlQueryListJobs = `
SELECT id from jobs`
@ -140,6 +148,14 @@ const (
WHERE worker_id = $1`
)
// connection unifies pgxpool.Conn and pgx.Tx interfaces
// Some methods don't care whether they run queries on a raw connection,
// or in a transaction. This interface thus abstracts this concept.
type connection interface {
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
}
type DBJobQueue struct {
logger jobqueue.SimpleLogger
pool *pgxpool.Pool
@ -851,13 +867,6 @@ func (q *DBJobQueue) DeleteWorker(workerID uuid.UUID) error {
return nil
}
// connection unifies pgxpool.Conn and pgx.Tx interfaces
// Some methods don't care whether they run queries on a raw connection,
// or in a transaction. This interface thus abstracts this concept.
type connection interface {
Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error)
}
func (q *DBJobQueue) jobDependencies(ctx context.Context, conn connection, id uuid.UUID) ([]uuid.UUID, error) {
rows, err := conn.Query(ctx, sqlQueryDependencies, id)
if err != nil {
@ -960,8 +969,99 @@ func (q *DBJobQueue) AllRootJobIDs(ctx context.Context) (rootJobs []uuid.UUID, e
return
}
// DeleteJob deletes a job from the database
// Currently not implemented for the DBJobQueue
// DeleteJob deletes a job and all of its dependencies from the database
// If a dependency has multiple dependents it will only remove the parent job from
// the dependents list for that job instead of removing it.
//
// This assumes that the jobs have been created correctly, and that they have
// no dependency loops. Shared Dependents are ok, but a job cannot have a dependency
// on any of its parents (this should never happen).
func (q *DBJobQueue) DeleteJob(ctx context.Context, id uuid.UUID) error {
conn, err := q.pool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
tx, err := conn.Begin(ctx)
if err != nil {
return fmt.Errorf("error starting database transaction: %w", err)
}
defer func() {
err := tx.Rollback(ctx)
if err != nil && !errors.Is(err, pgx.ErrTxClosed) {
q.logger.Error(err, "Error rolling back delete transaction")
}
}()
// Start it off with an empty parent
err = q.deleteJobs(ctx, tx, uuid.UUID{}, id)
if err != nil {
return fmt.Errorf("Error deleting job %s: %w", id.String(), err)
}
err = tx.Commit(ctx)
if err != nil {
return fmt.Errorf("unable to commit database transaction: %v", err)
}
q.logger.Info("Deleted job", "job_id", id.String())
return nil
}
// deleteJobs will delete jobs as far down the list as possible
// missing dependencies are ignored, it deletes as much as it can.
// This function is recursive, the first call to it should be with
// the parent set to uuid.UUID{}
func (q *DBJobQueue) deleteJobs(ctx context.Context, conn connection, parent, id uuid.UUID) error {
// Delete parent:id dependencies if they exist
if len(parent.String()) > 0 {
err := q.deleteJobDependencies(ctx, conn, parent, id)
if err != nil {
return err
}
}
// Get the list of dependents for this id
dependents, err := q.jobDependents(ctx, conn, id)
if err != nil {
return err
}
// If this is > 0 then we are done, cannot delete further
if len(dependents) > 0 {
return nil
}
// Nothing depends on this job, recursively remove the dependencies
deps, err := q.jobDependencies(ctx, conn, id)
if err != nil {
return err
}
for _, d := range deps {
_ = q.deleteJobs(ctx, conn, id, d) // Recursively delete dependencies
}
return q.deleteJob(ctx, conn, id) // Actual delete from the database
}
// deleteJob removes the job from the database
// the CASCADE constraint will also delete any entries from the job_dependencies table
func (q *DBJobQueue) deleteJob(ctx context.Context, conn connection, jobID uuid.UUID) error {
_, err := conn.Exec(ctx, sqlDelete, jobID)
if err != nil {
q.logger.Error(err, "Error deleting job")
return err
}
return nil
}
// deleteJobDependencies removes job dependencies
func (q *DBJobQueue) deleteJobDependencies(ctx context.Context, conn connection, jobID, dependencyID uuid.UUID) error {
_, err := conn.Exec(ctx, sqlDeleteDependencies, jobID, dependencyID)
if err != nil {
q.logger.Error(err, "Error deleting dependency")
return err
}
return nil
}