From c06064c1e2c55104557662d837358f699fdeddcc Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Thu, 22 May 2025 09:37:21 -0700 Subject: [PATCH] dbjobqueue: Add DeleteJob to database job queue This adds SQL to delete jobs and dependencies, and implements the database version of the DeleteJob function. Related: RHEL-60120 --- go.mod | 2 +- pkg/jobqueue/dbjobqueue/dbjobqueue.go | 118 ++++++++++++++++++++++++-- 2 files changed, 110 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index d1fbc4ae9..d6cd1b275 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gophercloud/gophercloud v1.14.0 github.com/hashicorp/go-retryablehttp v0.7.7 + github.com/jackc/pgconn v1.14.3 github.com/jackc/pgtype v1.14.3 github.com/jackc/pgx/v4 v4.18.3 github.com/julienschmidt/httprouter v1.3.0 @@ -165,7 +166,6 @@ require ( github.com/hashicorp/go-version v1.7.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgconn v1.14.3 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.3.3 // indirect diff --git a/pkg/jobqueue/dbjobqueue/dbjobqueue.go b/pkg/jobqueue/dbjobqueue/dbjobqueue.go index 2ff0a3e2d..b4ec0bdb5 100644 --- a/pkg/jobqueue/dbjobqueue/dbjobqueue.go +++ b/pkg/jobqueue/dbjobqueue/dbjobqueue.go @@ -15,6 +15,7 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgconn" "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" @@ -61,6 +62,10 @@ const ( SET started_at = NULL, token = NULL, retries = retries + 1 WHERE id = $1 AND started_at IS NOT NULL AND finished_at IS NULL` + sqlDelete = ` + DELETE FROM jobs + WHERE id = $1` + sqlInsertDependency = `INSERT INTO job_dependencies VALUES ($1, $2)` sqlQueryDependencies = ` SELECT dependency_id @@ -70,6 +75,9 @@ const ( SELECT job_id FROM job_dependencies WHERE dependency_id = $1` + sqlDeleteDependencies = ` + DELETE FROM job_dependencies + WHERE job_id = $1 AND dependency_id = $2` sqlQueryListJobs = ` SELECT id from jobs` @@ -140,6 +148,14 @@ const ( WHERE worker_id = $1` ) +// connection unifies pgxpool.Conn and pgx.Tx interfaces +// Some methods don't care whether they run queries on a raw connection, +// or in a transaction. This interface thus abstracts this concept. +type connection interface { + Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) + Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) +} + type DBJobQueue struct { logger jobqueue.SimpleLogger pool *pgxpool.Pool @@ -851,13 +867,6 @@ func (q *DBJobQueue) DeleteWorker(workerID uuid.UUID) error { return nil } -// connection unifies pgxpool.Conn and pgx.Tx interfaces -// Some methods don't care whether they run queries on a raw connection, -// or in a transaction. This interface thus abstracts this concept. -type connection interface { - Query(ctx context.Context, sql string, args ...interface{}) (pgx.Rows, error) -} - func (q *DBJobQueue) jobDependencies(ctx context.Context, conn connection, id uuid.UUID) ([]uuid.UUID, error) { rows, err := conn.Query(ctx, sqlQueryDependencies, id) if err != nil { @@ -960,8 +969,99 @@ func (q *DBJobQueue) AllRootJobIDs(ctx context.Context) (rootJobs []uuid.UUID, e return } -// DeleteJob deletes a job from the database -// Currently not implemented for the DBJobQueue +// DeleteJob deletes a job and all of its dependencies from the database +// If a dependency has multiple dependents it will only remove the parent job from +// the dependents list for that job instead of removing it. +// +// This assumes that the jobs have been created correctly, and that they have +// no dependency loops. Shared Dependents are ok, but a job cannot have a dependency +// on any of its parents (this should never happen). func (q *DBJobQueue) DeleteJob(ctx context.Context, id uuid.UUID) error { + conn, err := q.pool.Acquire(ctx) + if err != nil { + return err + } + defer conn.Release() + + tx, err := conn.Begin(ctx) + if err != nil { + return fmt.Errorf("error starting database transaction: %w", err) + } + defer func() { + err := tx.Rollback(ctx) + if err != nil && !errors.Is(err, pgx.ErrTxClosed) { + q.logger.Error(err, "Error rolling back delete transaction") + } + }() + + // Start it off with an empty parent + err = q.deleteJobs(ctx, tx, uuid.UUID{}, id) + if err != nil { + return fmt.Errorf("Error deleting job %s: %w", id.String(), err) + } + err = tx.Commit(ctx) + if err != nil { + return fmt.Errorf("unable to commit database transaction: %v", err) + } + + q.logger.Info("Deleted job", "job_id", id.String()) + + return nil +} + +// deleteJobs will delete jobs as far down the list as possible +// missing dependencies are ignored, it deletes as much as it can. +// This function is recursive, the first call to it should be with +// the parent set to uuid.UUID{} +func (q *DBJobQueue) deleteJobs(ctx context.Context, conn connection, parent, id uuid.UUID) error { + // Delete parent:id dependencies if they exist + if len(parent.String()) > 0 { + err := q.deleteJobDependencies(ctx, conn, parent, id) + if err != nil { + return err + } + } + + // Get the list of dependents for this id + dependents, err := q.jobDependents(ctx, conn, id) + if err != nil { + return err + } + + // If this is > 0 then we are done, cannot delete further + if len(dependents) > 0 { + return nil + } + + // Nothing depends on this job, recursively remove the dependencies + deps, err := q.jobDependencies(ctx, conn, id) + if err != nil { + return err + } + for _, d := range deps { + _ = q.deleteJobs(ctx, conn, id, d) // Recursively delete dependencies + } + + return q.deleteJob(ctx, conn, id) // Actual delete from the database +} + +// deleteJob removes the job from the database +// the CASCADE constraint will also delete any entries from the job_dependencies table +func (q *DBJobQueue) deleteJob(ctx context.Context, conn connection, jobID uuid.UUID) error { + _, err := conn.Exec(ctx, sqlDelete, jobID) + if err != nil { + q.logger.Error(err, "Error deleting job") + return err + } + return nil +} + +// deleteJobDependencies removes job dependencies +func (q *DBJobQueue) deleteJobDependencies(ctx context.Context, conn connection, jobID, dependencyID uuid.UUID) error { + _, err := conn.Exec(ctx, sqlDeleteDependencies, jobID, dependencyID) + if err != nil { + q.logger.Error(err, "Error deleting dependency") + return err + } return nil }