worker: add new container resolve job type

This is a new job that can be used to resolve containers. It uses
the existing `container.Resolver` class to do the actual work.
This commit is contained in:
Christian Kellner 2022-07-21 17:18:40 +02:00
parent c2f3f76d96
commit 50e630a76f
5 changed files with 115 additions and 8 deletions

View file

@ -0,0 +1,59 @@
package main
import (
"fmt"
"github.com/sirupsen/logrus"
"github.com/osbuild/osbuild-composer/internal/container"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
)
type ContainerResolveJobImpl struct {
AuthFilePath string
}
func (impl *ContainerResolveJobImpl) Run(job worker.Job) error {
logWithId := logrus.WithField("jobId", job.Id())
var args worker.ContainerResolveJob
err := job.Args(&args)
if err != nil {
return err
}
result := worker.ContainerResolveJobResult{
Specs: make([]worker.ContainerSpec, len(args.Specs)),
}
logWithId.Infof("Resolving containers (%d)", len(args.Specs))
resolver := container.NewResolver(args.Arch)
for _, s := range args.Specs {
resolver.Add(s.Source, s.Name, s.TLSVerify)
}
specs, err := resolver.Finish()
if err != nil {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorContainerResolution, err.Error())
} else {
for i, spec := range specs {
result.Specs[i] = worker.ContainerSpec{
Source: spec.Source,
Name: spec.LocalName,
TLSVerify: spec.TLSVerify,
ImageID: spec.ImageID,
Digest: spec.Digest,
}
}
}
err = job.Update(&result)
if err != nil {
return fmt.Errorf("Error reporting job result: %v", err)
}
return nil
}

View file

@ -434,9 +434,7 @@ func main() {
worker.JobTypeKojiInit: &KojiInitJobImpl{
KojiServers: kojiServers,
},
worker.JobTypeKojiFinalize: &KojiFinalizeJobImpl{
KojiServers: kojiServers,
},
worker.JobTypeKojiFinalize: &KojiFinalizeJobImpl{},
}
acceptedJobTypes := []string{}

View file

@ -30,6 +30,7 @@ const (
ErrorJobMissingHeartbeat ClientErrorCode = 27
ErrorTargetError ClientErrorCode = 28
ErrorParsingJobArgs ClientErrorCode = 29
ErrorContainerResolution ClientErrorCode = 30
)
type ClientErrorCode int
@ -83,6 +84,8 @@ func GetStatusCode(err *Error) StatusCode {
return JobStatusUserInputError
case ErrorEmptyManifest:
return JobStatusUserInputError
case ErrorContainerResolution:
return JobStatusUserInputError
default:
return JobStatusInternalError
}

View file

@ -236,6 +236,26 @@ type ManifestJobByIDResult struct {
JobResult
}
type ContainerSpec struct {
Source string `json:"source"`
Name string `json:"name"`
TLSVerify *bool `json:"tls-verify,omitempty"`
ImageID string `json:"image_id"`
Digest string `json:"digest"`
}
type ContainerResolveJob struct {
Arch string `json:"arch"`
Specs []ContainerSpec `json:"specs"`
}
type ContainerResolveJobResult struct {
Specs []ContainerSpec `json:"specs"`
JobResult
}
//
// JSON-serializable types for the client
//

View file

@ -29,11 +29,12 @@ import (
)
const (
JobTypeOSBuild string = "osbuild"
JobTypeKojiInit string = "koji-init"
JobTypeKojiFinalize string = "koji-finalize"
JobTypeDepsolve string = "depsolve"
JobTypeManifestIDOnly string = "manifest-id-only"
JobTypeOSBuild string = "osbuild"
JobTypeKojiInit string = "koji-init"
JobTypeKojiFinalize string = "koji-finalize"
JobTypeDepsolve string = "depsolve"
JobTypeManifestIDOnly string = "manifest-id-only"
JobTypeContainerResolve string = "container-resolve"
)
type Server struct {
@ -142,6 +143,10 @@ func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID,
return s.enqueue(JobTypeManifestIDOnly, job, []uuid.UUID{parent}, channel)
}
func (s *Server) EnqueueContainerResolveJob(job *ContainerResolveJob, channel string) (uuid.UUID, error) {
return s.enqueue(JobTypeContainerResolve, job, nil, channel)
}
func (s *Server) enqueue(jobType string, job interface{}, dependencies []uuid.UUID, channel string) (uuid.UUID, error) {
prometheus.EnqueueJobMetrics(jobType, channel)
return s.jobs.Enqueue(jobType, job, dependencies, channel)
@ -198,6 +203,14 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er
}
jobResult = &kojiFinalizeJR.JobResult
case JobTypeContainerResolve:
var containerResolveJR ContainerResolveJobResult
_, jobDeps, err = s.ContainerResolveJobStatus(id, &containerResolveJR)
if err != nil {
return nil, err
}
jobResult = &containerResolveJR.JobResult
default:
return nil, fmt.Errorf("unexpected job type: %s", jobType)
}
@ -323,6 +336,20 @@ func (s *Server) ManifestJobStatus(id uuid.UUID, result *ManifestJobByIDResult)
return status, deps, nil
}
func (s *Server) ContainerResolveJobStatus(id uuid.UUID, result *ContainerResolveJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, _, status, deps, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
}
if jobType != JobTypeContainerResolve {
return nil, nil, fmt.Errorf("expected %q, found %q job instead", JobTypeDepsolve, jobType)
}
return status, deps, nil
}
func (s *Server) jobStatus(id uuid.UUID, result interface{}) (string, string, *JobStatus, []uuid.UUID, error) {
jobType, channel, rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id)
if err != nil {