debian-forge-composer/internal/worker/server.go
Brian C. Lane 56fc58cca3 cloudapi: Add DeleteCompose to delete a job by UUID
This adds the handler for DELETE /composes/{id} which will delete a job and
all of its dependencies, and any artifacts.

Related: RHEL-60120
2025-06-05 10:32:56 +02:00

1249 lines
35 KiB
Go

package worker
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"path"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
"github.com/osbuild/osbuild-composer/internal/auth"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/prometheus"
"github.com/osbuild/osbuild-composer/internal/worker/api"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
)
// Job type names as stored in the job queue. OSBuild jobs are enqueued
// with an architecture suffix appended after a colon ("osbuild:<arch>");
// the suffix is stripped again by JobType() and jobInfo().
const (
	JobTypeOSBuild          string = "osbuild"
	JobTypeKojiInit         string = "koji-init"
	JobTypeKojiFinalize     string = "koji-finalize"
	JobTypeDepsolve         string = "depsolve"
	JobTypeSearchPackages   string = "search-packages"
	JobTypeManifestIDOnly   string = "manifest-id-only"
	JobTypeContainerResolve string = "container-resolve"
	JobTypeFileResolve      string = "file-resolve"
	JobTypeOSTreeResolve    string = "ostree-resolve"
	JobTypeAWSEC2Copy       string = "aws-ec2-copy"
	JobTypeAWSEC2Share      string = "aws-ec2-share"
)
// Server coordinates the worker job queue: it enqueues jobs, hands them
// out to workers over the HTTP API, and watches job heartbeats, workers
// and artifacts.
type Server struct {
	jobs   jobqueue.JobQueue // backing job queue implementation
	logger *log.Logger       // not referenced in the code shown here — TODO confirm intended use
	config Config
}
// JobStatus captures the queue timestamps for a single job. A zero
// Finished time means the job has not completed yet.
type JobStatus struct {
	Queued   time.Time
	Started  time.Time
	Finished time.Time
	Canceled bool
}
// JobInfo is the metadata returned for a job: its (arch-stripped) type,
// tenant channel, status, and dependency links.
type JobInfo struct {
	JobType    string
	Channel    string
	JobStatus  *JobStatus
	Deps       []uuid.UUID // jobs this job depends on
	Dependents []uuid.UUID // jobs that depend on this job
}
// Sentinel errors returned by Server methods so callers can
// distinguish common failure modes from opaque queue errors.
var ErrInvalidToken = errors.New("token does not exist")
var ErrJobNotRunning = errors.New("job isn't running")
var ErrInvalidJobType = errors.New("job has invalid type")
// Config holds the server's tunables. Zero values for the timeout and
// watch-frequency fields are replaced with defaults in NewServer.
type Config struct {
	ArtifactsDir         string        // root directory for job artifacts; empty disables artifact handling
	RequestJobTimeout    time.Duration // max time a dequeue request may block; 0 means no limit
	BasePath             string        // URL prefix for the worker API
	JWTEnabled           bool
	TenantProviderFields []string      // JWT claim fields used to resolve the tenant
	JobTimeout           time.Duration // job heartbeat timeout (default 120s, see NewServer)
	JobWatchFreq         time.Duration // heartbeat scan interval (default 30s)
	WorkerTimeout        time.Duration // worker inactivity timeout (default 1h)
	WorkerWatchFreq      time.Duration // worker scan interval (default 300s)
}
// NewServer creates a worker server on top of the given job queue,
// fills in default timeouts for any zero-valued Config durations, and
// starts the background heartbeat and worker watchers.
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, config Config) *Server {
	if config.JobTimeout == 0 {
		config.JobTimeout = 120 * time.Second
	}
	if config.JobWatchFreq == 0 {
		config.JobWatchFreq = 30 * time.Second
	}
	if config.WorkerTimeout == 0 {
		config.WorkerTimeout = time.Hour
	}
	if config.WorkerWatchFreq == 0 {
		config.WorkerWatchFreq = 300 * time.Second
	}
	s := &Server{
		jobs:   jobs,
		logger: logger,
		config: config,
	}
	api.BasePath = config.BasePath
	go s.WatchHeartbeats()
	go s.WatchWorkers()
	return s
}
// Handler builds the HTTP handler for the worker API: an echo router
// with the generated API routes, a strict JSON binder, recovery and
// prometheus middleware, and (when JWT is enabled) tenant-channel
// authentication.
func (s *Server) Handler() http.Handler {
	e := echo.New()
	e.Binder = binder{}
	e.Logger = common.Logger()
	// log errors returned from handlers
	e.HTTPErrorHandler = api.HTTPErrorHandler
	e.Use(middleware.Recover())
	e.Pre(common.OperationIDMiddleware)
	handler := apiHandlers{
		server: s,
	}
	// Middleware order matters: status metrics first, then (optionally)
	// tenant resolution, then request-duration metrics.
	mws := []echo.MiddlewareFunc{
		prometheus.StatusMiddleware(prometheus.WorkerSubsystem),
	}
	if s.config.JWTEnabled {
		mws = append(mws, auth.TenantChannelMiddleware(s.config.TenantProviderFields, api.HTTPError(api.ErrorTenantNotFound)))
	}
	mws = append(mws, prometheus.HTTPDurationMiddleware(prometheus.WorkerSubsystem))
	api.RegisterHandlers(e.Group(api.BasePath, mws...), &handler)
	return e
}
// maxHeartbeatRetries is how many missed-heartbeat requeues a job gets
// before it is failed for good.
const maxHeartbeatRetries = 2

// WatchHeartbeats should be started as a goroutine.
// With default durations it scans all running jobs every 30 seconds and
// fails any unresponsive ones — jobs that haven't checked whether they
// were cancelled in the past 2 minutes.
func (s *Server) WatchHeartbeats() {
	//nolint:staticcheck // avoid SA1015, this is an endless function
	for range time.Tick(s.config.JobWatchFreq) {
		for _, token := range s.jobs.Heartbeats(s.config.JobTimeout) {
			s.failUnresponsiveJob(token)
		}
	}
}

