From 6c4caec02248eb55d24ae0ded7c37250e8e9a44a Mon Sep 17 00:00:00 2001 From: Gianluca Zuccarelli Date: Mon, 31 Jan 2022 11:20:53 +0000 Subject: [PATCH] metrics: move metrics to worker server For simplicity, the collection of the job metrics was carried out in the job queue. This was only being done in the dbqueue and not in the fsqueue. This PR refactors the metric collection and moves the job metrics to the worker server, by adding a wrapper function for enqueueing jobs so that the metrics only have to be recorded in one place when queueing a job. --- internal/jobqueue/dbjobqueue/dbjobqueue.go | 7 ---- internal/worker/server.go | 43 ++++++++++++++++++---- 2 files changed, 35 insertions(+), 15 deletions(-) diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index 2ecc7880c..84f289a03 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -21,7 +21,6 @@ import ( logrus "github.com/sirupsen/logrus" "github.com/osbuild/osbuild-composer/internal/jobqueue" - "github.com/osbuild/osbuild-composer/internal/prometheus" ) const ( @@ -183,7 +182,6 @@ func (q *DBJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu return uuid.Nil, fmt.Errorf("unable to commit database transaction: %v", err) } - prometheus.EnqueueJobMetrics(jobType) logrus.Infof("Enqueued job of type %s with ID %s(dependencies %v)", jobType, id, dependencies) return id, nil @@ -234,8 +232,6 @@ func (q *DBJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, } } - prometheus.DequeueJobMetrics(*queued, *started, jobType) - // insert heartbeat _, err = conn.Exec(ctx, sqlInsertHeartbeat, token, id) if err != nil { @@ -287,7 +283,6 @@ func (q *DBJobQueue) DequeueByID(ctx context.Context, id uuid.UUID) (uuid.UUID, } logrus.Infof("Dequeued job of type %v with ID %s", jobType, id) - prometheus.DequeueJobMetrics(*queued, *started, jobType) return token, dependencies, jobType, args, nil } 
@@ -356,7 +351,6 @@ func (q *DBJobQueue) FinishJob(id uuid.UUID, result interface{}) error { } logrus.Infof("Finished job with ID %s", id) - prometheus.FinishJobMetrics(*started, *finished, canceled, jobType) return nil } @@ -379,7 +373,6 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error { } logrus.Infof("Cancelled job with ID %s", id) - prometheus.CancelJobMetrics(*started, jobType) return nil } diff --git a/internal/worker/server.go b/internal/worker/server.go index aa9a84ce4..65f60b14a 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -22,6 +22,7 @@ import ( "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/jobqueue" + "github.com/osbuild/osbuild-composer/internal/prometheus" "github.com/osbuild/osbuild-composer/internal/worker/api" "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" ) @@ -93,35 +94,40 @@ func (s *Server) WatchHeartbeats() { } func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob) (uuid.UUID, error) { - return s.jobs.Enqueue("osbuild:"+arch, job, nil) + return s.enqueue("osbuild:"+arch, job, nil) } func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, manifestID uuid.UUID) (uuid.UUID, error) { - return s.jobs.Enqueue("osbuild:"+arch, job, []uuid.UUID{manifestID}) + return s.enqueue("osbuild:"+arch, job, []uuid.UUID{manifestID}) } func (s *Server) EnqueueOSBuildKoji(arch string, job *OSBuildKojiJob, initID uuid.UUID) (uuid.UUID, error) { - return s.jobs.Enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID}) + return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID}) } func (s *Server) EnqueueOSBuildKojiAsDependency(arch string, job *OSBuildKojiJob, manifestID, initID uuid.UUID) (uuid.UUID, error) { - return s.jobs.Enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID, manifestID}) + return s.enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID, manifestID}) } func (s *Server) EnqueueKojiInit(job *KojiInitJob) 
(uuid.UUID, error) { - return s.jobs.Enqueue("koji-init", job, nil) + return s.enqueue("koji-init", job, nil) } func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID) (uuid.UUID, error) { - return s.jobs.Enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...)) + return s.enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...)) } func (s *Server) EnqueueDepsolve(job *DepsolveJob) (uuid.UUID, error) { - return s.jobs.Enqueue("depsolve", job, nil) + return s.enqueue("depsolve", job, nil) } func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID) (uuid.UUID, error) { - return s.jobs.Enqueue("manifest-id-only", job, []uuid.UUID{parent}) + return s.enqueue("manifest-id-only", job, []uuid.UUID{parent}) +} + +func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID) (uuid.UUID, error) { + prometheus.EnqueueJobMetrics(jobType) + return s.jobs.Enqueue(jobType, job, dependencies) } func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobStatus, []uuid.UUID, error) { @@ -326,6 +332,12 @@ func (s *Server) JobType(id uuid.UUID) (string, error) { } func (s *Server) Cancel(id uuid.UUID) error { + jobType, status, _, err := s.jobStatus(id, nil) + if err != nil { + logrus.Errorf("error getting job status: %v", err) + } else { + prometheus.CancelJobMetrics(status.Started, jobType) + } return s.jobs.CancelJob(id) } @@ -419,6 +431,13 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, return } + jobType, status, _, err := s.jobStatus(jobId, nil) + if err != nil { + logrus.Errorf("error retrieving job status: %v", err) + } else { + prometheus.DequeueJobMetrics(status.Queued, status.Started, jobType) + } + for _, depID := range depIDs { // TODO: include type of arguments _, result, _, _, _, _, _, _ := s.jobs.JobStatus(depID) @@ -462,6 +481,14 @@ func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) 
error { } } + var jobResult JobResult + jobType, status, _, err := s.jobStatus(jobId, &jobResult) + if err != nil { + logrus.Errorf("error finding job status: %v", err) + } else { + prometheus.FinishJobMetrics(status.Started, status.Finished, status.Canceled, jobType) + } + // Move artifacts from the temporary location to the final job // location. Log any errors, but do not treat them as fatal. The job is // already finished.