diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index f40320d9a..7a3cb2542 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -21,6 +21,7 @@ import ( logrus "github.com/sirupsen/logrus" "github.com/osbuild/osbuild-composer/internal/jobqueue" + "github.com/osbuild/osbuild-composer/internal/prometheus" ) const ( @@ -41,7 +42,7 @@ const ( LIMIT 1 FOR UPDATE SKIP LOCKED ) - RETURNING id, token, type, args` + RETURNING id, token, type, args, queued_at, started_at` sqlDequeueByID = ` UPDATE jobs @@ -53,7 +54,7 @@ const ( LIMIT 1 FOR UPDATE SKIP LOCKED ) - RETURNING token, type, args` + RETURNING token, type, args, queued_at, started_at` sqlInsertDependency = `INSERT INTO job_dependencies VALUES ($1, $2)` sqlQueryDependencies = ` @@ -62,7 +63,7 @@ const ( WHERE job_id = $1` sqlQueryJob = ` - SELECT type, args, finished_at, canceled + SELECT type, args, started_at, finished_at, canceled FROM jobs WHERE id = $1` sqlQueryJobStatus = ` @@ -76,11 +77,13 @@ const ( sqlFinishJob = ` UPDATE jobs SET finished_at = now(), result = $1 - WHERE id = $2 AND finished_at IS NULL` + WHERE id = $2 AND finished_at IS NULL + RETURNING finished_at` sqlCancelJob = ` UPDATE jobs SET canceled = TRUE - WHERE id = $1 AND finished_at IS NULL` + WHERE id = $1 AND finished_at IS NULL + RETURNING type, started_at` sqlInsertHeartbeat = ` INSERT INTO heartbeats(token, id, heartbeat) @@ -180,6 +183,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return uuid.Nil, fmt.Errorf("unable to commit database transaction: %v", err) } + prometheus.PendingJobs.WithLabelValues(jobType).Inc() logrus.Infof("Enqueued job of type %s with ID %s(dependencies %v)", jobType, id, dependencies) return id, nil @@ -211,9 +215,10 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, var id uuid.UUID var jobType string var args json.RawMessage + var started, queued *time.Time token := uuid.New() for { - err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes).Scan(&id, &token, &jobType, &args) + err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes).Scan(&id, &token, &jobType, &args, &queued, &started) if err == nil { break } @@ -229,6 +234,11 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, } } + diff := started.Sub(*queued).Seconds() + prometheus.JobWaitDuration.WithLabelValues(jobType).Observe(diff) + prometheus.PendingJobs.WithLabelValues(jobType).Dec() + prometheus.RunningJobs.WithLabelValues(jobType).Inc() + // insert heartbeat _, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id) if err != nil { @@ -258,9 +268,10 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, var jobType string var args json.RawMessage + var started, queued *time.Time token := uuid.New() - err = conn.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args) + err = conn.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args, &queued, &started) if err == pgx.ErrNoRows { return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending } else if err != nil { @@ -279,6 +290,10 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, } logrus.Infof("Dequeued job of type %v with ID %s", jobType, id) + diff := started.Sub(*queued).Seconds() + prometheus.JobWaitDuration.WithLabelValues(jobType).Observe(diff) + prometheus.PendingJobs.WithLabelValues(jobType).Dec() + prometheus.RunningJobs.WithLabelValues(jobType).Inc() return token, dependencies, jobType, args, nil } @@ -303,9 +318,10 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error { }() // Use double pointers for timestamps because they might be NULL, which would result in *time.Time == nil - var finished *time.Time + var started, finished *time.Time + var jobType string canceled := false - err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(nil, nil, nil, &finished, &canceled) + err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, &started, nil, &canceled) if err == pgx.ErrNoRows { return jobqueue.ErrNotExist } @@ -326,15 +342,15 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error { return jobqueue.ErrNotExist } - tag, err = conn.Exec(context.Background(), sqlFinishJob, result, id) + err = conn.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished) + + if err == pgx.ErrNoRows { + return jobqueue.ErrNotExist + } if err != nil { return fmt.Errorf("error finishing job %s: %v", id, err) } - if tag.RowsAffected() != 1 { - return jobqueue.ErrNotExist - } - _, err = conn.Exec(context.Background(), sqlNotify) if err != nil { return fmt.Errorf("error notifying jobs channel: %v", err) @@ -346,6 +362,9 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error { } logrus.Infof("Finished job with ID %s", id) + diff := finished.Sub(*started).Seconds() + prometheus.JobDuration.WithLabelValues(jobType).Observe(diff) + prometheus.RunningJobs.WithLabelValues(jobType).Dec() return nil } @@ -357,16 +376,22 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error { } defer conn.Release() - tag, err := conn.Exec(context.Background(), sqlCancelJob, id) + var started *time.Time + var jobType string + err = conn.QueryRow(context.Background(), sqlCancelJob, id).Scan(&jobType, &started) + if err == pgx.ErrNoRows { + return jobqueue.ErrNotRunning + } if err != nil { return fmt.Errorf("error canceling job %s: %v", id, err) } - if tag.RowsAffected() != 1 { - return jobqueue.ErrNotRunning - } - logrus.Infof("Cancelled job with ID %s", id) + if started != nil { + prometheus.RunningJobs.WithLabelValues(jobType).Dec() + } else { + prometheus.PendingJobs.WithLabelValues(jobType).Dec() + } return nil } @@ -410,7 +435,7 @@ func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de } defer conn.Release() - err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, nil, nil) + err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, nil, nil, nil) if err == pgx.ErrNoRows { err = jobqueue.ErrNotExist return diff --git a/internal/prometheus/job_metrics.go b/internal/prometheus/job_metrics.go new file mode 100644 index 000000000..3f87f7072 --- /dev/null +++ b/internal/prometheus/job_metrics.go @@ -0,0 +1,46 @@ +package prometheus + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const workerSubsystem = "composer_worker" + +var ( + PendingJobs = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "pending_jobs", + Namespace: namespace, + Subsystem: workerSubsystem, + Help: "Currently pending jobs", + }, []string{"type"}) +) + +var ( + RunningJobs = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "running_jobs", + Namespace: namespace, + Subsystem: workerSubsystem, + Help: "Currently running jobs", + }, []string{"type"}) +) + +var ( + JobDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "job_duration_seconds", + Namespace: namespace, + Subsystem: workerSubsystem, + Help: "Duration spent by workers on a job.", + Buckets: []float64{.1, .2, .5, 1, 2, 4, 8, 16, 32, 64, 96, 128, 160, 192, 224, 256, 320, 382, 448, 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2049}, + }, []string{"type"}) +) + +var ( + JobWaitDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "job_wait_duration_seconds", + Namespace: namespace, + Subsystem: workerSubsystem, + Help: "Duration a job spends on the queue.", + Buckets: []float64{.1, .2, .5, 1, 2, 4, 8, 16, 32, 64, 96, 128, 160, 192, 224, 256, 320, 382, 448, 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2049}, + }, []string{"type"}) +)