// failUnresponsiveJob requeues (or, past the retry budget, fails) the
// job behind the given token with a missing-heartbeat error result.
func (s *Server) failUnresponsiveJob(token uuid.UUID) {
	id, _ := s.jobs.IdFromToken(token)
	logrus.Infof("Removing unresponsive job: %s\n", id)
	missing := JobResult{
		JobError: clienterrors.New(clienterrors.ErrorJobMissingHeartbeat,
			fmt.Sprintf("Workers running this job stopped responding more than %d times.", maxHeartbeatRetries),
			nil),
	}
	raw, err := json.Marshal(missing)
	if err != nil {
		logrus.Panicf("Cannot marshal the heartbeat error: %v", err)
	}
	if err := s.RequeueOrFinishJob(token, maxHeartbeatRetries, raw); err != nil {
		logrus.Errorf("Error requeueing or finishing unresponsive job: %v", err)
	}
}
// WatchWorkers should be started as a goroutine.
// Every 5 minutes (by default) it scans all registered workers and
// removes any that have been inactive longer than the worker timeout.
func (s *Server) WatchWorkers() {
	//nolint:staticcheck // avoid SA1015, this is an endless function
	for range time.Tick(s.config.WorkerWatchFreq) {
		stale, err := s.jobs.Workers(s.config.WorkerTimeout)
		if err != nil {
			logrus.Warningf("Unable to query workers: %v", err)
			continue
		}
		for _, worker := range stale {
			logrus.Infof("Removing inactive worker: %s", worker.ID)
			if err := s.jobs.DeleteWorker(worker.ID); err != nil {
				logrus.Warningf("Unable to remove worker: %v", err)
			}
		}
	}
}
// EnqueueOSBuild queues a new osbuild job for the given architecture
// with no dependencies.
func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeOSBuild+":"+arch, job, nil, channel)
}
// EnqueueOSBuildAsDependency queues a new osbuild job that depends on
// the given jobs (e.g. depsolve/manifest jobs).
func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeOSBuild+":"+arch, job, dependencies, channel)
}
// EnqueueKojiInit queues a new koji-init job with no dependencies.
func (s *Server) EnqueueKojiInit(job *KojiInitJob, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeKojiInit, job, nil, channel)
}
// EnqueueKojiFinalize queues a koji-finalize job depending on the
// koji-init job and all build jobs of the compose.
func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeKojiFinalize, job, append([]uuid.UUID{initID}, buildIDs...), channel)
}
// EnqueueDepsolve queues a new depsolve job with no dependencies.
func (s *Server) EnqueueDepsolve(job *DepsolveJob, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeDepsolve, job, nil, channel)
}
// EnqueueSearchPackages queues a new package-search job with no
// dependencies.
func (s *Server) EnqueueSearchPackages(job *SearchPackagesJob, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeSearchPackages, job, nil, channel)
}
// EnqueueManifestJobByID queues a manifest job. It must depend on at
// least one job (a depsolve); passing no dependencies is a programmer
// error and panics.
func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
	if len(dependencies) == 0 {
		panic("EnqueueManifestJobByID has no dependencies, expected at least a depsolve job")
	}
	return s.enqueue(JobTypeManifestIDOnly, job, dependencies, channel)
}
// EnqueueContainerResolveJob queues a new container-resolve job with no
// dependencies.
func (s *Server) EnqueueContainerResolveJob(job *ContainerResolveJob, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeContainerResolve, job, nil, channel)
}
// EnqueueFileResolveJob queues a new file-resolve job with no
// dependencies.
func (s *Server) EnqueueFileResolveJob(job *FileResolveJob, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeFileResolve, job, nil, channel)
}
// EnqueueOSTreeResolveJob queues a new ostree-resolve job with no
// dependencies.
func (s *Server) EnqueueOSTreeResolveJob(job *OSTreeResolveJob, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeOSTreeResolve, job, nil, channel)
}
// EnqueueAWSEC2CopyJob queues an aws-ec2-copy job depending on its
// parent (build) job.
func (s *Server) EnqueueAWSEC2CopyJob(job *AWSEC2CopyJob, parent uuid.UUID, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeAWSEC2Copy, job, []uuid.UUID{parent}, channel)
}
// EnqueueAWSEC2ShareJob queues an aws-ec2-share job depending on its
// parent job.
func (s *Server) EnqueueAWSEC2ShareJob(job *AWSEC2ShareJob, parent uuid.UUID, channel string) (uuid.UUID, error) {
	return s.enqueue(JobTypeAWSEC2Share, job, []uuid.UUID{parent}, channel)
}
// enqueue records enqueue metrics (using the job type without the arch
// suffix) and pushes the job onto the queue.
func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
	baseType := strings.Split(jobType, ":")[0]
	prometheus.EnqueueJobMetrics(baseType, channel)
	return s.jobs.Enqueue(jobType, job, dependencies, channel)
}
// JobDependencyChainErrors recursively gathers all errors from the
// dependencies of job `id` which caused it to fail. If the job didn't
// fail, `nil` is returned.
func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, error) {
	jobType, err := s.JobType(id)
	if err != nil {
		return nil, err
	}
	// Load the job's result; dispatch on job type because each type has
	// its own result struct embedding the common JobResult.
	var jobResult *JobResult
	var jobInfo *JobInfo
	switch jobType {
	case JobTypeOSBuild:
		var osbuildJR OSBuildJobResult
		jobInfo, err = s.OSBuildJobInfo(id, &osbuildJR)
		if err != nil {
			return nil, err
		}
		jobResult = &osbuildJR.JobResult
	case JobTypeDepsolve:
		var depsolveJR DepsolveJobResult
		jobInfo, err = s.DepsolveJobInfo(id, &depsolveJR)
		if err != nil {
			return nil, err
		}
		jobResult = &depsolveJR.JobResult
	case JobTypeSearchPackages:
		var searchJR SearchPackagesJobResult
		jobInfo, err = s.SearchPackagesJobInfo(id, &searchJR)
		if err != nil {
			return nil, err
		}
		jobResult = &searchJR.JobResult
	case JobTypeManifestIDOnly:
		var manifestJR ManifestJobByIDResult
		jobInfo, err = s.ManifestJobInfo(id, &manifestJR)
		if err != nil {
			return nil, err
		}
		jobResult = &manifestJR.JobResult
	case JobTypeKojiInit:
		var kojiInitJR KojiInitJobResult
		jobInfo, err = s.KojiInitJobInfo(id, &kojiInitJR)
		if err != nil {
			return nil, err
		}
		jobResult = &kojiInitJR.JobResult
	case JobTypeKojiFinalize:
		var kojiFinalizeJR KojiFinalizeJobResult
		jobInfo, err = s.KojiFinalizeJobInfo(id, &kojiFinalizeJR)
		if err != nil {
			return nil, err
		}
		jobResult = &kojiFinalizeJR.JobResult
	case JobTypeContainerResolve:
		var containerResolveJR ContainerResolveJobResult
		jobInfo, err = s.ContainerResolveJobInfo(id, &containerResolveJR)
		if err != nil {
			return nil, err
		}
		jobResult = &containerResolveJR.JobResult
	case JobTypeFileResolve:
		var fileResolveJR FileResolveJobResult
		jobInfo, err = s.FileResolveJobInfo(id, &fileResolveJR)
		if err != nil {
			return nil, err
		}
		jobResult = &fileResolveJR.JobResult
	case JobTypeOSTreeResolve:
		var ostreeResolveJR OSTreeResolveJobResult
		jobInfo, err = s.OSTreeResolveJobInfo(id, &ostreeResolveJR)
		if err != nil {
			return nil, err
		}
		jobResult = &ostreeResolveJR.JobResult
	default:
		return nil, fmt.Errorf("unexpected job type: %s", jobType)
	}
	if jobError := jobResult.JobError; jobError != nil {
		depErrors := []*clienterrors.Error{}
		if jobError.IsDependencyError() {
			// check job's dependencies
			for _, dep := range jobInfo.Deps {
				depError, err := s.JobDependencyChainErrors(dep)
				if err != nil {
					return nil, err
				}
				if depError != nil {
					depErrors = append(depErrors, depError)
				}
			}
		}
		// Attach the collected dependency errors as details on the
		// job's own error before returning it.
		if len(depErrors) > 0 {
			jobError.Details = depErrors
		}
		return jobError, nil
	}
	return nil, nil
}
// AllRootJobIDs returns a list of top level job UUIDs that the worker
// knows about; it delegates directly to the job queue.
func (s *Server) AllRootJobIDs(ctx context.Context) ([]uuid.UUID, error) {
	return s.jobs.AllRootJobIDs(ctx)
}
// CleanupArtifacts removes worker artifact directories that do not have
// matching jobs. The UUID used for the artifact directory is the same
// as for the job that created it; directories whose name is not a UUID
// are left alone.
func (s *Server) CleanupArtifacts() error {
	entries, err := os.ReadDir(s.config.ArtifactsDir)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		if !entry.IsDir() {
			continue
		}
		id, err := uuid.Parse(entry.Name())
		if err != nil {
			continue
		}
		// Keep the directory when a job with this UUID still exists.
		if _, _, _, _, err := s.jobs.Job(id); err == nil {
			continue
		}
		// No associated job: remove the ComposeRequest (if any, errors
		// ignored) and the whole artifact directory.
		_ = os.Remove(path.Join(s.config.ArtifactsDir, "ComposeRequest", id.String()+".json"))
		if err := os.RemoveAll(path.Join(s.config.ArtifactsDir, id.String())); err != nil {
			return err
		}
	}
	return nil
}
// DeleteJob deletes a finished job and all of its dependencies;
// deleting a job that has not finished yet is refused.
func (s *Server) DeleteJob(ctx context.Context, id uuid.UUID) error {
	info, err := s.jobInfo(id, nil)
	if err != nil {
		return err
	}
	if info.JobStatus.Finished.IsZero() {
		return fmt.Errorf("Cannot delete job before job is finished: %s", id)
	}
	return s.jobs.DeleteJob(ctx, id)
}
// OSBuildJobInfo returns the job metadata for an osbuild job and
// unmarshals its result into `result`. For finished jobs it back-fills
// JobError and Success for results produced by older workers that
// predate those fields.
func (s *Server) OSBuildJobInfo(id uuid.UUID, result *OSBuildJobResult) (*JobInfo, error) {
	jobInfo, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if jobInfo.JobType != JobTypeOSBuild {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeOSBuild, jobInfo.JobType)
	}
	// Derive a JobError for finished jobs whose result carries none;
	// the order matters: missing output, then osbuild's own error, then
	// target errors.
	if result.JobError == nil && !jobInfo.JobStatus.Finished.IsZero() {
		if result.OSBuildOutput == nil {
			result.JobError = clienterrors.New(clienterrors.ErrorBuildJob, "osbuild build failed", nil)
		} else if len(result.OSBuildOutput.Error) > 0 {
			result.JobError = clienterrors.New(clienterrors.ErrorOldResultCompatible, string(result.OSBuildOutput.Error), nil)
		} else if len(result.TargetErrors()) > 0 {
			result.JobError = clienterrors.New(clienterrors.ErrorTargetError, "at least one target failed", result.TargetErrors())
		}
	}
	// For backwards compatibility: OSBuildJobResult didn't use to have a
	// top-level `Success` flag. Override it here by looking into the job.
	if !result.Success && result.OSBuildOutput != nil {
		result.Success = result.OSBuildOutput.Success && result.JobError == nil
	}
	return jobInfo, nil
}
// KojiInitJobInfo returns the job metadata for a koji-init job and
// unmarshals its result into `result`. Old-style results that only set
// KojiError are converted into a JobError.
func (s *Server) KojiInitJobInfo(id uuid.UUID, result *KojiInitJobResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeKojiInit {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeKojiInit, actual)
	}
	if result.JobError == nil && result.KojiError != "" {
		result.JobError = clienterrors.New(clienterrors.ErrorOldResultCompatible, result.KojiError, nil)
	}
	return info, nil
}
// KojiFinalizeJobInfo returns the job metadata for a koji-finalize job
// and unmarshals its result into `result`. Old-style results that only
// set KojiError are converted into a JobError.
func (s *Server) KojiFinalizeJobInfo(id uuid.UUID, result *KojiFinalizeJobResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeKojiFinalize {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeKojiFinalize, actual)
	}
	if result.JobError == nil && result.KojiError != "" {
		result.JobError = clienterrors.New(clienterrors.ErrorOldResultCompatible, result.KojiError, nil)
	}
	return info, nil
}
// DepsolveJobInfo returns the job metadata for a depsolve job and
// unmarshals its result into `result`.
func (s *Server) DepsolveJobInfo(id uuid.UUID, result *DepsolveJobResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeDepsolve {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeDepsolve, actual)
	}
	return info, nil
}
// SearchPackagesJobInfo returns JobInfo for a package-search job and
// populates `result` with the SearchPackagesJobResult data.
func (s *Server) SearchPackagesJobInfo(id uuid.UUID, result *SearchPackagesJobResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeSearchPackages {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeSearchPackages, actual)
	}
	return info, nil
}
// ManifestJobInfo returns the job metadata for a manifest-id-only job
// and unmarshals its result into `result`.
func (s *Server) ManifestJobInfo(id uuid.UUID, result *ManifestJobByIDResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeManifestIDOnly {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeManifestIDOnly, actual)
	}
	return info, nil
}
// ContainerResolveJobInfo returns the job metadata for a
// container-resolve job and unmarshals its result into `result`.
func (s *Server) ContainerResolveJobInfo(id uuid.UUID, result *ContainerResolveJobResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeContainerResolve {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeContainerResolve, actual)
	}
	return info, nil
}
// FileResolveJobInfo returns the job metadata for a file-resolve job
// and unmarshals its result into `result`.
func (s *Server) FileResolveJobInfo(id uuid.UUID, result *FileResolveJobResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeFileResolve {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeFileResolve, actual)
	}
	return info, nil
}
// OSTreeResolveJobInfo returns the job metadata for an ostree-resolve
// job and unmarshals its result into `result`.
func (s *Server) OSTreeResolveJobInfo(id uuid.UUID, result *OSTreeResolveJobResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeOSTreeResolve {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeOSTreeResolve, actual)
	}
	return info, nil
}
// AWSEC2CopyJobInfo returns the job metadata for an aws-ec2-copy job
// and unmarshals its result into `result`.
func (s *Server) AWSEC2CopyJobInfo(id uuid.UUID, result *AWSEC2CopyJobResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeAWSEC2Copy {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeAWSEC2Copy, actual)
	}
	return info, nil
}
// AWSEC2ShareJobInfo returns the job metadata for an aws-ec2-share job
// and unmarshals its result into `result`.
func (s *Server) AWSEC2ShareJobInfo(id uuid.UUID, result *AWSEC2ShareJobResult) (*JobInfo, error) {
	info, err := s.jobInfo(id, result)
	if err != nil {
		return nil, err
	}
	if actual := info.JobType; actual != JobTypeAWSEC2Share {
		return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeAWSEC2Share, actual)
	}
	return info, nil
}
// jobInfo fetches the queue status of job `id` and, when `result` is
// non-nil and the job finished without being canceled, unmarshals the
// raw job result into it. The architecture suffix is stripped from the
// reported job type.
func (s *Server) jobInfo(id uuid.UUID, result interface{}) (*JobInfo, error) {
	jobType, channel, rawResult, queued, started, finished, canceled, deps, dependents, err := s.jobs.JobStatus(id)
	if err != nil {
		return nil, err
	}
	// A result only exists once the job finished; canceled jobs may
	// carry no (or partial) result, so skip unmarshaling for them.
	if result != nil && !finished.IsZero() && !canceled {
		err = json.Unmarshal(rawResult, result)
		if err != nil {
			return nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
		}
	}
	return &JobInfo{
		JobType: strings.Split(jobType, ":")[0],
		Channel: channel,
		JobStatus: &JobStatus{
			Queued:   queued,
			Started:  started,
			Finished: finished,
			Canceled: canceled,
		},
		Deps:       deps,
		Dependents: dependents,
	}, nil
}
// OSBuildJob fetches the parameters of an osbuild job into `job`.
func (s *Server) OSBuildJob(id uuid.UUID, job *OSBuildJob) error {
	jobType, rawArgs, _, _, err := s.jobs.Job(id)
	if err != nil {
		return err
	}
	// Build jobs carry an automatic ":<arch>" suffix, so match on the
	// prefix rather than the full type.
	prefix := JobTypeOSBuild + ":"
	if !strings.HasPrefix(jobType, prefix) {
		return fmt.Errorf("expected %s:*, found %q job instead for job '%s'", JobTypeOSBuild, jobType, id)
	}
	if err := json.Unmarshal(rawArgs, job); err != nil {
		return fmt.Errorf("error unmarshaling arguments for job '%s': %v", id, err)
	}
	return nil
}
// JobChannel returns the tenant channel the job was enqueued on.
func (s *Server) JobChannel(id uuid.UUID) (string, error) {
	_, _, _, channel, err := s.jobs.Job(id)
	return channel, err
}
// JobType returns the type of the job.
func (s *Server) JobType(id uuid.UUID) (string, error) {
	jobType, _, _, _, err := s.jobs.Job(id)
	// the architecture is internally encoded in the job type, but hide
	// that from this API
	return strings.Split(jobType, ":")[0], err
}
// Cancel requests cancellation of the job. Metric recording is
// best-effort: a failure to read the job status is only logged and does
// not prevent the cancellation itself.
func (s *Server) Cancel(id uuid.UUID) error {
	if info, err := s.jobInfo(id, nil); err != nil {
		logrus.Errorf("error getting job status: %v", err)
	} else {
		prometheus.CancelJobMetrics(info.JobStatus.Started, info.JobType, info.Channel)
	}
	return s.jobs.CancelJob(id)
}
// SetFailed sets the given job id to "failed" with the given error as
// its result. An error is returned when the result cannot be marshalled
// or the queue rejects the update.
//
// The parameter was renamed from `error` (which shadowed the builtin
// type); Go callers are unaffected by parameter names.
func (s *Server) SetFailed(id uuid.UUID, jobError *clienterrors.Error) error {
	failedJobErrorResult := JobResult{
		JobError: jobError,
	}
	res, err := json.Marshal(failedJobErrorResult)
	if err != nil {
		// Previously this was logged and nil was returned, silently
		// leaving the job un-failed. Propagate instead so callers know
		// the job state was not updated.
		logrus.Errorf("error marshalling the error: %v", err)
		return fmt.Errorf("error marshalling the job error: %w", err)
	}
	return s.jobs.FailJob(id, res)
}
// ArtifactsDir returns the configured artifacts directory path; it is
// empty when artifact handling is disabled.
func (s *Server) ArtifactsDir() string {
	return s.config.ArtifactsDir
}
// JobArtifact provides access to artifacts of a finished job. It
// returns an io.Reader for the artifact and the artifact's size.
//
// NOTE(review): the returned reader is an open *os.File exposed as an
// io.Reader, so the caller has no Close method — confirm callers'
// lifetime expectations for the underlying handle.
func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
	if s.config.ArtifactsDir == "" {
		return nil, 0, errors.New("Artifacts not enabled")
	}
	jobInfo, err := s.jobInfo(id, nil)
	if err != nil {
		return nil, 0, err
	}
	if jobInfo.JobStatus.Finished.IsZero() {
		return nil, 0, fmt.Errorf("Cannot access artifacts before job is finished: %s", id)
	}
	p := path.Join(s.config.ArtifactsDir, id.String(), name)
	f, err := os.Open(p)
	if err != nil {
		return nil, 0, fmt.Errorf("Error accessing artifact %s for job %s: %v", name, id, err)
	}
	info, err := f.Stat()
	if err != nil {
		// Fix: the handle was previously leaked on this error path.
		_ = f.Close()
		return nil, 0, fmt.Errorf("Error getting size of artifact %s for job %s: %v", name, id, err)
	}
	return f, info.Size(), nil
}
// JobArtifactLocation returns the filesystem path of the named artifact
// for a finished job. When the artifact does not exist the (would-be)
// path is still returned, together with a non-nil error.
func (s *Server) JobArtifactLocation(id uuid.UUID, name string) (string, error) {
	if s.config.ArtifactsDir == "" {
		return "", errors.New("Artifacts not enabled")
	}
	info, err := s.jobInfo(id, nil)
	if err != nil {
		return "", err
	}
	if info.JobStatus.Finished.IsZero() {
		return "", fmt.Errorf("Cannot access artifacts before job is finished: %s", id)
	}
	loc := path.Join(s.config.ArtifactsDir, id.String(), name)
	if _, err := os.Stat(loc); errors.Is(err, os.ErrNotExist) {
		return loc, fmt.Errorf("Artifact not found: %s", loc)
	}
	return loc, nil
}
// DeleteArtifacts deletes all artifacts for job `id`. The job must be
// finished; the associated ComposeRequest file is removed best-effort.
func (s *Server) DeleteArtifacts(id uuid.UUID) error {
	if s.config.ArtifactsDir == "" {
		return errors.New("Artifacts not enabled")
	}
	info, err := s.jobInfo(id, nil)
	if err != nil {
		return err
	}
	if info.JobStatus.Finished.IsZero() {
		return fmt.Errorf("Cannot delete artifacts before job is finished: %s", id)
	}
	// Remove the ComposeRequest but ignore any errors
	_ = os.Remove(path.Join(s.config.ArtifactsDir, "ComposeRequest", id.String()+".json"))
	return os.RemoveAll(path.Join(s.config.ArtifactsDir, id.String()))
}
// RequestJob dequeues the next pending job matching the given
// architecture, job types and channels for the given worker.
func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes, channels []string, workerID uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
	return s.requestJob(ctx, arch, jobTypes, uuid.Nil, channels, workerID)
}
// RequestJobById dequeues the specific pending job `requestedJobId`;
// this is the only way manifest-id-only jobs are handed out (requestJob
// rejects them when dequeuing by type).
func (s *Server) RequestJobById(ctx context.Context, arch string, requestedJobId uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
	return s.requestJob(ctx, arch, []string{}, requestedJobId, nil, uuid.Nil)
}
// requestJob dequeues a job for a worker. If requestedJobId is not
// uuid.Nil that specific job is dequeued (RequestJobById path),
// otherwise the next pending job matching jobTypes and channels is
// taken. It returns the job id, the worker's access token for the job,
// the (arch-stripped) job type, the static job arguments, and the
// results of the job's dependencies as dynamicArgs.
func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID, channels []string, workerID uuid.UUID) (
	jobId uuid.UUID, token uuid.UUID, jobType string, args json.RawMessage, dynamicArgs []json.RawMessage, err error) {
	// treat osbuild jobs specially until we have found a generic way to
	// specify dequeuing restrictions. For now, we only have one
	// restriction: arch for osbuild jobs.
	jts := []string{}
	// Only set the label used for prometheus metrics when it's an osbuild job. Otherwise the
	// dequeue metrics would set the label for all job types, while the finish metrics only set
	// it for osbuild jobs.
	var archPromLabel string
	for _, t := range jobTypes {
		if t == JobTypeOSBuild {
			t = t + ":" + arch
			archPromLabel = arch
		}
		// manifest-id-only jobs may only be dequeued by id, never by type
		if t == JobTypeManifestIDOnly {
			return uuid.Nil, uuid.Nil, "", nil, nil, ErrInvalidJobType
		}
		jts = append(jts, t)
	}
	dequeueCtx := ctx
	var cancel context.CancelFunc
	if s.config.RequestJobTimeout != 0 {
		dequeueCtx, cancel = context.WithTimeout(ctx, s.config.RequestJobTimeout)
		defer cancel()
	}
	var depIDs []uuid.UUID
	if requestedJobId != uuid.Nil {
		jobId = requestedJobId
		token, depIDs, jobType, args, err = s.jobs.DequeueByID(dequeueCtx, requestedJobId, workerID)
	} else {
		jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, workerID, jts, channels)
	}
	if err != nil {
		// timeouts and not-pending are expected; don't spam the log
		if err != jobqueue.ErrDequeueTimeout && err != jobqueue.ErrNotPending {
			logrus.Errorf("dequeuing job failed: %v", err)
		}
		return
	}
	jobInfo, err := s.jobInfo(jobId, nil)
	if err != nil {
		// Fix: previously this only logged and fell through, then
		// dereferenced the nil jobInfo below and panicked. Return the
		// error (via the named return) instead.
		logrus.Errorf("error retrieving job status: %v", err)
		return
	}
	// Record how long the job has been pending for, that is either how
	// long it has been queued for, in case it has no dependencies, or
	// how long it has been since all its dependencies finished, if it
	// has any.
	pending := jobInfo.JobStatus.Queued
	jobType = jobInfo.JobType
	for _, depID := range depIDs {
		// TODO: include type of arguments
		var result json.RawMessage
		var finished time.Time
		_, _, result, _, _, finished, _, _, _, err = s.jobs.JobStatus(depID)
		if err != nil {
			return
		}
		if finished.After(pending) {
			pending = finished
		}
		dynamicArgs = append(dynamicArgs, result)
	}
	if s.config.ArtifactsDir != "" {
		// scratch directory the worker uploads artifacts into
		err = os.MkdirAll(path.Join(s.config.ArtifactsDir, "tmp", token.String()), 0700)
		if err != nil {
			return
		}
	}
	prometheus.DequeueJobMetrics(pending, jobInfo.JobStatus.Started, jobInfo.JobType, jobInfo.Channel, archPromLabel)
	return
}
// FinishJob records `result` for the job identified by `token` and
// marks it finished; it is RequeueOrFinishJob with a zero retry budget.
func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error {
	return s.RequeueOrFinishJob(token, 0, result)
}
// RequeueOrFinishJob records `result` for the job behind `token`. The
// queue either requeues the job (if it has been retried fewer than
// maxRetries times) or finishes it. Afterwards the job's result is
// reloaded (by type) to record finish metrics, and any artifacts the
// worker uploaded are moved from the temporary token directory to the
// job's final artifact directory.
func (s *Server) RequeueOrFinishJob(token uuid.UUID, maxRetries uint64, result json.RawMessage) error {
	jobId, err := s.jobs.IdFromToken(token)
	if err != nil {
		switch err {
		case jobqueue.ErrNotExist:
			return ErrInvalidToken
		default:
			return err
		}
	}
	requeued, err := s.jobs.RequeueOrFinishJob(jobId, maxRetries, result)
	if err != nil {
		switch err {
		case jobqueue.ErrNotRunning:
			return ErrJobNotRunning
		default:
			return fmt.Errorf("error finishing job: %v", err)
		}
	}
	if requeued {
		jobInfo, err := s.jobInfo(jobId, nil)
		if err != nil {
			return fmt.Errorf("error requeueing job: %w", err)
		}
		prometheus.RequeueJobMetrics(jobInfo.JobType, jobInfo.Channel)
	}
	// NOTE(review): execution continues here even when the job was
	// requeued, so the finish metrics and the artifact move below also
	// run for requeued jobs — confirm this is intended.
	jobType, err := s.JobType(jobId)
	if err != nil {
		return err
	}
	// Reload the typed result so the JobError/status code can be fed
	// into the finish metrics; only osbuild results carry an arch.
	var arch string
	var jobInfo *JobInfo
	var jobResult *JobResult
	switch jobType {
	case JobTypeOSBuild:
		var osbuildJR OSBuildJobResult
		jobInfo, err = s.OSBuildJobInfo(jobId, &osbuildJR)
		if err != nil {
			return err
		}
		arch = osbuildJR.Arch
		jobResult = &osbuildJR.JobResult
	case JobTypeDepsolve:
		var depsolveJR DepsolveJobResult
		jobInfo, err = s.DepsolveJobInfo(jobId, &depsolveJR)
		if err != nil {
			return err
		}
		jobResult = &depsolveJR.JobResult
	case JobTypeSearchPackages:
		var searchJR SearchPackagesJobResult
		jobInfo, err = s.SearchPackagesJobInfo(jobId, &searchJR)
		if err != nil {
			return err
		}
		jobResult = &searchJR.JobResult
	case JobTypeManifestIDOnly:
		var manifestJR ManifestJobByIDResult
		jobInfo, err = s.ManifestJobInfo(jobId, &manifestJR)
		if err != nil {
			return err
		}
		jobResult = &manifestJR.JobResult
	case JobTypeKojiInit:
		var kojiInitJR KojiInitJobResult
		jobInfo, err = s.KojiInitJobInfo(jobId, &kojiInitJR)
		if err != nil {
			return err
		}
		jobResult = &kojiInitJR.JobResult
	case JobTypeKojiFinalize:
		var kojiFinalizeJR KojiFinalizeJobResult
		jobInfo, err = s.KojiFinalizeJobInfo(jobId, &kojiFinalizeJR)
		if err != nil {
			return err
		}
		jobResult = &kojiFinalizeJR.JobResult
	case JobTypeAWSEC2Copy:
		var awsEC2CopyJR AWSEC2CopyJobResult
		jobInfo, err = s.AWSEC2CopyJobInfo(jobId, &awsEC2CopyJR)
		if err != nil {
			return err
		}
		jobResult = &awsEC2CopyJR.JobResult
	case JobTypeAWSEC2Share:
		var awsEC2ShareJR AWSEC2ShareJobResult
		jobInfo, err = s.AWSEC2ShareJobInfo(jobId, &awsEC2ShareJR)
		if err != nil {
			return err
		}
		jobResult = &awsEC2ShareJR.JobResult
	case JobTypeContainerResolve:
		var containerResolveJR ContainerResolveJobResult
		jobInfo, err = s.ContainerResolveJobInfo(jobId, &containerResolveJR)
		if err != nil {
			return err
		}
		jobResult = &containerResolveJR.JobResult
	case JobTypeFileResolve:
		var fileResolveJR FileResolveJobResult
		jobInfo, err = s.FileResolveJobInfo(jobId, &fileResolveJR)
		if err != nil {
			return err
		}
		jobResult = &fileResolveJR.JobResult
	case JobTypeOSTreeResolve:
		var ostreeResolveJR OSTreeResolveJobResult
		jobInfo, err = s.OSTreeResolveJobInfo(jobId, &ostreeResolveJR)
		if err != nil {
			return err
		}
		jobResult = &ostreeResolveJR.JobResult
	default:
		return fmt.Errorf("unexpected job type: %s", jobType)
	}
	statusCode := clienterrors.GetStatusCode(jobResult.JobError)
	prometheus.FinishJobMetrics(jobInfo.JobStatus.Started, jobInfo.JobStatus.Finished, jobInfo.JobStatus.Canceled, jobType, jobInfo.Channel, arch, statusCode)
	// Move artifacts from the temporary location to the final job
	// location. Log any errors, but do not treat them as fatal. The job is
	// already finished.
	if s.config.ArtifactsDir != "" {
		err := os.Rename(path.Join(s.config.ArtifactsDir, "tmp", token.String()), path.Join(s.config.ArtifactsDir, jobId.String()))
		if err != nil {
			logrus.Errorf("Error moving artifacts for job %s: %v", jobId, err)
		}
	}
	return nil
}
// RegisterWorker records a new worker with channel c and architecture a
// in the queue and returns its id.
func (s *Server) RegisterWorker(c, a string) (uuid.UUID, error) {
	id, err := s.jobs.InsertWorker(c, a)
	if err != nil {
		return uuid.Nil, err
	}
	logrus.Infof("Worker (%v) registered", a)
	return id, nil
}
// WorkerAvailableForArch reports whether any registered worker has
// architecture a.
func (s *Server) WorkerAvailableForArch(a string) (bool, error) {
	workers, err := s.jobs.Workers(0)
	if err != nil {
		return false, err
	}
	for i := range workers {
		if workers[i].Arch == a {
			return true, nil
		}
	}
	return false, nil
}
// apiHandlers implements api.ServerInterface - the http api route handlers
// generated from api/openapi.yml. This is a separate object, because these
// handlers should not be exposed on the `Server` object.
type apiHandlers struct {
	server *Server
}
// GetOpenapi serves the OpenAPI specification of the worker API.
func (h *apiHandlers) GetOpenapi(ctx echo.Context) error {
	spec, err := api.GetSwagger()
	if err != nil {
		return api.HTTPErrorWithInternal(api.ErrorFailedLoadingOpenAPISpec, err)
	}
	return ctx.JSON(http.StatusOK, spec)
}
// GetStatus is the health-check endpoint; it always reports "OK".
func (h *apiHandlers) GetStatus(ctx echo.Context) error {
	status := &api.StatusResponse{
		Href:   api.BasePath + "/status",
		Id:     "status",
		Kind:   "Status",
		Status: "OK",
	}
	return ctx.JSON(http.StatusOK, status)
}
// GetError returns the service error definition for a numeric error id.
func (h *apiHandlers) GetError(ctx echo.Context, id string) error {
	code, err := strconv.Atoi(id)
	if err != nil {
		return api.HTTPErrorWithInternal(api.ErrorInvalidErrorId, err)
	}
	apiError := api.APIError(api.ServiceErrorCode(code), nil, ctx)
	// If the service error wasn't found, it's a 404 in this instance
	if apiError.Id == strconv.Itoa(int(api.ErrorServiceErrorNotFound)) {
		return api.HTTPError(api.ErrorErrorNotFound)
	}
	return ctx.JSON(http.StatusOK, apiError)
}
// RequestJob is the POST /jobs handler: a worker asks for the next
// pending job matching its architecture and supported job types.
func (h *apiHandlers) RequestJob(ctx echo.Context) error {
	var body api.RequestJobJSONRequestBody
	err := ctx.Bind(&body)
	if err != nil {
		return err
	}
	// channel is empty if JWT is not enabled
	var channel string
	if h.server.config.JWTEnabled {
		tenant, err := auth.GetFromClaims(ctx.Request().Context(), h.server.config.TenantProviderFields)
		if err != nil {
			return api.HTTPErrorWithInternal(api.ErrorTenantNotFound, err)
		}
		// prefix the tenant to prevent collisions if support for specifying channels in a request is ever added
		channel = "org-" + tenant
	}
	// The worker id is optional in the request body.
	workerID := uuid.Nil
	if body.WorkerId != nil {
		workerID, err = uuid.Parse(*body.WorkerId)
		if err != nil {
			return api.HTTPErrorWithInternal(api.ErrorMalformedWorkerId, err)
		}
	}
	jobId, jobToken, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types, []string{channel}, workerID)
	if err != nil {
		if err == jobqueue.ErrDequeueTimeout {
			// NOTE(review): a 204 No Content response normally carries
			// no body, so the JSON payload here is likely dropped —
			// confirm workers only rely on the status code.
			return ctx.JSON(http.StatusNoContent, api.ObjectReference{
				Href: fmt.Sprintf("%s/jobs", api.BasePath),
				Id:   uuid.Nil.String(),
				Kind: "RequestJob",
			})
		}
		if err == ErrInvalidJobType {
			return api.HTTPError(api.ErrorInvalidJobType)
		}
		return api.HTTPErrorWithInternal(api.ErrorRequestingJob, err)
	}
	// Only include args/dynamic args in the response when present.
	var respArgs *json.RawMessage
	if len(jobArgs) != 0 {
		respArgs = &jobArgs
	}
	var respDynArgs *[]json.RawMessage
	if len(dynamicJobArgs) != 0 {
		respDynArgs = &dynamicJobArgs
	}
	response := api.RequestJobResponse{
		Href:             fmt.Sprintf("%s/jobs", api.BasePath),
		Id:               jobId.String(),
		Kind:             "RequestJob",
		Location:         fmt.Sprintf("%s/jobs/%v", api.BasePath, jobToken),
		ArtifactLocation: fmt.Sprintf("%s/jobs/%v/artifacts/", api.BasePath, jobToken),
		Type:             jobType,
		Args:             respArgs,
		DynamicArgs:      respDynArgs,
	}
	return ctx.JSON(http.StatusCreated, response)
}
// GetJob is the GET /jobs/{token} handler: workers poll it to learn
// whether their job has been canceled; polling also refreshes the job's
// heartbeat.
func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error {
	token, err := uuid.Parse(tokenstr)
	if err != nil {
		return api.HTTPErrorWithInternal(api.ErrorMalformedJobToken, err)
	}
	jobId, err := h.server.jobs.IdFromToken(token)
	if err != nil {
		switch err {
		case jobqueue.ErrNotExist:
			return api.HTTPError(api.ErrorJobNotFound)
		default:
			return api.HTTPErrorWithInternal(api.ErrorResolvingJobId, err)
		}
	}
	// A nil job id is reported as "not canceled" without touching the
	// heartbeat — presumably the token no longer maps to a live job;
	// TODO confirm against the queue implementation's semantics.
	if jobId == uuid.Nil {
		return ctx.JSON(http.StatusOK, api.GetJobResponse{
			Href:     fmt.Sprintf("%s/jobs/%v", api.BasePath, token),
			Id:       token.String(),
			Kind:     "JobStatus",
			Canceled: false,
		})
	}
	h.server.jobs.RefreshHeartbeat(token)
	jobInfo, err := h.server.jobInfo(jobId, nil)
	if err != nil {
		return api.HTTPErrorWithInternal(api.ErrorRetrievingJobStatus, err)
	}
	return ctx.JSON(http.StatusOK, api.GetJobResponse{
		Href:     fmt.Sprintf("%s/jobs/%v", api.BasePath, token),
		Id:       token.String(),
		Kind:     "JobStatus",
		Canceled: jobInfo.JobStatus.Canceled,
	})
}
// UpdateJob is the PATCH/POST /jobs/{token} handler: a worker reports
// its job result, which finishes (or requeues) the job.
func (h *apiHandlers) UpdateJob(ctx echo.Context, idstr string) error {
	token, err := uuid.Parse(idstr)
	if err != nil {
		return api.HTTPErrorWithInternal(api.ErrorMalformedJobId, err)
	}
	var body api.UpdateJobRequest
	if err = ctx.Bind(&body); err != nil {
		return err
	}
	switch err = h.server.FinishJob(token, body.Result); err {
	case nil:
		// success; fall through to the response below
	case ErrInvalidToken:
		return api.HTTPError(api.ErrorJobNotFound)
	case ErrJobNotRunning:
		return api.HTTPError(api.ErrorJobNotRunning)
	default:
		return api.HTTPErrorWithInternal(api.ErrorFinishingJob, err)
	}
	return ctx.JSON(http.StatusOK, api.UpdateJobResponse{
		Href: fmt.Sprintf("%s/jobs/%v", api.BasePath, token),
		Id:   token.String(),
		Kind: "UpdateJobResponse",
	})
}
// UploadJobArtifact is the PUT /jobs/{token}/artifacts/{name} handler:
// it streams the request body into the job's temporary artifact
// directory. Returns 400 when artifact handling is disabled.
func (h *apiHandlers) UploadJobArtifact(ctx echo.Context, tokenstr string, name string) error {
	token, err := uuid.Parse(tokenstr)
	if err != nil {
		return api.HTTPErrorWithInternal(api.ErrorMalformedJobId, err)
	}
	request := ctx.Request()
	if h.server.config.ArtifactsDir == "" {
		// indicate to the worker that the server is not accepting any artifacts
		return ctx.NoContent(http.StatusBadRequest)
	}
	f, err := os.Create(path.Join(h.server.config.ArtifactsDir, "tmp", token.String(), name))
	if err != nil {
		return api.HTTPErrorWithInternal(api.ErrorDiscardingArtifact, err)
	}
	// Fix: the file handle was previously never closed (leaked on both
	// the success and the copy-error path).
	defer f.Close()
	_, err = io.Copy(f, request.Body)
	if err != nil {
		return api.HTTPErrorWithInternal(api.ErrorWritingArtifact, err)
	}
	return ctx.NoContent(http.StatusOK)
}
// PostWorkers is the POST /workers handler: it registers a new worker
// (scoped to the JWT tenant's channel when JWT is enabled) and returns
// its id.
func (h *apiHandlers) PostWorkers(ctx echo.Context) error {
	var body api.PostWorkersRequest
	if err := ctx.Bind(&body); err != nil {
		return err
	}
	channel := ""
	if h.server.config.JWTEnabled {
		tenant, err := auth.GetFromClaims(ctx.Request().Context(), h.server.config.TenantProviderFields)
		if err != nil {
			return api.HTTPErrorWithInternal(api.ErrorTenantNotFound, err)
		}
		// prefix the tenant to prevent collisions if support for specifying channels in a request is ever added
		channel = "org-" + tenant
	}
	workerID, err := h.server.RegisterWorker(channel, body.Arch)
	if err != nil {
		return api.HTTPErrorWithInternal(api.ErrorInsertingWorker, err)
	}
	return ctx.JSON(http.StatusCreated, api.PostWorkersResponse{
		Href:     fmt.Sprintf("%s/workers", api.BasePath),
		Id:       workerID.String(),
		Kind:     "WorkerID",
		WorkerId: workerID,
	})
}
// PostWorkerStatus is the worker heartbeat endpoint: it refreshes the
// worker's last-seen timestamp in the queue.
func (h *apiHandlers) PostWorkerStatus(ctx echo.Context, workerID uuid.UUID) error {
	switch err := h.server.jobs.UpdateWorkerStatus(workerID); {
	case err == jobqueue.ErrWorkerNotExist:
		return api.HTTPErrorWithInternal(api.ErrorWorkerIdNotFound, err)
	case err != nil:
		return api.HTTPErrorWithInternal(api.ErrorUpdatingWorkerStatus, err)
	}
	return ctx.NoContent(http.StatusOK)
}
// A simple echo.Binder(), which only accepts application/json, but is more
// strict than echo's DefaultBinder. It does not handle binding query
// parameters either.
type binder struct{}

// Bind decodes the JSON request body into i; the request must carry
// exactly one Content-Type header valued "application/json".
func (b binder) Bind(i interface{}, ctx echo.Context) error {
	values := ctx.Request().Header["Content-Type"]
	if len(values) != 1 || values[0] != "application/json" {
		return api.HTTPError(api.ErrorUnsupportedMediaType)
	}
	if err := json.NewDecoder(ctx.Request().Body).Decode(i); err != nil {
		return api.HTTPErrorWithInternal(api.ErrorBodyDecodingError, err)
	}
	return nil
}