diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index 1df982b50..3b0819e29 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -67,7 +67,7 @@ const ( FROM jobs WHERE id = $1` sqlQueryJobStatus = ` - SELECT type, result, queued_at, started_at, finished_at, canceled + SELECT type, channel, result, queued_at, started_at, finished_at, canceled FROM jobs WHERE id = $1` sqlQueryRunningId = ` @@ -473,7 +473,7 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error { return nil } -func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) { +func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) { conn, err := q.pool.Acquire(context.Background()) if err != nil { return @@ -483,7 +483,7 @@ func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMes // Use double pointers for timestamps because they might be NULL, which would result in *time.Time == nil var sp, fp *time.Time var rp pgtype.JSON - err = conn.QueryRow(context.Background(), sqlQueryJobStatus, id).Scan(&jobType, &rp, &queued, &sp, &fp, &canceled) + err = conn.QueryRow(context.Background(), sqlQueryJobStatus, id).Scan(&jobType, &channel, &rp, &queued, &sp, &fp, &canceled) if err != nil { return } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index e3c8c5797..3a38d7483 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -343,13 +343,14 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error { return nil } -func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) { +func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) { j, err := q.readJob(id) if err != nil { return } jobType = j.Type + channel = j.Channel result = j.Result queued = j.QueuedAt started = j.StartedAt diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index 28ae645d2..d23133728 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -64,7 +64,7 @@ type JobQueue interface { // finished, respectively. // // Lastly, the IDs of the jobs dependencies are returned. - JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) + JobStatus(id uuid.UUID) (jobType string, channel string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) // Job returns all the parameters that define a job (everything provided during Enqueue). Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, channel string, err error) diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index 4cf262d3d..fd391ad1a 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -212,7 +212,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.ElementsMatch(t, []uuid.UUID{one, two}, r) j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "") - jobType, _, queued, started, finished, canceled, deps, err := q.JobStatus(j) + jobType, _, _, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") require.True(t, !queued.IsZero()) @@ -223,7 +223,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) - jobType, result, queued, started, finished, canceled, deps, err := q.JobStatus(j) + jobType, _, result, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") require.True(t, !queued.IsZero()) @@ -241,7 +241,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { two := pushTestJob(t, q, "test", nil, nil, "") j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two}, "") - jobType, _, queued, started, finished, canceled, deps, err := q.JobStatus(j) + jobType, _, _, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") require.True(t, !queued.IsZero()) @@ -257,7 +257,7 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two})) - jobType, result, queued, started, finished, canceled, deps, err := q.JobStatus(j) + jobType, _, result, queued, started, finished, canceled, deps, err := q.JobStatus(j) require.NoError(t, err) require.Equal(t, jobType, "test") require.True(t, !queued.IsZero()) @@ -353,7 +353,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.NotEmpty(t, id) err = q.CancelJob(id) require.NoError(t, err) - jobType, result, _, _, _, canceled, _, err := q.JobStatus(id) + jobType, _, result, _, _, _, canceled, _, err := q.JobStatus(id) require.NoError(t, err) require.Equal(t, jobType, "clownfish") require.True(t, canceled) @@ -373,7 +373,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { require.Equal(t, json.RawMessage("null"), args) err = q.CancelJob(id) require.NoError(t, err) - jobType, result, _, _, _, canceled, _, err = q.JobStatus(id) + jobType, _, result, _, _, _, canceled, _, err = q.JobStatus(id) require.NoError(t, err) require.Equal(t, jobType, "clownfish") require.True(t, canceled) @@ -396,7 +396,7 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) { err = q.CancelJob(id) require.Error(t, err) require.Equal(t, jobqueue.ErrNotRunning, err) - jobType, result, _, _, _, canceled, _, err = q.JobStatus(id) + jobType, _, result, _, _, _, canceled, _, err = q.JobStatus(id) require.NoError(t, err) require.Equal(t, jobType, "clownfish") require.False(t, canceled) @@ -613,7 +613,7 @@ func test100dequeuers(t *testing.T, q jobqueue.JobQueue) { // try to do some other operations on the jobqueue id := pushTestJob(t, q, "clownfish", nil, nil, "") - _, _, _, _, _, _, _, err := q.JobStatus(id) + _, _, _, _, _, _, _, _, err := q.JobStatus(id) require.NoError(t, err) finishNextTestJob(t, q, "clownfish", testResult{}, nil) diff --git a/internal/prometheus/job_metrics.go b/internal/prometheus/job_metrics.go index b0f63c3fa..0b720830f 100644 --- a/internal/prometheus/job_metrics.go +++ b/internal/prometheus/job_metrics.go @@ -16,7 +16,7 @@ var ( Namespace: namespace, Subsystem: workerSubsystem, Help: "Total jobs", - }, []string{"type", "status"}) + }, []string{"type", "status", "tenant"}) ) var ( @@ -25,7 +25,7 @@ var ( Namespace: namespace, Subsystem: workerSubsystem, Help: "Currently pending jobs", - }, []string{"type"}) + }, []string{"type", "tenant"}) ) var ( @@ -34,7 +34,7 @@ var ( Namespace: namespace, Subsystem: workerSubsystem, Help: "Currently running jobs", - }, []string{"type"}) + }, []string{"type", "tenant"}) ) var ( @@ -44,7 +44,7 @@ var ( Subsystem: workerSubsystem, Help: "Duration spent by workers on a job.", Buckets: []float64{.1, .2, .5, 1, 2, 4, 8, 16, 32, 40, 48, 64, 96, 128, 160, 192, 224, 256, 320, 382, 448, 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2049}, - }, []string{"type", "status"}) + }, []string{"type", "status", "tenant"}) ) var ( @@ -54,35 +54,35 @@ var ( Subsystem: workerSubsystem, Help: "Duration a job spends on the queue.", Buckets: []float64{.1, .2, .5, 1, 2, 4, 8, 16, 32, 40, 48, 64, 96, 128, 160, 192, 224, 256, 320, 382, 448, 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2049}, - }, []string{"type"}) + }, []string{"type", "tenant"}) ) -func EnqueueJobMetrics(jobType string) { - PendingJobs.WithLabelValues(jobType).Inc() +func EnqueueJobMetrics(jobType, tenant string) { + PendingJobs.WithLabelValues(jobType, tenant).Inc() } -func DequeueJobMetrics(pending time.Time, started time.Time, jobType string) { +func DequeueJobMetrics(pending time.Time, started time.Time, jobType, tenant string) { if !started.IsZero() && !pending.IsZero() { diff := started.Sub(pending).Seconds() - JobWaitDuration.WithLabelValues(jobType).Observe(diff) - PendingJobs.WithLabelValues(jobType).Dec() - RunningJobs.WithLabelValues(jobType).Inc() + JobWaitDuration.WithLabelValues(jobType, tenant).Observe(diff) + PendingJobs.WithLabelValues(jobType, tenant).Dec() + RunningJobs.WithLabelValues(jobType, tenant).Inc() } } -func CancelJobMetrics(started time.Time, jobType string) { +func CancelJobMetrics(started time.Time, jobType string, tenant string) { if !started.IsZero() { - RunningJobs.WithLabelValues(jobType).Dec() + RunningJobs.WithLabelValues(jobType, tenant).Dec() } else { - PendingJobs.WithLabelValues(jobType).Dec() + PendingJobs.WithLabelValues(jobType, tenant).Dec() } } -func FinishJobMetrics(started time.Time, finished time.Time, canceled bool, jobType string, status clienterrors.StatusCode) { +func FinishJobMetrics(started time.Time, finished time.Time, canceled bool, jobType, tenant string, status clienterrors.StatusCode) { if !finished.IsZero() && !canceled { diff := finished.Sub(started).Seconds() - JobDuration.WithLabelValues(jobType, status.ToString()).Observe(diff) - TotalJobs.WithLabelValues(jobType, status.ToString()).Inc() - RunningJobs.WithLabelValues(jobType).Dec() + JobDuration.WithLabelValues(jobType, status.ToString(), tenant).Observe(diff) + TotalJobs.WithLabelValues(jobType, status.ToString(), tenant).Inc() + RunningJobs.WithLabelValues(jobType, tenant).Dec() } } diff --git a/internal/worker/server.go b/internal/worker/server.go index 1416fb6ef..149ac82d0 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -133,7 +133,7 @@ func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID, } func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) { - prometheus.EnqueueJobMetrics(jobType) + prometheus.EnqueueJobMetrics(jobType, channel) return s.jobs.Enqueue(jobType, job, dependencies, channel) } @@ -166,7 +166,7 @@ func (s *Server) CheckBuildDependencies(dep uuid.UUID, jobErr *clienterrors.Erro } func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobStatus, []uuid.UUID, error) { - jobType, status, deps, err := s.jobStatus(id, result) + jobType, _, status, deps, err := s.jobStatus(id, result) if err != nil { return nil, nil, err } @@ -194,7 +194,7 @@ func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobS } func (s *Server) OSBuildKojiJobStatus(id uuid.UUID, result *OSBuildKojiJobResult) (*JobStatus, []uuid.UUID, error) { - jobType, status, deps, err := s.jobStatus(id, result) + jobType, _, status, deps, err := s.jobStatus(id, result) if err != nil { return nil, nil, err } @@ -217,7 +217,7 @@ func (s *Server) OSBuildKojiJobStatus(id uuid.UUID, result *OSBuildKojiJobResult } func (s *Server) KojiInitJobStatus(id uuid.UUID, result *KojiInitJobResult) (*JobStatus, []uuid.UUID, error) { - jobType, status, deps, err := s.jobStatus(id, result) + jobType, _, status, deps, err := s.jobStatus(id, result) if err != nil { return nil, nil, err } @@ -234,7 +234,7 @@ func (s *Server) KojiInitJobStatus(id uuid.UUID, result *KojiInitJobResult) (*Jo } func (s *Server) KojiFinalizeJobStatus(id uuid.UUID, result *KojiFinalizeJobResult) (*JobStatus, []uuid.UUID, error) { - jobType, status, deps, err := s.jobStatus(id, result) + jobType, _, status, deps, err := s.jobStatus(id, result) if err != nil { return nil, nil, err } @@ -251,7 +251,7 @@ func (s *Server) KojiFinalizeJobStatus(id uuid.UUID, result *KojiFinalizeJobResu } func (s *Server) DepsolveJobStatus(id uuid.UUID, result *DepsolveJobResult) (*JobStatus, []uuid.UUID, error) { - jobType, status, deps, err := s.jobStatus(id, result) + jobType, _, status, deps, err := s.jobStatus(id, result) if err != nil { return nil, nil, err } @@ -272,7 +272,7 @@ func (s *Server) DepsolveJobStatus(id uuid.UUID, result *DepsolveJobResult) (*Jo } func (s *Server) ManifestJobStatus(id uuid.UUID, result *ManifestJobByIDResult) (*JobStatus, []uuid.UUID, error) { - jobType, status, deps, err := s.jobStatus(id, result) + jobType, _, status, deps, err := s.jobStatus(id, result) if err != nil { return nil, nil, err } @@ -284,20 +284,20 @@ func (s *Server) ManifestJobStatus(id uuid.UUID, result *ManifestJobByIDResult) return status, deps, nil } -func (s *Server) jobStatus(id uuid.UUID, result interface{}) (string, *JobStatus, []uuid.UUID, error) { - jobType, rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id) +func (s *Server) jobStatus(id uuid.UUID, result interface{}) (string, string, *JobStatus, []uuid.UUID, error) { + jobType, channel, rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id) if err != nil { - return "", nil, nil, err + return "", "", nil, nil, err } if result != nil && !finished.IsZero() && !canceled { err = json.Unmarshal(rawResult, result) if err != nil { - return "", nil, nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err) + return "", "", nil, nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err) } } - return jobType, &JobStatus{ + return jobType, channel, &JobStatus{ Queued: queued, Started: started, Finished: finished, @@ -350,11 +350,11 @@ func (s *Server) JobType(id uuid.UUID) (string, error) { } func (s *Server) Cancel(id uuid.UUID) error { - jobType, status, _, err := s.jobStatus(id, nil) + jobType, channel, status, _, err := s.jobStatus(id, nil) if err != nil { logrus.Errorf("error getting job status: %v", err) } else { - prometheus.CancelJobMetrics(status.Started, jobType) + prometheus.CancelJobMetrics(status.Started, jobType, channel) } return s.jobs.CancelJob(id) } @@ -366,7 +366,7 @@ func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error return nil, 0, errors.New("Artifacts not enabled") } - _, status, _, err := s.jobStatus(id, nil) + _, _, status, _, err := s.jobStatus(id, nil) if err != nil { return nil, 0, err } @@ -395,7 +395,7 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return errors.New("Artifacts not enabled") } - _, status, _, err := s.jobStatus(id, nil) + _, _, status, _, err := s.jobStatus(id, nil) if err != nil { return err } @@ -449,10 +449,11 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, return } - jobType, status, _, err := s.jobStatus(jobId, nil) + jobType, channel, status, _, err := s.jobStatus(jobId, nil) if err != nil { logrus.Errorf("error retrieving job status: %v", err) - return + } else { + prometheus.DequeueJobMetrics(status.Queued, status.Started, jobType, channel) } // Record how long the job has been pending for, that is either how @@ -465,7 +466,7 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, // TODO: include type of arguments var result json.RawMessage var finished time.Time - _, result, _, _, finished, _, _, err = s.jobs.JobStatus(depID) + _, _, result, _, _, finished, _, _, err = s.jobs.JobStatus(depID) if err != nil { return } @@ -484,7 +485,7 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, // TODO: Drop the ':$architecture' for metrics too, first prometheus queries for alerts and // dashboards need to be adjusted. - prometheus.DequeueJobMetrics(pending, status.Started, jobType) + prometheus.DequeueJobMetrics(pending, status.Started, jobType, channel) if jobType == "osbuild:"+arch { jobType = "osbuild" } else if jobType == "osbuild-koji:"+arch { @@ -516,12 +517,12 @@ func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error { } var jobResult JobResult - jobType, status, _, err := s.jobStatus(jobId, &jobResult) + jobType, channel, status, _, err := s.jobStatus(jobId, &jobResult) if err != nil { logrus.Errorf("error finding job status: %v", err) } else { statusCode := clienterrors.GetStatusCode(jobResult.JobError) - prometheus.FinishJobMetrics(status.Started, status.Finished, status.Canceled, jobType, statusCode) + prometheus.FinishJobMetrics(status.Started, status.Finished, status.Canceled, jobType, channel, statusCode) } // Move artifacts from the temporary location to the final job @@ -664,7 +665,7 @@ func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error { h.server.jobs.RefreshHeartbeat(token) - _, status, _, err := h.server.jobStatus(jobId, nil) + _, _, status, _, err := h.server.jobStatus(jobId, nil) if err != nil { return api.HTTPErrorWithInternal(api.ErrorRetrievingJobStatus, err) }