metrics: add initial job metrics

Add job metrics to track the number of
pending/running jobs, the duration of
the jobs and how long the jobs spent in
the job queue.
This commit is contained in:
Gianluca Zuccarelli 2021-11-30 15:45:09 +00:00 committed by Tom Gundersen
parent 4455fba187
commit 1a709eda5c
2 changed files with 91 additions and 20 deletions

View file

@ -21,6 +21,7 @@ import (
logrus "github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
"github.com/osbuild/osbuild-composer/internal/prometheus"
)
const (
@ -41,7 +42,7 @@ const (
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING id, token, type, args`
RETURNING id, token, type, args, queued_at, started_at`
sqlDequeueByID = `
UPDATE jobs
@ -53,7 +54,7 @@ const (
LIMIT 1
FOR UPDATE SKIP LOCKED
)
RETURNING token, type, args`
RETURNING token, type, args, queued_at, started_at`
sqlInsertDependency = `INSERT INTO job_dependencies VALUES ($1, $2)`
sqlQueryDependencies = `
@ -62,7 +63,7 @@ const (
WHERE job_id = $1`
sqlQueryJob = `
SELECT type, args, finished_at, canceled
SELECT type, args, started_at, finished_at, canceled
FROM jobs
WHERE id = $1`
sqlQueryJobStatus = `
@ -76,11 +77,13 @@ const (
sqlFinishJob = `
UPDATE jobs
SET finished_at = now(), result = $1
WHERE id = $2 AND finished_at IS NULL`
WHERE id = $2 AND finished_at IS NULL
RETURNING finished_at`
sqlCancelJob = `
UPDATE jobs
SET canceled = TRUE
WHERE id = $1 AND finished_at IS NULL`
WHERE id = $1 AND finished_at IS NULL
RETURNING type, started_at`
sqlInsertHeartbeat = `
INSERT INTO heartbeats(token, id, heartbeat)
@ -180,6 +183,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu
return uuid.Nil, fmt.Errorf("unable to commit database transaction: %v", err)
}
prometheus.PendingJobs.WithLabelValues(jobType).Inc()
logrus.Infof("Enqueued job of type %s with ID %s(dependencies %v)", jobType, id, dependencies)
return id, nil
@ -211,9 +215,10 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
var id uuid.UUID
var jobType string
var args json.RawMessage
var started, queued *time.Time
token := uuid.New()
for {
err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes).Scan(&id, &token, &jobType, &args)
err = conn.QueryRow(ctx, sqlDequeue, token, jobTypes).Scan(&id, &token, &jobType, &args, &queued, &started)
if err == nil {
break
}
@ -229,6 +234,11 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID,
}
}
diff := started.Sub(*queued).Seconds()
prometheus.JobWaitDuration.WithLabelValues(jobType).Observe(diff)
prometheus.PendingJobs.WithLabelValues(jobType).Dec()
prometheus.RunningJobs.WithLabelValues(jobType).Inc()
// insert heartbeat
_, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id)
if err != nil {
@ -258,9 +268,10 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
var jobType string
var args json.RawMessage
var started, queued *time.Time
token := uuid.New()
err = conn.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args)
err = conn.QueryRow(ctx, sqlDequeueByID, token, id).Scan(&token, &jobType, &args, &queued, &started)
if err == pgx.ErrNoRows {
return uuid.Nil, nil, "", nil, jobqueue.ErrNotPending
} else if err != nil {
@ -279,6 +290,10 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID,
}
logrus.Infof("Dequeued job of type %v with ID %s", jobType, id)
diff := started.Sub(*queued).Seconds()
prometheus.JobWaitDuration.WithLabelValues(jobType).Observe(diff)
prometheus.PendingJobs.WithLabelValues(jobType).Dec()
prometheus.RunningJobs.WithLabelValues(jobType).Inc()
return token, dependencies, jobType, args, nil
}
@ -303,9 +318,10 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
}()
// Use double pointers for timestamps because they might be NULL, which would result in *time.Time == nil
var finished *time.Time
var started, finished *time.Time
var jobType string
canceled := false
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(nil, nil, nil, &finished, &canceled)
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, nil, &started, nil, &canceled)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
}
@ -326,15 +342,15 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
return jobqueue.ErrNotExist
}
tag, err = conn.Exec(context.Background(), sqlFinishJob, result, id)
err = conn.QueryRow(context.Background(), sqlFinishJob, result, id).Scan(&finished)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotExist
}
if err != nil {
return fmt.Errorf("error finishing job %s: %v", id, err)
}
if tag.RowsAffected() != 1 {
return jobqueue.ErrNotExist
}
_, err = conn.Exec(context.Background(), sqlNotify)
if err != nil {
return fmt.Errorf("error notifying jobs channel: %v", err)
@ -346,6 +362,9 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error {
}
logrus.Infof("Finished job with ID %s", id)
diff := finished.Sub(*started).Seconds()
prometheus.JobDuration.WithLabelValues(jobType).Observe(diff)
prometheus.RunningJobs.WithLabelValues(jobType).Dec()
return nil
}
@ -357,16 +376,22 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
}
defer conn.Release()
tag, err := conn.Exec(context.Background(), sqlCancelJob, id)
var started *time.Time
var jobType string
err = conn.QueryRow(context.Background(), sqlCancelJob, id).Scan(&jobType, &started)
if err == pgx.ErrNoRows {
return jobqueue.ErrNotRunning
}
if err != nil {
return fmt.Errorf("error canceling job %s: %v", id, err)
}
if tag.RowsAffected() != 1 {
return jobqueue.ErrNotRunning
}
logrus.Infof("Cancelled job with ID %s", id)
if started != nil {
prometheus.RunningJobs.WithLabelValues(jobType).Dec()
} else {
prometheus.PendingJobs.WithLabelValues(jobType).Dec()
}
return nil
}
@ -410,7 +435,7 @@ func (q *DBJobQueue) Job(id uuid.UUID) (jobType string, args json.RawMessage, de
}
defer conn.Release()
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, nil, nil)
err = conn.QueryRow(context.Background(), sqlQueryJob, id).Scan(&jobType, &args, nil, nil, nil)
if err == pgx.ErrNoRows {
err = jobqueue.ErrNotExist
return

View file

@ -0,0 +1,46 @@
package prometheus
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const workerSubsystem = "composer_worker"
var (
PendingJobs = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "pending_jobs",
Namespace: namespace,
Subsystem: workerSubsystem,
Help: "Currently pending jobs",
}, []string{"type"})
)
var (
RunningJobs = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "running_jobs",
Namespace: namespace,
Subsystem: workerSubsystem,
Help: "Currently running jobs",
}, []string{"type"})
)
var (
JobDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "job_duration_seconds",
Namespace: namespace,
Subsystem: workerSubsystem,
Help: "Duration spent by workers on a job.",
Buckets: []float64{.1, .2, .5, 1, 2, 4, 8, 16, 32, 64, 96, 128, 160, 192, 224, 256, 320, 382, 448, 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2049},
}, []string{"type"})
)
var (
JobWaitDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "job_wait_duration_seconds",
Namespace: namespace,
Subsystem: workerSubsystem,
Help: "Duration a job spends on the queue.",
Buckets: []float64{.1, .2, .5, 1, 2, 4, 8, 16, 32, 64, 96, 128, 160, 192, 224, 256, 320, 382, 448, 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2049},
}, []string{"type"})
)