diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index 246798bec..2ecc7880c 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -183,7 +183,7 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return uuid.Nil, fmt.Errorf("unable to commit database transaction: %v", err) } - prometheus.PendingJobs.WithLabelValues(jobType).Inc() + prometheus.EnqueueJobMetrics(jobType) logrus.Infof("Enqueued job of type %s with ID %s(dependencies %v)", jobType, id, dependencies) return id, nil @@ -234,10 +234,7 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, } } - diff := started.Sub(*queued).Seconds() - prometheus.JobWaitDuration.WithLabelValues(jobType).Observe(diff) - prometheus.PendingJobs.WithLabelValues(jobType).Dec() - prometheus.RunningJobs.WithLabelValues(jobType).Inc() + prometheus.DequeueJobMetrics(*queued, *started, jobType) // insert heartbeat _, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id) @@ -290,10 +287,7 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, } logrus.Infof("Dequeued job of type %v with ID %s", jobType, id) - diff := started.Sub(*queued).Seconds() - prometheus.JobWaitDuration.WithLabelValues(jobType).Observe(diff) - prometheus.PendingJobs.WithLabelValues(jobType).Dec() - prometheus.RunningJobs.WithLabelValues(jobType).Inc() + prometheus.DequeueJobMetrics(*queued, *started, jobType) return token, dependencies, jobType, args, nil } @@ -362,9 +356,7 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error { } logrus.Infof("Finished job with ID %s", id) - diff := finished.Sub(*started).Seconds() - prometheus.JobDuration.WithLabelValues(jobType).Observe(diff) - prometheus.RunningJobs.WithLabelValues(jobType).Dec() + prometheus.FinishJobMetrics(*started, *finished, canceled, jobType) return nil } @@ -387,11 +379,7 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error { } logrus.Infof("Cancelled job with ID %s", id) - if started != nil { - prometheus.RunningJobs.WithLabelValues(jobType).Dec() - } else { - prometheus.PendingJobs.WithLabelValues(jobType).Dec() - } + prometheus.CancelJobMetrics(*started, jobType) return nil } diff --git a/internal/prometheus/job_metrics.go b/internal/prometheus/job_metrics.go index 9c6e17b5e..93503a2c9 100644 --- a/internal/prometheus/job_metrics.go +++ b/internal/prometheus/job_metrics.go @@ -1,6 +1,8 @@ package prometheus import ( + "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -44,3 +46,32 @@ var ( Buckets: []float64{.1, .2, .5, 1, 2, 4, 8, 16, 32, 40, 48, 64, 96, 128, 160, 192, 224, 256, 320, 382, 448, 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2049}, }, []string{"type"}) ) + +func EnqueueJobMetrics(jobType string) { + PendingJobs.WithLabelValues(jobType).Inc() +} + +func DequeueJobMetrics(queued time.Time, started time.Time, jobType string) { + if !started.IsZero() && !queued.IsZero() { + diff := started.Sub(queued).Seconds() + JobWaitDuration.WithLabelValues(jobType).Observe(diff) + PendingJobs.WithLabelValues(jobType).Dec() + RunningJobs.WithLabelValues(jobType).Inc() + } +} + +func CancelJobMetrics(started time.Time, jobType string) { + if !started.IsZero() { + RunningJobs.WithLabelValues(jobType).Dec() + } else { + PendingJobs.WithLabelValues(jobType).Dec() + } +} + +func FinishJobMetrics(started time.Time, finished time.Time, canceled bool, jobType string) { + if !finished.IsZero() && !canceled { + diff := finished.Sub(started).Seconds() + JobDuration.WithLabelValues(jobType).Observe(diff) + RunningJobs.WithLabelValues(jobType).Dec() + } +}