Refactor the current metric collection to make use of re-usable functions, since some of the same queries are repeated. This will also make it easier to move the collection of metrics from the job queue.
614 lines
17 KiB
Go
614 lines
17 KiB
Go
// Package dbjobqueue implements the interfaces in package jobqueue backed by a
|
|
// PostgreSQL database.
|
|
//
|
|
// Data is stored non-redundantly. Any data structure necessary for efficient
|
|
// access (e.g., dependants) is kept in memory.
|
|
package dbjobqueue
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/jackc/pgconn"
|
|
"github.com/jackc/pgtype"
|
|
"github.com/jackc/pgx/v4"
|
|
"github.com/jackc/pgx/v4/pgxpool"
|
|
|
|
logrus "github.com/sirupsen/logrus"
|
|
|
|
"github.com/osbuild/osbuild-composer/internal/jobqueue"
|
|
"github.com/osbuild/osbuild-composer/internal/prometheus"
|
|
)
|
|
|
|
// SQL statements used by DBJobQueue. Positional parameters ($1, $2, ...) are
// bound by the calling method at execution time.
const (
	// Statements for the "jobs" notification channel. Enqueue and FinishJob
	// send NOTIFY; Dequeue issues LISTEN before and UNLISTEN after its wait loop.
	sqlNotify   = `NOTIFY jobs`
	sqlListen   = `LISTEN jobs`
	sqlUnlisten = `UNLISTEN jobs`

	// sqlEnqueue inserts a job row ($1: id, $2: type, $3: args) and stamps
	// queued_at. sqlDequeue claims one row from ready_jobs whose type is
	// contained in $2 by setting its token to $1 and stamping started_at.
	sqlEnqueue = `INSERT INTO jobs(id, type, args, queued_at) VALUES ($1, $2, $3, NOW())`
	sqlDequeue = `
		UPDATE jobs
		SET token = $1, started_at = now()
		WHERE id = (
		  SELECT id
		  FROM ready_jobs
			  -- use ANY here, because "type in ()" doesn't work with bound parameters
			  -- literal syntax for this is '{"a", "b"}': https://www.postgresql.org/docs/13/arrays.html
		  WHERE type = ANY($2)
		  LIMIT 1
		  FOR UPDATE SKIP LOCKED
		)
		RETURNING id, token, type, args, queued_at, started_at`

	// sqlDequeueByID is the single-job variant of sqlDequeue: it claims the
	// job with id $2 (token $1), provided that job is listed in ready_jobs.
	sqlDequeueByID = `
		UPDATE jobs
		SET token = $1, started_at = now()
		WHERE id = (
		  SELECT id
		  FROM ready_jobs
		  WHERE id = $2
		  LIMIT 1
		  FOR UPDATE SKIP LOCKED
		)
		RETURNING token, type, args, queued_at, started_at`

	// Dependency bookkeeping. sqlInsertDependency is executed by Enqueue with
	// (job id, dependency id); sqlQueryDependencies lists the direct
	// dependencies of job $1.
	sqlInsertDependency  = `INSERT INTO job_dependencies VALUES ($1, $2)`
	sqlQueryDependencies = `
		SELECT dependency_id
		FROM job_dependencies
		WHERE job_id = $1`

	// Job lookup and state transitions, all keyed by job id except
	// sqlQueryRunningId, which looks up an unfinished, non-canceled job by
	// its token.
	sqlQueryJob = `
		SELECT type, args, started_at, finished_at, canceled
		FROM jobs
		WHERE id = $1`
	sqlQueryJobStatus = `
		SELECT type, result, queued_at, started_at, finished_at, canceled
		FROM jobs
		WHERE id = $1`
	sqlQueryRunningId = `
		SELECT id
		FROM jobs
		WHERE token = $1 AND finished_at IS NULL AND canceled = FALSE`
	sqlFinishJob = `
		UPDATE jobs
		SET finished_at = now(), result = $1
		WHERE id = $2 AND finished_at IS NULL
		RETURNING finished_at`
	sqlCancelJob = `
		UPDATE jobs
		SET canceled = TRUE
		WHERE id = $1 AND finished_at IS NULL
		RETURNING type, started_at`

	// Heartbeat bookkeeping. A row is inserted when a job is dequeued and
	// deleted by job id when the job is finished; sqlQueryHeartbeats selects
	// the tokens whose last heartbeat is older than the interval $1.
	sqlInsertHeartbeat = `
		INSERT INTO heartbeats(token, id, heartbeat)
		VALUES ($1, $2, now())`
	sqlQueryHeartbeats = `
		SELECT token
		FROM heartbeats
		WHERE age(now(), heartbeat) > $1`
	sqlRefreshHeartbeat = `
		UPDATE heartbeats
		SET heartbeat = now()
		WHERE token = $1`
	sqlDeleteHeartbeat = `
		DELETE FROM heartbeats
		WHERE id = $1`

	// Maintenance queries
	//
	// sqlQueryJobsUptoByType groups the ids of jobs of the types in $1 that
	// finished before $2 by type. sqlQueryDepedenciesRecursively walks
	// job_dependencies transitively starting from job $1. The two DELETE
	// statements take an array of ids.
	sqlQueryJobsUptoByType = `
		SELECT array_agg(id), type
		FROM jobs
		WHERE type = ANY($1) AND finished_at < $2
		GROUP BY type`
	sqlQueryDepedenciesRecursively = `
		WITH RECURSIVE dependencies(d) AS (
		  SELECT dependency_id
		  FROM job_dependencies
		  WHERE job_id = $1
		UNION ALL
		  SELECT dependency_id
		  FROM dependencies, job_dependencies
		  WHERE job_dependencies.job_id = d )
		SELECT * FROM dependencies`
	sqlDeleteJobDependencies = `
		DELETE FROM job_dependencies
		WHERE dependency_id = ANY($1)`
	sqlDeleteJobs = `
		DELETE FROM jobs
		WHERE id = ANY($1)`
)
|
|
|
|
// DBJobQueue is a job queue whose state is stored in a PostgreSQL database.
type DBJobQueue struct {
	// pool is the connection pool every method acquires its connection from.
	pool *pgxpool.Pool
}
|
|
|
|
// Create a new DBJobQueue object for `url`.
|
|
func New(url string) (*DBJobQueue, error) {
|
|
pool, err := pgxpool.Connect(context.Background(), url)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error establishing connection: %v", err)
|
|
}
|
|
|
|
return &DBJobQueue{pool}, nil
|
|
}
|
|
|
|
func (q *DBJobQueue) Close() {
|
|
q.pool.Close()
|
|
}
|
|
|
|
func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uuid.UUID) (uuid.UUID, error) {
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
return uuid.Nil, fmt.Errorf("error connecting to database: %v", err)
|
|
}
|
|
defer conn.Release()
|
|
|
|
tx, err := conn.Begin(context.Background())
|
|
if err != nil {
|
|
return uuid.Nil, fmt.Errorf("error starting database transaction: %v", err)
|
|
}
|
|
defer func() {
|
|
err := tx.Rollback(context.Background())
|
|
if err != nil && !errors.As(err, &pgx.ErrTxClosed) {
|
|
logrus.Error("error rolling back enqueue transaction: ", err)
|
|
}
|
|
}()
|
|
|
|
id := uuid.New()
|
|
_, err = conn.Exec(context.Background(), sqlEnqueue, id, jobType, args)
|
|
if err != nil {
|
|
return uuid.Nil, fmt.Errorf("error enqueuing job: %v", err)
|
|
}
|
|
|
|
for _, d := range dependencies {
|
|
_, err = conn.Exec(context.Background(), sqlInsertDependency, id, d)
|
|
if err != nil {
|
|
return uuid.Nil, fmt.Errorf("error inserting dependency: %v", err)
|
|
}
|
|
}
|
|
|
|
_, err = conn.Exec(context.Background(), sqlNotify)
|
|
if err != nil {
|
|
return uuid.Nil, fmt.Errorf("error notifying jobs channel: %v", err)
|
|
}
|
|
|
|
err = tx.Commit(context.Background())
|
|
if err != nil {
|
|
return uuid.Nil, fmt.Errorf("unable to commit database transaction: %v", err)
|
|
}
|
|
|
|
prometheus.EnqueueJobMetrics(jobType)
|
|
logrus.Infof("Enqueued job of type %s with ID %s(dependencies %v)", jobType, id, dependencies)
|
|
|
|
return id, nil
|
|
}
|
|
|
|
func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
|
|
// Return early if the context is already canceled.
|
|
if err := ctx.Err(); err != nil {
|
|
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
|
|
}
|
|
|
|
conn, err := q.pool.Acquire(ctx)
|
|
if err != nil {
|
|
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %v", err)
|
|
}
|
|
defer func() {
|
|
_, err := conn.Exec(ctx, sqlUnlisten)
|
|
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
|
|
logrus.Error("Error unlistening for jobs in dequeue: ", err)
|
|
}
|
|
conn.Release()
|
|
}()
|
|
|
|
_, err = conn.Exec(ctx, sqlListen)
|
|
if err != nil {
|
|
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error listening on jobs channel: %v", err)
|
|
}
|
|
|
|
var id uuid.UUID
|
|
var jobType string
|
|
var args json.RawMessage
|
|
var started, queued *time.Time
|
|
token := uuid.New()
|
|
for {
|
|
err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes).Scan(&id, &token, &jobType, &args, &queued, &started)
|
|
if err == nil {
|
|
break
|
|
}
|
|
if err != nil && !errors.As(err, &pgx.ErrNoRows) {
|
|
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %v", err)
|
|
}
|
|
_, err = conn.Conn().WaitForNotification(ctx)
|
|
if err != nil {
|
|
if pgconn.Timeout(err) {
|
|
return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
|
|
}
|
|
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error waiting for notification on jobs channel: %v", err)
|
|
}
|
|
}
|
|
|
|
prometheus.DequeueJobMetrics(*queued, *started, jobType)
|
|
|
|
// insert heartbeat
|
|
_, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id)
|
|
if err != nil {
|
|
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
|
|
}
|
|
|
|
dependencies, err := q.jobDependencies(ctx, conn, id)
|
|
if err != nil {
|
|
return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
|
|
}
|
|
|
|
logrus.Infof("Dequeued job of type %v with ID %s", jobType, id)
|
|
|
|
return id, token, dependencies, jobType, args, nil
|
|
}
|
|
func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, []uuid.UUID, string, json.RawMessage, error) {
|
|
// Return early if the context is already canceled.
|
|
if err := ctx.Err(); err != nil {
|
|
return uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout
|
|
}
|
|
|
|
conn, err := q.pool.Acquire(ctx)
|
|
if err != nil {
|
|
return uuid.Nil, nil, "", nil, fmt.Errorf("error connecting to database: %v", err)
|
|
}
|
|
defer conn.Release()
|
|
|
|
var jobType string
|
|
var args json.RawMessage
|
|
var started, queued *time.Time
|
|
token := uuid.New()
|
|
|
|
err = conn.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args, &queued, &started)
|
|
if err == pgx.ErrNoRows {
|
|
return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
|
|
} else if err != nil {
|
|
return uuid.Nil, nil, "", nil, fmt.Errorf("error dequeuing job: %v", err)
|
|
}
|
|
|
|
// insert heartbeat
|
|
_, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id)
|
|
if err != nil {
|
|
return uuid.Nil, nil, "", nil, fmt.Errorf("error inserting the job's heartbeat: %v", err)
|
|
}
|
|
|
|
dependencies, err := q.jobDependencies(ctx, conn, id)
|
|
if err != nil {
|
|
return uuid.Nil, nil, "", nil, fmt.Errorf("error querying the job's dependencies: %v", err)
|
|
}
|
|
|
|
logrus.Infof("Dequeued job of type %v with ID %s", jobType, id)
|
|
prometheus.DequeueJobMetrics(*queued, *started, jobType)
|
|
|
|
return token, dependencies, jobType, args, nil
|
|
}
|
|
|
|
func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("error connecting to database: %v", err)
|
|
}
|
|
defer conn.Release()
|
|
|
|
tx, err := conn.Begin(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("error starting database transaction: %v", err)
|
|
}
|
|
defer func() {
|
|
err = tx.Rollback(context.Background())
|
|
if err != nil && !errors.As(err, &pgx.ErrTxClosed) {
|
|
logrus.Errorf("error rolling back finish job transaction for job %s: %v", id, err)
|
|
}
|
|
|
|
}()
|
|
|
|
// Use double pointers for timestamps because they might be NULL, which would result in *time.Time == nil
|
|
var started, finished *time.Time
|
|
var jobType string
|
|
canceled := false
|
|
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, &started, &finished, &canceled)
|
|
if err == pgx.ErrNoRows {
|
|
return jobqueue.ErrNotExist
|
|
}
|
|
if canceled {
|
|
return jobqueue.ErrCanceled
|
|
}
|
|
if finished != nil {
|
|
return jobqueue.ErrNotRunning
|
|
}
|
|
|
|
// Remove from heartbeats
|
|
tag, err := conn.Exec(context.Background(), sqlDeleteHeartbeat, id)
|
|
if err != nil {
|
|
return fmt.Errorf("error finishing job %s: %v", id, err)
|
|
}
|
|
|
|
if tag.RowsAffected() != 1 {
|
|
return jobqueue.ErrNotExist
|
|
}
|
|
|
|
err = conn.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
|
|
|
|
if err == pgx.ErrNoRows {
|
|
return jobqueue.ErrNotExist
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("error finishing job %s: %v", id, err)
|
|
}
|
|
|
|
_, err = conn.Exec(context.Background(), sqlNotify)
|
|
if err != nil {
|
|
return fmt.Errorf("error notifying jobs channel: %v", err)
|
|
}
|
|
|
|
err = tx.Commit(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("unable to commit database transaction: %v", err)
|
|
}
|
|
|
|
logrus.Infof("Finished job with ID %s", id)
|
|
prometheus.FinishJobMetrics(*started, *finished, canceled, jobType)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("error connecting to database: %v", err)
|
|
}
|
|
defer conn.Release()
|
|
|
|
var started *time.Time
|
|
var jobType string
|
|
err = conn.QueryRow(context.Background(), sqlCancelJob, id).Scan(&jobType, &started)
|
|
if err == pgx.ErrNoRows {
|
|
return jobqueue.ErrNotRunning
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("error canceling job %s: %v", id, err)
|
|
}
|
|
|
|
logrus.Infof("Cancelled job with ID %s", id)
|
|
prometheus.CancelJobMetrics(*started, jobType)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Release()
|
|
|
|
// Use double pointers for timestamps because they might be NULL, which would result in *time.Time == nil
|
|
var sp, fp *time.Time
|
|
var rp pgtype.JSON
|
|
err = conn.QueryRow(context.Background(), sqlQueryJobStatus, id).Scan(&jobType, &rp, &queued, &sp, &fp, &canceled)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if sp != nil {
|
|
started = *sp
|
|
}
|
|
if fp != nil {
|
|
finished = *fp
|
|
}
|
|
if rp.Status != pgtype.Null {
|
|
result = rp.Bytes
|
|
}
|
|
|
|
deps, err = q.jobDependencies(context.Background(), conn, id)
|
|
if err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// Job returns all the parameters that define a job (everything provided during Enqueue).
|
|
func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error) {
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Release()
|
|
|
|
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, nil, nil, nil)
|
|
if err == pgx.ErrNoRows {
|
|
err = jobqueue.ErrNotExist
|
|
return
|
|
} else if err != nil {
|
|
return
|
|
}
|
|
|
|
dependencies, err = q.jobDependencies(context.Background(), conn, id)
|
|
return
|
|
}
|
|
|
|
// Find job by token, this will return an error if the job hasn't been dequeued
|
|
func (q *DBJobQueue) IdFromToken(token uuid.UUID) (id uuid.UUID, err error) {
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
return uuid.Nil, fmt.Errorf("error establishing connection: %v", err)
|
|
}
|
|
defer conn.Release()
|
|
|
|
err = conn.QueryRow(context.Background(), sqlQueryRunningId, token).Scan(&id)
|
|
if err == pgx.ErrNoRows {
|
|
return uuid.Nil, jobqueue.ErrNotExist
|
|
} else if err != nil {
|
|
return uuid.Nil, fmt.Errorf("Error retrieving id: %v", err)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Get a list of tokens which haven't been updated in the specified time frame
|
|
func (q *DBJobQueue) Heartbeats(olderThan time.Duration) (tokens []uuid.UUID) {
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Release()
|
|
|
|
rows, err := conn.Query(context.Background(), sqlQueryHeartbeats, olderThan.String())
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var t uuid.UUID
|
|
err = rows.Scan(&t)
|
|
if err != nil {
|
|
// Log the error and try to continue with the next row
|
|
logrus.Error("Unable to read token from heartbeats: ", err)
|
|
continue
|
|
}
|
|
tokens = append(tokens, t)
|
|
}
|
|
if rows.Err() != nil {
|
|
logrus.Error("Error reading tokens from heartbeats: ", rows.Err())
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Reset the last heartbeat time to time.Now()
|
|
func (q *DBJobQueue) RefreshHeartbeat(token uuid.UUID) {
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Release()
|
|
|
|
tag, err := conn.Exec(context.Background(), sqlRefreshHeartbeat, token)
|
|
if err != nil {
|
|
logrus.Error("Error refreshing heartbeat: ", err)
|
|
}
|
|
if tag.RowsAffected() != 1 {
|
|
logrus.Error("No rows affected when refreshing heartbeat for ", token)
|
|
}
|
|
}
|
|
|
|
func (q *DBJobQueue) jobDependencies(ctx context.Context, conn *pgxpool.Conn, id uuid.UUID) ([]uuid.UUID, error) {
|
|
rows, err := conn.Query(ctx, sqlQueryDependencies, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
dependencies := []uuid.UUID{}
|
|
for rows.Next() {
|
|
var d uuid.UUID
|
|
err = rows.Scan(&d)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dependencies = append(dependencies, d)
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return dependencies, nil
|
|
}
|
|
|
|
// return map id -> jobtype ?
|
|
func (q *DBJobQueue) JobsUptoByType(jobTypes []string, upto time.Time) (result map[string][]uuid.UUID, err error) {
|
|
result = make(map[string][]uuid.UUID)
|
|
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
err = fmt.Errorf("error connecting to database: %v", err)
|
|
return
|
|
}
|
|
defer conn.Release()
|
|
|
|
rows, err := conn.Query(context.Background(), sqlQueryJobsUptoByType, jobTypes, upto)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer rows.Close()
|
|
|
|
for rows.Next() {
|
|
var ids []uuid.UUID
|
|
var jt string
|
|
err = rows.Scan(&ids, &jt)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
result[jt] = ids
|
|
}
|
|
err = rows.Err()
|
|
return
|
|
}
|
|
|
|
// Deletes single job and dependencies (recursively)
|
|
func (q *DBJobQueue) DeleteJobIncludingDependencies(jobId uuid.UUID) error {
|
|
conn, err := q.pool.Acquire(context.Background())
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("error connecting to database: %v", err)
|
|
}
|
|
defer conn.Release()
|
|
|
|
tx, err := conn.Begin(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("error starting database transaction: %v", err)
|
|
}
|
|
defer func() {
|
|
err := tx.Rollback(context.Background())
|
|
if err != nil && !errors.As(err, &pgx.ErrTxClosed) {
|
|
logrus.Error("error rolling back enqueue transaction: ", err)
|
|
}
|
|
}()
|
|
|
|
rows, err := conn.Query(context.Background(), sqlQueryDepedenciesRecursively, jobId)
|
|
if err != nil {
|
|
return fmt.Errorf("error querying the job's dependencies: %v", err)
|
|
}
|
|
|
|
var dependencies []uuid.UUID
|
|
for rows.Next() {
|
|
var dep uuid.UUID
|
|
err = rows.Scan(&dep)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
dependencies = append(dependencies, dep)
|
|
}
|
|
|
|
depTag, err := conn.Exec(context.Background(), sqlDeleteJobDependencies, dependencies)
|
|
if err != nil {
|
|
return fmt.Errorf("Error removing from dependencies recursively for job %v: %v", jobId, err)
|
|
}
|
|
|
|
jobAndDependencies := append(dependencies, jobId)
|
|
jobsTag, err := conn.Exec(context.Background(), sqlDeleteJobs, jobAndDependencies)
|
|
if err != nil {
|
|
return fmt.Errorf("Error removing from jobs recursively for job %v: %v", jobId, err)
|
|
}
|
|
|
|
err = tx.Commit(context.Background())
|
|
if err != nil {
|
|
return fmt.Errorf("unable to commit database transaction: %v", err)
|
|
}
|
|
|
|
logrus.Infof("Removed %d rows from dependencies for job %v", depTag.RowsAffected(), jobId)
|
|
logrus.Infof("Removed %d rows from jobs for job %v, this includes dependencies", jobsTag.RowsAffected(), jobId)
|
|
return nil
|
|
}
|