worker: Introduce manifest-id-only job
A job intended to run in composer itself, after which a dependant osbuild job can parse the manifest from it's dynamic arguments.
This commit is contained in:
parent
d3a3dbafed
commit
6757916c54
5 changed files with 100 additions and 7 deletions
|
|
@ -84,6 +84,17 @@ func (impl *OSBuildJobImpl) Run(job worker.Job) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// In case the manifest is empty, try to get it from dynamic args
|
||||
if len(args.Manifest) == 0 && job.NDynamicArgs() > 0 {
|
||||
var manifestJR worker.ManifestJobByIDResult
|
||||
err = job.DynamicArgs(0, &manifestJR)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
args.Manifest = manifestJR.Manifest
|
||||
}
|
||||
|
||||
// The specification allows multiple upload targets because it is an array, but we don't support it.
|
||||
// Return an error to osbuild-composer.
|
||||
if len(args.Targets) > 1 {
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ const (
|
|||
ErrorMethodNotAllowed ServiceErrorCode = 12
|
||||
ErrorNotAcceptable ServiceErrorCode = 13
|
||||
ErrorErrorNotFound ServiceErrorCode = 14
|
||||
ErrorInvalidJobType ServiceErrorCode = 15
|
||||
// ErrorTokenNotFound ServiceErrorCode = 6
|
||||
|
||||
// internal errors
|
||||
|
|
@ -59,7 +60,6 @@ func getServiceErrors() serviceErrors {
|
|||
serviceError{ErrorJobNotRunning, http.StatusBadRequest, "Job is not running"},
|
||||
serviceError{ErrorMalformedJobId, http.StatusBadRequest, "Given job id is not a uuidv4"},
|
||||
serviceError{ErrorMalformedJobToken, http.StatusBadRequest, "Given job id is not a uuidv4"},
|
||||
|
||||
serviceError{ErrorDiscardingArtifact, http.StatusInternalServerError, "Error discarding artifact"},
|
||||
serviceError{ErrorCreatingArtifact, http.StatusInternalServerError, "Error creating artifact"},
|
||||
serviceError{ErrorWritingArtifact, http.StatusInternalServerError, "Error writing artifact"},
|
||||
|
|
@ -73,6 +73,7 @@ func getServiceErrors() serviceErrors {
|
|||
serviceError{ErrorMethodNotAllowed, http.StatusMethodNotAllowed, "Requested method isn't supported for resource"},
|
||||
serviceError{ErrorNotAcceptable, http.StatusNotAcceptable, "Only 'application/json' content is supported"},
|
||||
serviceError{ErrorErrorNotFound, http.StatusNotFound, "Error with given id not found"},
|
||||
serviceError{ErrorInvalidJobType, http.StatusBadRequest, "Requested job type cannot be dequeued"},
|
||||
|
||||
serviceError{ErrorUnspecified, http.StatusInternalServerError, "Unspecified internal error "},
|
||||
serviceError{ErrorNotHTTPError, http.StatusInternalServerError, "Error is not an instance of HTTPError"},
|
||||
|
|
|
|||
|
|
@ -94,6 +94,13 @@ type DepsolveJobResult struct {
|
|||
ErrorType ErrorType `json:"error_type"`
|
||||
}
|
||||
|
||||
type ManifestJobByID struct{}
|
||||
|
||||
type ManifestJobByIDResult struct {
|
||||
Manifest distro.Manifest `json:"data,omitempty"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
//
|
||||
// JSON-serializable types for the client
|
||||
//
|
||||
|
|
|
|||
|
|
@ -41,6 +41,7 @@ type JobStatus struct {
|
|||
|
||||
var ErrInvalidToken = errors.New("token does not exist")
|
||||
var ErrJobNotRunning = errors.New("job isn't running")
|
||||
var ErrInvalidJobType = errors.New("job has invalid type")
|
||||
|
||||
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, requestJobTimeout time.Duration, basePath string) *Server {
|
||||
s := &Server{
|
||||
|
|
@ -94,6 +95,10 @@ func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob) (uuid.UUID, error)
|
|||
return s.jobs.Enqueue("osbuild:"+arch, job, nil)
|
||||
}
|
||||
|
||||
func (s *Server) EnqueueOSBuildAsDependency(arch string, job *OSBuildJob, manifestID uuid.UUID) (uuid.UUID, error) {
|
||||
return s.jobs.Enqueue("osbuild:"+arch, job, []uuid.UUID{manifestID})
|
||||
}
|
||||
|
||||
func (s *Server) EnqueueOSBuildKoji(arch string, job *OSBuildKojiJob, initID uuid.UUID) (uuid.UUID, error) {
|
||||
return s.jobs.Enqueue("osbuild-koji:"+arch, job, []uuid.UUID{initID})
|
||||
}
|
||||
|
|
@ -110,6 +115,10 @@ func (s *Server) EnqueueDepsolve(job *DepsolveJob) (uuid.UUID, error) {
|
|||
return s.jobs.Enqueue("depsolve", job, nil)
|
||||
}
|
||||
|
||||
func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID) (uuid.UUID, error) {
|
||||
return s.jobs.Enqueue("manifest-id-only", job, []uuid.UUID{parent})
|
||||
}
|
||||
|
||||
func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, []uuid.UUID, error) {
|
||||
rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id)
|
||||
if err != nil {
|
||||
|
|
@ -208,6 +217,15 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error {
|
|||
}
|
||||
|
||||
func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
|
||||
return s.requestJob(ctx, arch, jobTypes, uuid.Nil)
|
||||
}
|
||||
|
||||
func (s *Server) RequestJobById(ctx context.Context, arch string, requestedJobId uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
|
||||
return s.requestJob(ctx, arch, []string{}, requestedJobId)
|
||||
}
|
||||
|
||||
func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID) (
|
||||
jobId uuid.UUID, token uuid.UUID, jobType string, args json.RawMessage, dynamicArgs []json.RawMessage, err error) {
|
||||
// treat osbuild jobs specially until we have found a generic way to
|
||||
// specify dequeuing restrictions. For now, we only have one
|
||||
// restriction: arch for osbuild jobs.
|
||||
|
|
@ -216,6 +234,9 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string)
|
|||
if t == "osbuild" || t == "osbuild-koji" {
|
||||
t = t + ":" + arch
|
||||
}
|
||||
if t == "manifest-id-only" {
|
||||
return uuid.Nil, uuid.Nil, "", nil, nil, ErrInvalidJobType
|
||||
}
|
||||
jts = append(jts, t)
|
||||
}
|
||||
|
||||
|
|
@ -225,21 +246,27 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string)
|
|||
dequeueCtx, cancel = context.WithTimeout(ctx, s.requestJobTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
jobId, token, depIDs, jobType, args, err := s.jobs.Dequeue(dequeueCtx, jts)
|
||||
|
||||
var depIDs []uuid.UUID
|
||||
if requestedJobId != uuid.Nil {
|
||||
jobId = requestedJobId
|
||||
token, depIDs, jobType, args, err = s.jobs.DequeueByID(dequeueCtx, requestedJobId)
|
||||
} else {
|
||||
jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, jts)
|
||||
}
|
||||
if err != nil {
|
||||
return uuid.Nil, uuid.Nil, "", nil, nil, err
|
||||
return
|
||||
}
|
||||
|
||||
var dynamicArgs []json.RawMessage
|
||||
for _, depID := range depIDs {
|
||||
result, _, _, _, _, _, _ := s.jobs.JobStatus(depID)
|
||||
dynamicArgs = append(dynamicArgs, result)
|
||||
}
|
||||
|
||||
if s.artifactsDir != "" {
|
||||
err := os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700)
|
||||
err = os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700)
|
||||
if err != nil {
|
||||
return uuid.Nil, uuid.Nil, "", nil, nil, fmt.Errorf("cannot create artifact directory: %v", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -249,7 +276,7 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string)
|
|||
jobType = "osbuild-koji"
|
||||
}
|
||||
|
||||
return jobId, token, jobType, args, dynamicArgs, nil
|
||||
return
|
||||
}
|
||||
|
||||
func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error {
|
||||
|
|
@ -352,6 +379,9 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error {
|
|||
Kind: "RequestJob",
|
||||
})
|
||||
}
|
||||
if err == ErrInvalidJobType {
|
||||
return api.HTTPError(api.ErrorInvalidJobType)
|
||||
}
|
||||
return api.HTTPErrorWithInternal(api.ErrorRequestingJob, err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -407,3 +407,47 @@ func TestTimeout(t *testing.T) {
|
|||
test.TestRoute(t, server.Handler(), false, "POST", "/api/image-builder-worker/v1/jobs", `{"arch":"arch","types":["types"]}`, http.StatusNoContent,
|
||||
`{"href":"/api/image-builder-worker/v1/jobs","id":"00000000-0000-0000-0000-000000000000","kind":"RequestJob"}`)
|
||||
}
|
||||
|
||||
func TestRequestJobById(t *testing.T) {
|
||||
tempdir, err := ioutil.TempDir("", "worker-tests-")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(tempdir)
|
||||
|
||||
distroStruct := test_distro.New()
|
||||
arch, err := distroStruct.GetArch(test_distro.TestArchName)
|
||||
if err != nil {
|
||||
t.Fatalf("error getting arch from distro: %v", err)
|
||||
}
|
||||
server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1")
|
||||
handler := server.Handler()
|
||||
|
||||
depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{})
|
||||
require.NoError(t, err)
|
||||
|
||||
jobId, err := server.EnqueueManifestJobByID(&worker.ManifestJobByID{}, depsolveJobId)
|
||||
require.NoError(t, err)
|
||||
|
||||
test.TestRoute(t, server.Handler(), false, "POST", "/api/worker/v1/jobs", `{"arch":"arch","types":["manifest-id-only"]}`, http.StatusBadRequest,
|
||||
`{"href":"/api/worker/v1/errors/15","kind":"Error","id": "15","code":"IMAGE-BUILDER-WORKER-15"}`, "operation_id", "reason", "message")
|
||||
|
||||
_, _, _, _, _, err = server.RequestJobById(context.Background(), arch.Name(), jobId)
|
||||
require.Error(t, jobqueue.ErrNotPending, err)
|
||||
|
||||
_, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{"depsolve"})
|
||||
require.NoError(t, err)
|
||||
|
||||
depsolveJR, err := json.Marshal(worker.DepsolveJobResult{})
|
||||
require.NoError(t, err)
|
||||
err = server.FinishJob(token, depsolveJR)
|
||||
require.NoError(t, err)
|
||||
|
||||
j, token, typ, args, dynamicArgs, err := server.RequestJobById(context.Background(), arch.Name(), jobId)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, jobId, j)
|
||||
require.Equal(t, "manifest-id-only", typ)
|
||||
require.NotNil(t, args)
|
||||
require.NotNil(t, dynamicArgs)
|
||||
|
||||
test.TestRoute(t, handler, false, "GET", fmt.Sprintf("/api/worker/v1/jobs/%s", token), `{}`, http.StatusOK,
|
||||
fmt.Sprintf(`{"canceled":false,"href":"/api/worker/v1/jobs/%s","id":"%s","kind":"JobStatus"}`, token, token))
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue