The original compose request contains useful details that are not preserved when it is converted to a manifest. Things like the distribution, arch, image type, blueprint or customizations are useful when examining builds later. This saves the original request json using the job id and a new directory (ComposeRequest) under the artifacts directory. The original request, if present, is then added to the compose/<id>/metadata response alongside the package list. Related: RHEL-60120
1218 lines
34 KiB
Go
1218 lines
34 KiB
Go
package worker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/labstack/echo/v4"
|
|
"github.com/labstack/echo/v4/middleware"
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"github.com/osbuild/osbuild-composer/pkg/jobqueue"
|
|
|
|
"github.com/osbuild/osbuild-composer/internal/auth"
|
|
"github.com/osbuild/osbuild-composer/internal/common"
|
|
"github.com/osbuild/osbuild-composer/internal/prometheus"
|
|
"github.com/osbuild/osbuild-composer/internal/worker/api"
|
|
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
|
|
)
|
|
|
|
const (
|
|
JobTypeOSBuild string = "osbuild"
|
|
JobTypeKojiInit string = "koji-init"
|
|
JobTypeKojiFinalize string = "koji-finalize"
|
|
JobTypeDepsolve string = "depsolve"
|
|
JobTypeSearchPackages string = "search-packages"
|
|
JobTypeManifestIDOnly string = "manifest-id-only"
|
|
JobTypeContainerResolve string = "container-resolve"
|
|
JobTypeFileResolve string = "file-resolve"
|
|
JobTypeOSTreeResolve string = "ostree-resolve"
|
|
JobTypeAWSEC2Copy string = "aws-ec2-copy"
|
|
JobTypeAWSEC2Share string = "aws-ec2-share"
|
|
)
|
|
|
|
type Server struct {
|
|
jobs jobqueue.JobQueue
|
|
logger *log.Logger
|
|
config Config
|
|
}
|
|
|
|
type JobStatus struct {
|
|
Queued time.Time
|
|
Started time.Time
|
|
Finished time.Time
|
|
Canceled bool
|
|
}
|
|
|
|
type JobInfo struct {
|
|
JobType string
|
|
Channel string
|
|
JobStatus *JobStatus
|
|
Deps []uuid.UUID
|
|
Dependents []uuid.UUID
|
|
}
|
|
|
|
var ErrInvalidToken = errors.New("token does not exist")
|
|
var ErrJobNotRunning = errors.New("job isn't running")
|
|
var ErrInvalidJobType = errors.New("job has invalid type")
|
|
|
|
type Config struct {
|
|
ArtifactsDir string
|
|
RequestJobTimeout time.Duration
|
|
BasePath string
|
|
JWTEnabled bool
|
|
TenantProviderFields []string
|
|
JobTimeout time.Duration
|
|
JobWatchFreq time.Duration
|
|
WorkerTimeout time.Duration
|
|
WorkerWatchFreq time.Duration
|
|
}
|
|
|
|
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, config Config) *Server {
|
|
s := &Server{
|
|
jobs: jobs,
|
|
logger: logger,
|
|
config: config,
|
|
}
|
|
|
|
if s.config.JobTimeout == 0 {
|
|
s.config.JobTimeout = time.Second * 120
|
|
}
|
|
if s.config.JobWatchFreq == 0 {
|
|
s.config.JobWatchFreq = time.Second * 30
|
|
}
|
|
if s.config.WorkerTimeout == 0 {
|
|
s.config.WorkerTimeout = time.Hour
|
|
}
|
|
if s.config.WorkerWatchFreq == 0 {
|
|
s.config.WorkerWatchFreq = time.Second * 300
|
|
}
|
|
|
|
api.BasePath = config.BasePath
|
|
|
|
go s.WatchHeartbeats()
|
|
go s.WatchWorkers()
|
|
return s
|
|
}
|
|
|
|
func (s *Server) Handler() http.Handler {
|
|
e := echo.New()
|
|
e.Binder = binder{}
|
|
e.Logger = common.Logger()
|
|
|
|
// log errors returned from handlers
|
|
e.HTTPErrorHandler = api.HTTPErrorHandler
|
|
e.Use(middleware.Recover())
|
|
e.Pre(common.OperationIDMiddleware)
|
|
handler := apiHandlers{
|
|
server: s,
|
|
}
|
|
|
|
mws := []echo.MiddlewareFunc{
|
|
prometheus.StatusMiddleware(prometheus.WorkerSubsystem),
|
|
}
|
|
if s.config.JWTEnabled {
|
|
mws = append(mws, auth.TenantChannelMiddleware(s.config.TenantProviderFields, api.HTTPError(api.ErrorTenantNotFound)))
|
|
}
|
|
mws = append(mws, prometheus.HTTPDurationMiddleware(prometheus.WorkerSubsystem))
|
|
api.RegisterHandlers(e.Group(api.BasePath, mws...), &handler)
|
|
|
|
return e
|
|
}
|
|
|
|
const maxHeartbeatRetries = 2
|
|
|
|
// This function should be started as a goroutine
|
|
|
|
// With default durations it goes through all running jobs every 30 seconds and fails any unresponsive
|
|
// ones. Unresponsive jobs haven't checked whether or not they're cancelled in the past 2 minutes.
|
|
func (s *Server) WatchHeartbeats() {
|
|
//nolint:staticcheck // avoid SA1015, this is an endless function
|
|
for range time.Tick(s.config.JobWatchFreq) {
|
|
for _, token := range s.jobs.Heartbeats(s.config.JobTimeout) {
|
|
id, _ := s.jobs.IdFromToken(token)
|
|
logrus.Infof("Removing unresponsive job: %s\n", id)
|
|
|
|
missingHeartbeatResult := JobResult{
|
|
JobError: clienterrors.New(clienterrors.ErrorJobMissingHeartbeat,
|
|
fmt.Sprintf("Workers running this job stopped responding more than %d times.", maxHeartbeatRetries),
|
|
nil),
|
|
}
|
|
|
|
resJson, err := json.Marshal(missingHeartbeatResult)
|
|
if err != nil {
|
|
logrus.Panicf("Cannot marshal the heartbeat error: %v", err)
|
|
}
|
|
|
|
err = s.RequeueOrFinishJob(token, maxHeartbeatRetries, resJson)
|
|
if err != nil {
|
|
logrus.Errorf("Error requeueing or finishing unresponsive job: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// This function should be started as a goroutine
|
|
// Every 5 minutes it goes through all workers, removing any unresponsive ones.
|
|
func (s *Server) WatchWorkers() {
|
|
//nolint:staticcheck // avoid SA1015, this is an endless function
|
|
for range time.Tick(s.config.WorkerWatchFreq) {
|
|
workers, err := s.jobs.Workers(s.config.WorkerTimeout)
|
|
if err != nil {
|
|
logrus.Warningf("Unable to query workers: %v", err)
|
|
continue
|
|
}
|
|
for _, w := range workers {
|
|
logrus.Infof("Removing inactive worker: %s", w.ID)
|
|
err = s.jobs.DeleteWorker(w.ID)
|
|
if err != nil {
|
|
logrus.Warningf("Unable to remove worker: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeOSBuild+":"+arch, job, nil, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeOSBuild+":"+arch, job, dependencies, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueKojiInit(job *KojiInitJob, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeKojiInit, job, nil, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, buildIDs []uuid.UUID, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeKojiFinalize, job, append([]uuid.UUID{initID}, buildIDs...), channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueDepsolve(job *DepsolveJob, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeDepsolve, job, nil, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueSearchPackages(job *SearchPackagesJob, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeSearchPackages, job, nil, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
|
|
if len(dependencies) == 0 {
|
|
panic("EnqueueManifestJobByID has no dependencies, expected at least a depsolve job")
|
|
}
|
|
return s.enqueue(JobTypeManifestIDOnly, job, dependencies, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueContainerResolveJob(job *ContainerResolveJob, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeContainerResolve, job, nil, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueFileResolveJob(job *FileResolveJob, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeFileResolve, job, nil, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueOSTreeResolveJob(job *OSTreeResolveJob, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeOSTreeResolve, job, nil, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueAWSEC2CopyJob(job *AWSEC2CopyJob, parent uuid.UUID, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeAWSEC2Copy, job, []uuid.UUID{parent}, channel)
|
|
}
|
|
|
|
func (s *Server) EnqueueAWSEC2ShareJob(job *AWSEC2ShareJob, parent uuid.UUID, channel string) (uuid.UUID, error) {
|
|
return s.enqueue(JobTypeAWSEC2Share, job, []uuid.UUID{parent}, channel)
|
|
}
|
|
|
|
func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
|
|
prometheus.EnqueueJobMetrics(strings.Split(jobType, ":")[0], channel)
|
|
return s.jobs.Enqueue(jobType, job, dependencies, channel)
|
|
}
|
|
|
|
// DependencyChainErrors recursively gathers all errors from job's dependencies,
|
|
// which caused it to fail. If the job didn't fail, `nil` is returned.
|
|
func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, error) {
|
|
jobType, err := s.JobType(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var jobResult *JobResult
|
|
var jobInfo *JobInfo
|
|
switch jobType {
|
|
case JobTypeOSBuild:
|
|
var osbuildJR OSBuildJobResult
|
|
jobInfo, err = s.OSBuildJobInfo(id, &osbuildJR)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobResult = &osbuildJR.JobResult
|
|
|
|
case JobTypeDepsolve:
|
|
var depsolveJR DepsolveJobResult
|
|
jobInfo, err = s.DepsolveJobInfo(id, &depsolveJR)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobResult = &depsolveJR.JobResult
|
|
|
|
case JobTypeSearchPackages:
|
|
var searchJR SearchPackagesJobResult
|
|
jobInfo, err = s.SearchPackagesJobInfo(id, &searchJR)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobResult = &searchJR.JobResult
|
|
|
|
case JobTypeManifestIDOnly:
|
|
var manifestJR ManifestJobByIDResult
|
|
jobInfo, err = s.ManifestJobInfo(id, &manifestJR)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobResult = &manifestJR.JobResult
|
|
|
|
case JobTypeKojiInit:
|
|
var kojiInitJR KojiInitJobResult
|
|
jobInfo, err = s.KojiInitJobInfo(id, &kojiInitJR)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobResult = &kojiInitJR.JobResult
|
|
|
|
case JobTypeKojiFinalize:
|
|
var kojiFinalizeJR KojiFinalizeJobResult
|
|
jobInfo, err = s.KojiFinalizeJobInfo(id, &kojiFinalizeJR)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobResult = &kojiFinalizeJR.JobResult
|
|
|
|
case JobTypeContainerResolve:
|
|
var containerResolveJR ContainerResolveJobResult
|
|
jobInfo, err = s.ContainerResolveJobInfo(id, &containerResolveJR)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobResult = &containerResolveJR.JobResult
|
|
case JobTypeFileResolve:
|
|
var fileResolveJR FileResolveJobResult
|
|
jobInfo, err = s.FileResolveJobInfo(id, &fileResolveJR)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobResult = &fileResolveJR.JobResult
|
|
case JobTypeOSTreeResolve:
|
|
var ostreeResolveJR OSTreeResolveJobResult
|
|
jobInfo, err = s.OSTreeResolveJobInfo(id, &ostreeResolveJR)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
jobResult = &ostreeResolveJR.JobResult
|
|
|
|
default:
|
|
return nil, fmt.Errorf("unexpected job type: %s", jobType)
|
|
}
|
|
|
|
if jobError := jobResult.JobError; jobError != nil {
|
|
depErrors := []*clienterrors.Error{}
|
|
if jobError.IsDependencyError() {
|
|
// check job's dependencies
|
|
for _, dep := range jobInfo.Deps {
|
|
depError, err := s.JobDependencyChainErrors(dep)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if depError != nil {
|
|
depErrors = append(depErrors, depError)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(depErrors) > 0 {
|
|
jobError.Details = depErrors
|
|
}
|
|
return jobError, nil
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
// AllRootJobIDs returns a list of top level job UUIDs that the worker knows about
|
|
func (s *Server) AllRootJobIDs() ([]uuid.UUID, error) {
|
|
return s.jobs.AllRootJobIDs()
|
|
}
|
|
|
|
func (s *Server) OSBuildJobInfo(id uuid.UUID, result *OSBuildJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeOSBuild {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeOSBuild, jobInfo.JobType)
|
|
}
|
|
|
|
if result.JobError == nil && !jobInfo.JobStatus.Finished.IsZero() {
|
|
if result.OSBuildOutput == nil {
|
|
result.JobError = clienterrors.New(clienterrors.ErrorBuildJob, "osbuild build failed", nil)
|
|
} else if len(result.OSBuildOutput.Error) > 0 {
|
|
result.JobError = clienterrors.New(clienterrors.ErrorOldResultCompatible, string(result.OSBuildOutput.Error), nil)
|
|
} else if len(result.TargetErrors()) > 0 {
|
|
result.JobError = clienterrors.New(clienterrors.ErrorTargetError, "at least one target failed", result.TargetErrors())
|
|
}
|
|
}
|
|
// For backwards compatibility: OSBuildJobResult didn't use to have a
|
|
// top-level `Success` flag. Override it here by looking into the job.
|
|
if !result.Success && result.OSBuildOutput != nil {
|
|
result.Success = result.OSBuildOutput.Success && result.JobError == nil
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) KojiInitJobInfo(id uuid.UUID, result *KojiInitJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeKojiInit {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeKojiInit, jobInfo.JobType)
|
|
}
|
|
|
|
if result.JobError == nil && result.KojiError != "" {
|
|
result.JobError = clienterrors.New(clienterrors.ErrorOldResultCompatible, result.KojiError, nil)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) KojiFinalizeJobInfo(id uuid.UUID, result *KojiFinalizeJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeKojiFinalize {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeKojiFinalize, jobInfo.JobType)
|
|
}
|
|
|
|
if result.JobError == nil && result.KojiError != "" {
|
|
result.JobError = clienterrors.New(clienterrors.ErrorOldResultCompatible, result.KojiError, nil)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) DepsolveJobInfo(id uuid.UUID, result *DepsolveJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeDepsolve {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeDepsolve, jobInfo.JobType)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
// SearchPackagesJobInfo returns JobInfo for a Search job
|
|
// and populates the result with the SearchJobResult data
|
|
func (s *Server) SearchPackagesJobInfo(id uuid.UUID, result *SearchPackagesJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeSearchPackages {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeSearchPackages, jobInfo.JobType)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) ManifestJobInfo(id uuid.UUID, result *ManifestJobByIDResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeManifestIDOnly {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeManifestIDOnly, jobInfo.JobType)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) ContainerResolveJobInfo(id uuid.UUID, result *ContainerResolveJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeContainerResolve {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeContainerResolve, jobInfo.JobType)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) FileResolveJobInfo(id uuid.UUID, result *FileResolveJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeFileResolve {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeFileResolve, jobInfo.JobType)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) OSTreeResolveJobInfo(id uuid.UUID, result *OSTreeResolveJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeOSTreeResolve {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeOSTreeResolve, jobInfo.JobType)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) AWSEC2CopyJobInfo(id uuid.UUID, result *AWSEC2CopyJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeAWSEC2Copy {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeAWSEC2Copy, jobInfo.JobType)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) AWSEC2ShareJobInfo(id uuid.UUID, result *AWSEC2ShareJobResult) (*JobInfo, error) {
|
|
jobInfo, err := s.jobInfo(id, result)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if jobInfo.JobType != JobTypeAWSEC2Share {
|
|
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeAWSEC2Share, jobInfo.JobType)
|
|
}
|
|
|
|
return jobInfo, nil
|
|
}
|
|
|
|
func (s *Server) jobInfo(id uuid.UUID, result interface{}) (*JobInfo, error) {
|
|
jobType, channel, rawResult, queued, started, finished, canceled, deps, dependents, err := s.jobs.JobStatus(id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if result != nil && !finished.IsZero() && !canceled {
|
|
err = json.Unmarshal(rawResult, result)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
|
|
}
|
|
}
|
|
|
|
return &JobInfo{
|
|
JobType: strings.Split(jobType, ":")[0],
|
|
Channel: channel,
|
|
JobStatus: &JobStatus{
|
|
Queued: queued,
|
|
Started: started,
|
|
Finished: finished,
|
|
Canceled: canceled,
|
|
},
|
|
Deps: deps,
|
|
Dependents: dependents,
|
|
}, nil
|
|
}
|
|
|
|
// OSBuildJob returns the parameters of an OSBuildJob
|
|
func (s *Server) OSBuildJob(id uuid.UUID, job *OSBuildJob) error {
|
|
jobType, rawArgs, _, _, err := s.jobs.Job(id)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !strings.HasPrefix(jobType, JobTypeOSBuild+":") { // Build jobs get automatic arch suffix: Check prefix
|
|
return fmt.Errorf("expected %s:*, found %q job instead for job '%s'", JobTypeOSBuild, jobType, id)
|
|
}
|
|
|
|
if err := json.Unmarshal(rawArgs, job); err != nil {
|
|
return fmt.Errorf("error unmarshaling arguments for job '%s': %v", id, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) JobChannel(id uuid.UUID) (string, error) {
|
|
_, _, _, channel, err := s.jobs.Job(id)
|
|
return channel, err
|
|
}
|
|
|
|
// JobType returns the type of the job
|
|
func (s *Server) JobType(id uuid.UUID) (string, error) {
|
|
jobType, _, _, _, err := s.jobs.Job(id)
|
|
// the architecture is internally encdode in the job type, but hide that
|
|
// from this API
|
|
return strings.Split(jobType, ":")[0], err
|
|
}
|
|
|
|
func (s *Server) Cancel(id uuid.UUID) error {
|
|
jobInfo, err := s.jobInfo(id, nil)
|
|
if err != nil {
|
|
logrus.Errorf("error getting job status: %v", err)
|
|
} else {
|
|
prometheus.CancelJobMetrics(jobInfo.JobStatus.Started, jobInfo.JobType, jobInfo.Channel)
|
|
}
|
|
return s.jobs.CancelJob(id)
|
|
}
|
|
|
|
// SetFailed sets the given job id to "failed" with the given error
|
|
func (s *Server) SetFailed(id uuid.UUID, error *clienterrors.Error) error {
|
|
FailedJobErrorResult := JobResult{
|
|
JobError: error,
|
|
}
|
|
|
|
res, err := json.Marshal(FailedJobErrorResult)
|
|
if err != nil {
|
|
logrus.Errorf("error marshalling the error: %v", err)
|
|
return nil
|
|
}
|
|
return s.jobs.FailJob(id, res)
|
|
}
|
|
|
|
// Return the ArtifactsDir path
|
|
func (s *Server) ArtifactsDir() string {
|
|
return s.config.ArtifactsDir
|
|
}
|
|
|
|
// Provides access to artifacts of a job. Returns an io.Reader for the artifact
|
|
// and the artifact's size.
|
|
func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
|
|
if s.config.ArtifactsDir == "" {
|
|
return nil, 0, errors.New("Artifacts not enabled")
|
|
}
|
|
|
|
jobInfo, err := s.jobInfo(id, nil)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
if jobInfo.JobStatus.Finished.IsZero() {
|
|
return nil, 0, fmt.Errorf("Cannot access artifacts before job is finished: %s", id)
|
|
}
|
|
|
|
p := path.Join(s.config.ArtifactsDir, id.String(), name)
|
|
f, err := os.Open(p)
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("Error accessing artifact %s for job %s: %v", name, id, err)
|
|
}
|
|
|
|
info, err := f.Stat()
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("Error getting size of artifact %s for job %s: %v", name, id, err)
|
|
}
|
|
|
|
return f, info.Size(), nil
|
|
}
|
|
|
|
func (s *Server) JobArtifactLocation(id uuid.UUID, name string) (string, error) {
|
|
if s.config.ArtifactsDir == "" {
|
|
return "", errors.New("Artifacts not enabled")
|
|
}
|
|
|
|
jobInfo, err := s.jobInfo(id, nil)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if jobInfo.JobStatus.Finished.IsZero() {
|
|
return "", fmt.Errorf("Cannot access artifacts before job is finished: %s", id)
|
|
}
|
|
|
|
p := path.Join(s.config.ArtifactsDir, id.String(), name)
|
|
if _, err := os.Stat(p); errors.Is(err, os.ErrNotExist) {
|
|
return p, fmt.Errorf("Artifact not found: %s", p)
|
|
}
|
|
return p, nil
|
|
}
|
|
|
|
// Deletes all artifacts for job `id`.
|
|
func (s *Server) DeleteArtifacts(id uuid.UUID) error {
|
|
if s.config.ArtifactsDir == "" {
|
|
return errors.New("Artifacts not enabled")
|
|
}
|
|
|
|
jobInfo, err := s.jobInfo(id, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if jobInfo.JobStatus.Finished.IsZero() {
|
|
return fmt.Errorf("Cannot delete artifacts before job is finished: %s", id)
|
|
}
|
|
|
|
// Remove the ComposeRequest but ignore any errors
|
|
_ = os.Remove(path.Join(s.config.ArtifactsDir, "ComposeRequest", id.String()+".json"))
|
|
|
|
return os.RemoveAll(path.Join(s.config.ArtifactsDir, id.String()))
|
|
}
|
|
|
|
func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes, channels []string, workerID uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
|
|
return s.requestJob(ctx, arch, jobTypes, uuid.Nil, channels, workerID)
|
|
}
|
|
|
|
func (s *Server) RequestJobById(ctx context.Context, arch string, requestedJobId uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
|
|
return s.requestJob(ctx, arch, []string{}, requestedJobId, nil, uuid.Nil)
|
|
}
|
|
|
|
func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID, channels []string, workerID uuid.UUID) (
|
|
jobId uuid.UUID, token uuid.UUID, jobType string, args json.RawMessage, dynamicArgs []json.RawMessage, err error) {
|
|
// treat osbuild jobs specially until we have found a generic way to
|
|
// specify dequeuing restrictions. For now, we only have one
|
|
// restriction: arch for osbuild jobs.
|
|
jts := []string{}
|
|
// Only set the label used for prometheus metrics when it's an osbuild job. Otherwise the
|
|
// dequeue metrics would set the label for all job types, while the finish metrics only set
|
|
// it for osbuild jobs.
|
|
var archPromLabel string
|
|
for _, t := range jobTypes {
|
|
if t == JobTypeOSBuild {
|
|
t = t + ":" + arch
|
|
archPromLabel = arch
|
|
}
|
|
if t == JobTypeManifestIDOnly {
|
|
return uuid.Nil, uuid.Nil, "", nil, nil, ErrInvalidJobType
|
|
}
|
|
jts = append(jts, t)
|
|
}
|
|
|
|
dequeueCtx := ctx
|
|
var cancel context.CancelFunc
|
|
if s.config.RequestJobTimeout != 0 {
|
|
dequeueCtx, cancel = context.WithTimeout(ctx, s.config.RequestJobTimeout)
|
|
defer cancel()
|
|
}
|
|
|
|
var depIDs []uuid.UUID
|
|
if requestedJobId != uuid.Nil {
|
|
jobId = requestedJobId
|
|
token, depIDs, jobType, args, err = s.jobs.DequeueByID(dequeueCtx, requestedJobId, workerID)
|
|
} else {
|
|
jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, workerID, jts, channels)
|
|
}
|
|
if err != nil {
|
|
if err != jobqueue.ErrDequeueTimeout && err != jobqueue.ErrNotPending {
|
|
logrus.Errorf("dequeuing job failed: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
jobInfo, err := s.jobInfo(jobId, nil)
|
|
if err != nil {
|
|
logrus.Errorf("error retrieving job status: %v", err)
|
|
}
|
|
|
|
// Record how long the job has been pending for, that is either how
|
|
// long it has been queued for, in case it has no dependencies, or
|
|
// how long it has been since all its dependencies finished, if it
|
|
// has any.
|
|
pending := jobInfo.JobStatus.Queued
|
|
jobType = jobInfo.JobType
|
|
|
|
for _, depID := range depIDs {
|
|
// TODO: include type of arguments
|
|
var result json.RawMessage
|
|
var finished time.Time
|
|
_, _, result, _, _, finished, _, _, _, err = s.jobs.JobStatus(depID)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if finished.After(pending) {
|
|
pending = finished
|
|
}
|
|
dynamicArgs = append(dynamicArgs, result)
|
|
}
|
|
|
|
if s.config.ArtifactsDir != "" {
|
|
err = os.MkdirAll(path.Join(s.config.ArtifactsDir, "tmp", token.String()), 0700)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
prometheus.DequeueJobMetrics(pending, jobInfo.JobStatus.Started, jobInfo.JobType, jobInfo.Channel, archPromLabel)
|
|
|
|
return
|
|
}
|
|
|
|
func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error {
|
|
return s.RequeueOrFinishJob(token, 0, result)
|
|
}
|
|
|
|
func (s *Server) RequeueOrFinishJob(token uuid.UUID, maxRetries uint64, result json.RawMessage) error {
|
|
jobId, err := s.jobs.IdFromToken(token)
|
|
if err != nil {
|
|
switch err {
|
|
case jobqueue.ErrNotExist:
|
|
return ErrInvalidToken
|
|
default:
|
|
return err
|
|
}
|
|
}
|
|
|
|
requeued, err := s.jobs.RequeueOrFinishJob(jobId, maxRetries, result)
|
|
if err != nil {
|
|
switch err {
|
|
case jobqueue.ErrNotRunning:
|
|
return ErrJobNotRunning
|
|
default:
|
|
return fmt.Errorf("error finishing job: %v", err)
|
|
}
|
|
}
|
|
|
|
if requeued {
|
|
jobInfo, err := s.jobInfo(jobId, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("error requeueing job: %w", err)
|
|
}
|
|
prometheus.RequeueJobMetrics(jobInfo.JobType, jobInfo.Channel)
|
|
}
|
|
|
|
jobType, err := s.JobType(jobId)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var arch string
|
|
var jobInfo *JobInfo
|
|
var jobResult *JobResult
|
|
switch jobType {
|
|
case JobTypeOSBuild:
|
|
var osbuildJR OSBuildJobResult
|
|
jobInfo, err = s.OSBuildJobInfo(jobId, &osbuildJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
arch = osbuildJR.Arch
|
|
jobResult = &osbuildJR.JobResult
|
|
|
|
case JobTypeDepsolve:
|
|
var depsolveJR DepsolveJobResult
|
|
jobInfo, err = s.DepsolveJobInfo(jobId, &depsolveJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &depsolveJR.JobResult
|
|
|
|
case JobTypeSearchPackages:
|
|
var searchJR SearchPackagesJobResult
|
|
jobInfo, err = s.SearchPackagesJobInfo(jobId, &searchJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &searchJR.JobResult
|
|
|
|
case JobTypeManifestIDOnly:
|
|
var manifestJR ManifestJobByIDResult
|
|
jobInfo, err = s.ManifestJobInfo(jobId, &manifestJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &manifestJR.JobResult
|
|
|
|
case JobTypeKojiInit:
|
|
var kojiInitJR KojiInitJobResult
|
|
jobInfo, err = s.KojiInitJobInfo(jobId, &kojiInitJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &kojiInitJR.JobResult
|
|
|
|
case JobTypeKojiFinalize:
|
|
var kojiFinalizeJR KojiFinalizeJobResult
|
|
jobInfo, err = s.KojiFinalizeJobInfo(jobId, &kojiFinalizeJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &kojiFinalizeJR.JobResult
|
|
case JobTypeAWSEC2Copy:
|
|
var awsEC2CopyJR AWSEC2CopyJobResult
|
|
jobInfo, err = s.AWSEC2CopyJobInfo(jobId, &awsEC2CopyJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &awsEC2CopyJR.JobResult
|
|
case JobTypeAWSEC2Share:
|
|
var awsEC2ShareJR AWSEC2ShareJobResult
|
|
jobInfo, err = s.AWSEC2ShareJobInfo(jobId, &awsEC2ShareJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &awsEC2ShareJR.JobResult
|
|
case JobTypeContainerResolve:
|
|
var containerResolveJR ContainerResolveJobResult
|
|
jobInfo, err = s.ContainerResolveJobInfo(jobId, &containerResolveJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &containerResolveJR.JobResult
|
|
case JobTypeFileResolve:
|
|
var fileResolveJR FileResolveJobResult
|
|
jobInfo, err = s.FileResolveJobInfo(jobId, &fileResolveJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &fileResolveJR.JobResult
|
|
case JobTypeOSTreeResolve:
|
|
var ostreeResolveJR OSTreeResolveJobResult
|
|
jobInfo, err = s.OSTreeResolveJobInfo(jobId, &ostreeResolveJR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
jobResult = &ostreeResolveJR.JobResult
|
|
|
|
default:
|
|
return fmt.Errorf("unexpected job type: %s", jobType)
|
|
}
|
|
|
|
statusCode := clienterrors.GetStatusCode(jobResult.JobError)
|
|
prometheus.FinishJobMetrics(jobInfo.JobStatus.Started, jobInfo.JobStatus.Finished, jobInfo.JobStatus.Canceled, jobType, jobInfo.Channel, arch, statusCode)
|
|
|
|
// Move artifacts from the temporary location to the final job
|
|
// location. Log any errors, but do not treat them as fatal. The job is
|
|
// already finished.
|
|
if s.config.ArtifactsDir != "" {
|
|
err := os.Rename(path.Join(s.config.ArtifactsDir, "tmp", token.String()), path.Join(s.config.ArtifactsDir, jobId.String()))
|
|
if err != nil {
|
|
logrus.Errorf("Error moving artifacts for job %s: %v", jobId, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) RegisterWorker(c, a string) (uuid.UUID, error) {
|
|
workerID, err := s.jobs.InsertWorker(c, a)
|
|
if err != nil {
|
|
return uuid.Nil, err
|
|
}
|
|
logrus.Infof("Worker (%v) registered", a)
|
|
return workerID, nil
|
|
}
|
|
|
|
func (s *Server) WorkerAvailableForArch(a string) (bool, error) {
|
|
workers, err := s.jobs.Workers(0)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, w := range workers {
|
|
if a == w.Arch {
|
|
return true, nil
|
|
}
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
// apiHandlers implements api.ServerInterface - the http api route handlers
|
|
// generated from api/openapi.yml. This is a separate object, because these
|
|
// handlers should not be exposed on the `Server` object.
|
|
type apiHandlers struct {
|
|
server *Server
|
|
}
|
|
|
|
func (h *apiHandlers) GetOpenapi(ctx echo.Context) error {
|
|
spec, err := api.GetSwagger()
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorFailedLoadingOpenAPISpec, err)
|
|
}
|
|
return ctx.JSON(http.StatusOK, spec)
|
|
}
|
|
|
|
func (h *apiHandlers) GetStatus(ctx echo.Context) error {
|
|
return ctx.JSON(http.StatusOK, &api.StatusResponse{
|
|
ObjectReference: api.ObjectReference{
|
|
Href: fmt.Sprintf("%s/status", api.BasePath),
|
|
Id: "status",
|
|
Kind: "Status",
|
|
},
|
|
Status: "OK",
|
|
})
|
|
}
|
|
|
|
func (h *apiHandlers) GetError(ctx echo.Context, id string) error {
|
|
errorId, err := strconv.Atoi(id)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorInvalidErrorId, err)
|
|
}
|
|
|
|
apiError := api.APIError(api.ServiceErrorCode(errorId), nil, ctx)
|
|
// If the service error wasn't found, it's a 404 in this instance
|
|
if apiError.Id == fmt.Sprintf("%d", api.ErrorServiceErrorNotFound) {
|
|
return api.HTTPError(api.ErrorErrorNotFound)
|
|
}
|
|
return ctx.JSON(http.StatusOK, apiError)
|
|
}
|
|
|
|
func (h *apiHandlers) RequestJob(ctx echo.Context) error {
|
|
var body api.RequestJobJSONRequestBody
|
|
err := ctx.Bind(&body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// channel is empty if JWT is not enabled
|
|
var channel string
|
|
if h.server.config.JWTEnabled {
|
|
tenant, err := auth.GetFromClaims(ctx.Request().Context(), h.server.config.TenantProviderFields)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorTenantNotFound, err)
|
|
}
|
|
|
|
// prefix the tenant to prevent collisions if support for specifying channels in a request is ever added
|
|
channel = "org-" + tenant
|
|
}
|
|
|
|
workerID := uuid.Nil
|
|
if body.WorkerId != nil {
|
|
workerID, err = uuid.Parse(*body.WorkerId)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorMalformedWorkerId, err)
|
|
}
|
|
}
|
|
|
|
jobId, jobToken, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types, []string{channel}, workerID)
|
|
if err != nil {
|
|
if err == jobqueue.ErrDequeueTimeout {
|
|
return ctx.JSON(http.StatusNoContent, api.ObjectReference{
|
|
Href: fmt.Sprintf("%s/jobs", api.BasePath),
|
|
Id: uuid.Nil.String(),
|
|
Kind: "RequestJob",
|
|
})
|
|
}
|
|
if err == ErrInvalidJobType {
|
|
return api.HTTPError(api.ErrorInvalidJobType)
|
|
}
|
|
return api.HTTPErrorWithInternal(api.ErrorRequestingJob, err)
|
|
}
|
|
|
|
var respArgs *json.RawMessage
|
|
if len(jobArgs) != 0 {
|
|
respArgs = &jobArgs
|
|
}
|
|
var respDynArgs *[]json.RawMessage
|
|
if len(dynamicJobArgs) != 0 {
|
|
respDynArgs = &dynamicJobArgs
|
|
}
|
|
|
|
response := api.RequestJobResponse{
|
|
ObjectReference: api.ObjectReference{
|
|
Href: fmt.Sprintf("%s/jobs", api.BasePath),
|
|
Id: jobId.String(),
|
|
Kind: "RequestJob",
|
|
},
|
|
Location: fmt.Sprintf("%s/jobs/%v", api.BasePath, jobToken),
|
|
ArtifactLocation: fmt.Sprintf("%s/jobs/%v/artifacts/", api.BasePath, jobToken),
|
|
Type: jobType,
|
|
Args: respArgs,
|
|
DynamicArgs: respDynArgs,
|
|
}
|
|
return ctx.JSON(http.StatusCreated, response)
|
|
}
|
|
|
|
func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error {
|
|
token, err := uuid.Parse(tokenstr)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorMalformedJobToken, err)
|
|
}
|
|
|
|
jobId, err := h.server.jobs.IdFromToken(token)
|
|
if err != nil {
|
|
switch err {
|
|
case jobqueue.ErrNotExist:
|
|
return api.HTTPError(api.ErrorJobNotFound)
|
|
default:
|
|
return api.HTTPErrorWithInternal(api.ErrorResolvingJobId, err)
|
|
}
|
|
}
|
|
|
|
if jobId == uuid.Nil {
|
|
return ctx.JSON(http.StatusOK, api.GetJobResponse{
|
|
ObjectReference: api.ObjectReference{
|
|
Href: fmt.Sprintf("%s/jobs/%v", api.BasePath, token),
|
|
Id: token.String(),
|
|
Kind: "JobStatus",
|
|
},
|
|
Canceled: false,
|
|
})
|
|
}
|
|
|
|
h.server.jobs.RefreshHeartbeat(token)
|
|
|
|
jobInfo, err := h.server.jobInfo(jobId, nil)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorRetrievingJobStatus, err)
|
|
}
|
|
|
|
return ctx.JSON(http.StatusOK, api.GetJobResponse{
|
|
ObjectReference: api.ObjectReference{
|
|
Href: fmt.Sprintf("%s/jobs/%v", api.BasePath, token),
|
|
Id: token.String(),
|
|
Kind: "JobStatus",
|
|
},
|
|
Canceled: jobInfo.JobStatus.Canceled,
|
|
})
|
|
}
|
|
|
|
func (h *apiHandlers) UpdateJob(ctx echo.Context, idstr string) error {
|
|
token, err := uuid.Parse(idstr)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorMalformedJobId, err)
|
|
}
|
|
|
|
var body api.UpdateJobRequest
|
|
err = ctx.Bind(&body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = h.server.FinishJob(token, body.Result)
|
|
if err != nil {
|
|
switch err {
|
|
case ErrInvalidToken:
|
|
return api.HTTPError(api.ErrorJobNotFound)
|
|
case ErrJobNotRunning:
|
|
return api.HTTPError(api.ErrorJobNotRunning)
|
|
default:
|
|
return api.HTTPErrorWithInternal(api.ErrorFinishingJob, err)
|
|
}
|
|
}
|
|
|
|
return ctx.JSON(http.StatusOK, api.UpdateJobResponse{
|
|
Href: fmt.Sprintf("%s/jobs/%v", api.BasePath, token),
|
|
Id: token.String(),
|
|
Kind: "UpdateJobResponse",
|
|
})
|
|
}
|
|
|
|
func (h *apiHandlers) UploadJobArtifact(ctx echo.Context, tokenstr string, name string) error {
|
|
token, err := uuid.Parse(tokenstr)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorMalformedJobId, err)
|
|
}
|
|
|
|
request := ctx.Request()
|
|
|
|
if h.server.config.ArtifactsDir == "" {
|
|
// indicate to the worker that the server is not accepting any artifacts
|
|
return ctx.NoContent(http.StatusBadRequest)
|
|
}
|
|
|
|
f, err := os.Create(path.Join(h.server.config.ArtifactsDir, "tmp", token.String(), name))
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorDiscardingArtifact, err)
|
|
}
|
|
|
|
_, err = io.Copy(f, request.Body)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorWritingArtifact, err)
|
|
}
|
|
|
|
return ctx.NoContent(http.StatusOK)
|
|
}
|
|
|
|
func (h *apiHandlers) PostWorkers(ctx echo.Context) error {
|
|
var body api.PostWorkersRequest
|
|
err := ctx.Bind(&body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var channel string
|
|
if h.server.config.JWTEnabled {
|
|
tenant, err := auth.GetFromClaims(ctx.Request().Context(), h.server.config.TenantProviderFields)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorTenantNotFound, err)
|
|
}
|
|
|
|
// prefix the tenant to prevent collisions if support for specifying channels in a request is ever added
|
|
channel = "org-" + tenant
|
|
}
|
|
|
|
workerID, err := h.server.RegisterWorker(channel, body.Arch)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorInsertingWorker, err)
|
|
}
|
|
|
|
return ctx.JSON(http.StatusCreated, api.PostWorkersResponse{
|
|
ObjectReference: api.ObjectReference{
|
|
Href: fmt.Sprintf("%s/workers", api.BasePath),
|
|
Id: workerID.String(),
|
|
Kind: "WorkerID",
|
|
},
|
|
WorkerId: workerID.String(),
|
|
})
|
|
}
|
|
|
|
func (h *apiHandlers) PostWorkerStatus(ctx echo.Context, workerIdstr string) error {
|
|
workerID, err := uuid.Parse(workerIdstr)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorMalformedWorkerId, err)
|
|
}
|
|
err = h.server.jobs.UpdateWorkerStatus(workerID)
|
|
|
|
if err == jobqueue.ErrWorkerNotExist {
|
|
return api.HTTPErrorWithInternal(api.ErrorWorkerIdNotFound, err)
|
|
}
|
|
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorUpdatingWorkerStatus, err)
|
|
}
|
|
|
|
return ctx.NoContent(http.StatusOK)
|
|
}
|
|
|
|
// A simple echo.Binder(), which only accepts application/json, but is more
|
|
// strict than echo's DefaultBinder. It does not handle binding query
|
|
// parameters either.
|
|
type binder struct{}
|
|
|
|
func (b binder) Bind(i interface{}, ctx echo.Context) error {
|
|
request := ctx.Request()
|
|
|
|
contentType := request.Header["Content-Type"]
|
|
if len(contentType) != 1 || contentType[0] != "application/json" {
|
|
return api.HTTPError(api.ErrorUnsupportedMediaType)
|
|
}
|
|
|
|
err := json.NewDecoder(request.Body).Decode(i)
|
|
if err != nil {
|
|
return api.HTTPErrorWithInternal(api.ErrorBodyDecodingError, err)
|
|
}
|
|
|
|
return nil
|
|
}
|