worker/server: typesafe Job and JobStatus

Replace Job() and JobStatus() with typesafe versions, and introduce JobType()
for the rare instances where we don't know the type up front.

Additionally, catch a few more error cases:
 - if OSBuildResult is nil, then we failed to invoke osbuild
 - make sure the same JobResult handling is done for osbuild-koji, as for osbuild
This commit is contained in:
Tom Gundersen 2022-01-27 17:45:05 +00:00
parent da1537dee6
commit b32ab36e1d
10 changed files with 240 additions and 202 deletions

View file

@ -584,7 +584,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
return
}
_, _, err = workers.JobStatus(depsolveJobID, &depsolveResults)
_, _, err = workers.DepsolveJobStatus(depsolveJobID, &depsolveResults)
if err != nil {
reason := "Error reading depsolve status"
jobResult.JobError = clienterrors.WorkerClientError(clienterrors.ErrorReadingJobStatus, reason)
@ -650,14 +650,14 @@ func (h *apiHandlers) GetComposeStatus(ctx echo.Context, id string) error {
return HTTPError(ErrorInvalidComposeId)
}
jobType, _, _, err := h.server.workers.Job(jobId, nil)
jobType, err := h.server.workers.JobType(jobId)
if err != nil {
return HTTPError(ErrorComposeNotFound)
}
if strings.HasPrefix(jobType, "osbuild:") {
if jobType == "osbuild" {
var result worker.OSBuildJobResult
status, _, err := h.server.workers.JobStatus(jobId, &result)
status, _, err := h.server.workers.OSBuildJobStatus(jobId, &result)
if err != nil {
return HTTPError(ErrorMalformedOSBuildJobResult)
}
@ -725,7 +725,7 @@ func (h *apiHandlers) GetComposeStatus(ctx echo.Context, id string) error {
})
} else if jobType == "koji-finalize" {
var result worker.KojiFinalizeJobResult
finalizeStatus, deps, err := h.server.workers.JobStatus(jobId, &result)
finalizeStatus, deps, err := h.server.workers.KojiFinalizeJobStatus(jobId, &result)
if err != nil {
return HTTPError(ErrorMalformedOSBuildJobResult)
}
@ -734,12 +734,12 @@ func (h *apiHandlers) GetComposeStatus(ctx echo.Context, id string) error {
return HTTPError(ErrorUnexpectedNumberOfImageBuilds)
}
var initResult worker.KojiInitJobResult
_, _, err = h.server.workers.JobStatus(deps[0], &initResult)
_, _, err = h.server.workers.KojiInitJobStatus(deps[0], &initResult)
if err != nil {
return HTTPError(ErrorMalformedOSBuildJobResult)
}
var buildResult worker.OSBuildKojiJobResult
buildJobStatus, _, err := h.server.workers.JobStatus(deps[1], &buildResult)
buildJobStatus, _, err := h.server.workers.OSBuildKojiJobStatus(deps[1], &buildResult)
if err != nil {
return HTTPError(ErrorMalformedOSBuildJobResult)
}
@ -863,13 +863,13 @@ func (h *apiHandlers) GetComposeMetadata(ctx echo.Context, id string) error {
}
var result worker.OSBuildJobResult
status, _, err := h.server.workers.JobStatus(jobId, &result)
status, _, err := h.server.workers.OSBuildJobStatus(jobId, &result)
if err != nil {
return HTTPErrorWithInternal(ErrorComposeNotFound, err)
}
var job worker.OSBuildJob
if _, _, _, err = h.server.workers.Job(jobId, &job); err != nil {
if err = h.server.workers.OSBuildJob(jobId, &job); err != nil {
return HTTPErrorWithInternal(ErrorComposeNotFound, err)
}

View file

@ -67,7 +67,7 @@ const (
FROM jobs
WHERE id = $1`
sqlQueryJobStatus = `
SELECT result, queued_at, started_at, finished_at, canceled
SELECT type, result, queued_at, started_at, finished_at, canceled
FROM jobs
WHERE id = $1`
sqlQueryRunningId = `
@ -396,7 +396,7 @@ func (q *DBJobQueue) CancelJob(id uuid.UUID) error {
return nil
}
func (q *DBJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
func (q *DBJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
conn, err := q.pool.Acquire(context.Background())
if err != nil {
return
@ -406,7 +406,7 @@ func (q *DBJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, st
// Use double pointers for timestamps because they might be NULL, which would result in *time.Time == nil
var sp, fp *time.Time
var rp pgtype.JSON
err = conn.QueryRow(context.Background(), sqlQueryJobStatus, id).Scan(&rp, &queued, &sp, &fp, &canceled)
err = conn.QueryRow(context.Background(), sqlQueryJobStatus, id).Scan(&jobType, &rp, &queued, &sp, &fp, &canceled)
if err != nil {
return
}

View file

@ -343,12 +343,13 @@ func (q *fsJobQueue) CancelJob(id uuid.UUID) error {
return nil
}
func (q *fsJobQueue) JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
func (q *fsJobQueue) JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error) {
j, err := q.readJob(id)
if err != nil {
return
}
jobType = j.Type
result = j.Result
queued = j.QueuedAt
started = j.StartedAt

View file

@ -64,7 +64,7 @@ type JobQueue interface {
// finished, respectively.
//
// Lastly, the IDs of the jobs dependencies are returned.
JobStatus(id uuid.UUID) (result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error)
JobStatus(id uuid.UUID) (jobType string, result json.RawMessage, queued, started, finished time.Time, canceled bool, deps []uuid.UUID, err error)
// Job returns all the parameters that define a job (everything provided during Enqueue).
Job(id uuid.UUID) (jobType string, args json.RawMessage, dependencies []uuid.UUID, err error)

View file

@ -188,8 +188,9 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
require.ElementsMatch(t, []uuid.UUID{one, two}, r)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
_, queued, started, finished, canceled, deps, err := q.JobStatus(j)
jobType, _, queued, started, finished, canceled, deps, err := q.JobStatus(j)
require.NoError(t, err)
require.Equal(t, jobType, "test")
require.True(t, !queued.IsZero())
require.True(t, started.IsZero())
require.True(t, finished.IsZero())
@ -198,8 +199,9 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
jobType, result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
require.NoError(t, err)
require.Equal(t, jobType, "test")
require.True(t, !queued.IsZero())
require.True(t, !started.IsZero())
require.True(t, !finished.IsZero())
@ -215,8 +217,9 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
two := pushTestJob(t, q, "test", nil, nil)
j := pushTestJob(t, q, "test", nil, []uuid.UUID{one, two})
_, queued, started, finished, canceled, deps, err := q.JobStatus(j)
jobType, _, queued, started, finished, canceled, deps, err := q.JobStatus(j)
require.NoError(t, err)
require.Equal(t, jobType, "test")
require.True(t, !queued.IsZero())
require.True(t, started.IsZero())
require.True(t, finished.IsZero())
@ -230,8 +233,9 @@ func testDependencies(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, j, finishNextTestJob(t, q, "test", testResult{}, []uuid.UUID{one, two}))
result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
jobType, result, queued, started, finished, canceled, deps, err := q.JobStatus(j)
require.NoError(t, err)
require.Equal(t, jobType, "test")
require.True(t, !queued.IsZero())
require.True(t, !started.IsZero())
require.True(t, !finished.IsZero())
@ -325,8 +329,9 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.NotEmpty(t, id)
err = q.CancelJob(id)
require.NoError(t, err)
result, _, _, _, canceled, _, err := q.JobStatus(id)
jobType, result, _, _, _, canceled, _, err := q.JobStatus(id)
require.NoError(t, err)
require.Equal(t, jobType, "clownfish")
require.True(t, canceled)
require.Nil(t, result)
err = q.FinishJob(id, &testResult{})
@ -344,8 +349,9 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
require.Equal(t, json.RawMessage("null"), args)
err = q.CancelJob(id)
require.NoError(t, err)
result, _, _, _, canceled, _, err = q.JobStatus(id)
jobType, result, _, _, _, canceled, _, err = q.JobStatus(id)
require.NoError(t, err)
require.Equal(t, jobType, "clownfish")
require.True(t, canceled)
require.Nil(t, result)
err = q.FinishJob(id, &testResult{})
@ -366,8 +372,9 @@ func testCancel(t *testing.T, q jobqueue.JobQueue) {
err = q.CancelJob(id)
require.Error(t, err)
require.Equal(t, jobqueue.ErrNotRunning, err)
result, _, _, _, canceled, _, err = q.JobStatus(id)
jobType, result, _, _, _, canceled, _, err = q.JobStatus(id)
require.NoError(t, err)
require.Equal(t, jobType, "clownfish")
require.False(t, canceled)
err = json.Unmarshal(result, &testResult{})
require.NoError(t, err)

View file

@ -207,7 +207,7 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
// changes.
var initResult worker.KojiInitJobResult
for {
status, _, err := h.server.workers.JobStatus(initID, &initResult)
status, _, err := h.server.workers.KojiInitJobStatus(initID, &initResult)
if err != nil {
panic(err)
}
@ -306,23 +306,14 @@ func (h *apiHandlers) GetComposeId(ctx echo.Context, idstr string) error {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter id: %s", err))
}
// Make sure id exists and matches a FinalizeJob
if _, _, err := h.getFinalizeJob(id); err != nil {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Job %s not found: %s", idstr, err))
}
var finalizeResult worker.KojiFinalizeJobResult
finalizeStatus, deps, err := h.server.workers.JobStatus(id, &finalizeResult)
finalizeStatus, deps, err := h.server.workers.KojiFinalizeJobStatus(id, &finalizeResult)
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Job %s not found: %s", idstr, err))
}
// Make sure deps[0] matches a KojiInitJob
if _, err := h.getInitJob(deps[0]); err != nil {
panic(err)
}
var initResult worker.KojiInitJobResult
_, _, err = h.server.workers.JobStatus(deps[0], &initResult)
_, _, err = h.server.workers.KojiInitJobStatus(deps[0], &initResult)
if err != nil {
// this is a programming error
panic(err)
@ -331,12 +322,8 @@ func (h *apiHandlers) GetComposeId(ctx echo.Context, idstr string) error {
var buildResults []worker.OSBuildKojiJobResult
var imageStatuses []api.ImageStatus
for i := 1; i < len(deps); i++ {
// Make sure deps[i] matches an OSBuildKojiJob
if _, _, err := h.getBuildJob(deps[i]); err != nil {
panic(err)
}
var buildResult worker.OSBuildKojiJobResult
jobStatus, _, err := h.server.workers.JobStatus(deps[i], &buildResult)
jobStatus, _, err := h.server.workers.OSBuildKojiJobStatus(deps[i], &buildResult)
if err != nil {
// this is a programming error
panic(err)
@ -372,24 +359,14 @@ func (h *apiHandlers) GetComposeIdLogs(ctx echo.Context, idstr string) error {
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter id: %s", err))
}
// Make sure id exists and matches a FinalizeJob
if _, _, err := h.getFinalizeJob(id); err != nil {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Job %s not found: %s", idstr, err))
}
var finalizeResult worker.KojiFinalizeJobResult
_, deps, err := h.server.workers.JobStatus(id, &finalizeResult)
_, deps, err := h.server.workers.KojiFinalizeJobStatus(id, &finalizeResult)
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Job %s not found: %s", idstr, err))
}
// Make sure deps[0] matches a KojiInitJob
if _, err := h.getInitJob(deps[0]); err != nil {
panic(err)
}
var initResult worker.KojiInitJobResult
_, _, err = h.server.workers.JobStatus(deps[0], &initResult)
_, _, err = h.server.workers.KojiInitJobStatus(deps[0], &initResult)
if err != nil {
// This is a programming error.
panic(err)
@ -397,12 +374,8 @@ func (h *apiHandlers) GetComposeIdLogs(ctx echo.Context, idstr string) error {
var buildResults []interface{}
for i := 1; i < len(deps); i++ {
// Make sure deps[i] matches an OSBuildKojiJob
if _, _, err := h.getBuildJob(deps[i]); err != nil {
panic(err)
}
var buildResult worker.OSBuildKojiJobResult
_, _, err = h.server.workers.JobStatus(deps[i], &buildResult)
_, _, err = h.server.workers.OSBuildKojiJobStatus(deps[i], &buildResult)
if err != nil {
// This is a programming error.
panic(err)
@ -422,53 +395,6 @@ func (h *apiHandlers) GetComposeIdLogs(ctx echo.Context, idstr string) error {
return ctx.JSON(http.StatusOK, response)
}
// getFinalizeJob retrieves a KojiFinalizeJob and the IDs of its dependencies
// from the job queue given its ID. It returns an error if the ID matches a
// job of a different type.
func (h *apiHandlers) getFinalizeJob(id uuid.UUID) (*worker.KojiFinalizeJob, []uuid.UUID, error) {
job := new(worker.KojiFinalizeJob)
jobType, _, deps, err := h.server.workers.Job(id, job)
if err != nil {
return nil, nil, err
}
expType := "koji-finalize"
if jobType != expType {
return nil, nil, fmt.Errorf("expected %q, found %q job instead", expType, jobType)
}
return job, deps, err
}
// getInitJob retrieves a KojiInitJob from the job queue given its ID.
func (h *apiHandlers) getInitJob(id uuid.UUID) (*worker.KojiInitJob, error) {
// It returns an error if the ID matches a job of a different type.
job := new(worker.KojiInitJob)
jobType, _, _, err := h.server.workers.Job(id, job)
if err != nil {
return nil, err
}
expType := "koji-init"
if jobType != expType {
return nil, fmt.Errorf("expected %q, found %q job instead", expType, jobType)
}
return job, err
}
// getBuildJob retrieves a OSBuildKojiJob and the IDs of its dependencies from
// the job queue given its ID. It returns an error if the ID matches a job of
// a different type.
func (h *apiHandlers) getBuildJob(id uuid.UUID) (*worker.OSBuildKojiJob, []uuid.UUID, error) {
job := new(worker.OSBuildKojiJob)
jobType, _, deps, err := h.server.workers.Job(id, job)
if err != nil {
return nil, nil, err
}
expType := "osbuild-koji"
if !strings.HasPrefix(jobType, expType) { // Build jobs get automatic arch suffix: Check prefix
return nil, nil, fmt.Errorf("expected %q, found %q job instead", expType, jobType)
}
return job, deps, nil
}
// GetComposeIdManifests returns the Manifests for a given Compose (one for each image).
func (h *apiHandlers) GetComposeIdManifests(ctx echo.Context, idstr string) error {
id, err := uuid.Parse(idstr)
@ -476,19 +402,20 @@ func (h *apiHandlers) GetComposeIdManifests(ctx echo.Context, idstr string) erro
return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter id: %s", err))
}
_, deps, err := h.getFinalizeJob(id)
var finalizeResult worker.KojiFinalizeJobResult
_, deps, err := h.server.workers.KojiFinalizeJobStatus(id, &finalizeResult)
if err != nil {
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Job %s not found: %s", idstr, err))
}
manifests := make([]distro.Manifest, len(deps)-1)
for i, id := range deps[1:] {
buildJob, _, err := h.getBuildJob(id)
var manifests []distro.Manifest
for _, id := range deps[1:] {
var buildJob worker.OSBuildKojiJob
err = h.server.workers.OSBuildKojiJob(id, &buildJob)
if err != nil {
// This is a programming error.
panic(err)
return echo.NewHTTPError(http.StatusNotFound, fmt.Sprintf("Job %s could not be deserialized: %s", idstr, err))
}
manifests[i] = buildJob.Manifest
manifests = append(manifests, buildJob.Manifest)
}
return ctx.JSON(http.StatusOK, manifests)

View file

@ -345,8 +345,7 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus {
// All jobs are "osbuild" jobs.
var result worker.OSBuildJobResult
jobStatus, _, err := api.workers.JobStatus(jobId, &result)
jobStatus, _, err := api.workers.OSBuildJobStatus(jobId, &result)
if err != nil {
panic(err)
}

View file

@ -54,7 +54,7 @@ func TestComposeStatusFromLegacyError(t *testing.T) {
err = api.workers.FinishJob(token, rawResult)
require.NoError(t, err)
jobStatus, _, err := api.workers.JobStatus(jobId, &jobResult)
jobStatus, _, err := api.workers.OSBuildJobStatus(jobId, &jobResult)
require.NoError(t, err)
state := composeStateFromJobStatus(jobStatus, &jobResult)
@ -101,7 +101,7 @@ func TestComposeStatusFromJobError(t *testing.T) {
err = api.workers.FinishJob(token, rawResult)
require.NoError(t, err)
jobStatus, _, err := api.workers.JobStatus(jobId, &jobResult)
jobStatus, _, err := api.workers.OSBuildJobStatus(jobId, &jobResult)
require.NoError(t, err)
state := composeStateFromJobStatus(jobStatus, &jobResult)

View file

@ -12,6 +12,7 @@ import (
"os"
"path"
"strconv"
"strings"
"time"
"github.com/google/uuid"
@ -123,60 +124,143 @@ func (s *Server) EnqueueManifestJobByID(job *ManifestJobByID, parent uuid.UUID)
return s.jobs.Enqueue("manifest-id-only", job, []uuid.UUID{parent})
}
func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, []uuid.UUID, error) {
rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id)
func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, status, deps, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
}
if !finished.IsZero() && !canceled {
if !strings.HasPrefix(jobType, "osbuild:") { // Build jobs get automatic arch suffix: Check prefix
return nil, nil, fmt.Errorf("expected osbuild:*, found %q job instead", jobType)
}
if result.JobError == nil {
if result.OSBuildOutput == nil {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorBuildJob, "osbuild build failed")
} else if len(result.OSBuildOutput.Error) > 0 {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, string(result.OSBuildOutput.Error))
} else if len(result.TargetErrors) > 0 {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, result.TargetErrors[0])
}
}
// For backwards compatibility: OSBuildJobResult didn't use to have a
// top-level `Success` flag. Override it here by looking into the job.
if !result.Success && result.OSBuildOutput != nil {
result.Success = result.OSBuildOutput.Success && result.JobError == nil
}
return status, deps, nil
}
func (s *Server) OSBuildKojiJobStatus(id uuid.UUID, result *OSBuildKojiJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, status, deps, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
}
if !strings.HasPrefix(jobType, "osbuild-koji:") { // Build jobs get automatic arch suffix: Check prefix
return nil, nil, fmt.Errorf("expected \"osbuild-koji:*\", found %q job instead", jobType)
}
if result.JobError == nil {
if result.OSBuildOutput == nil {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorBuildJob, "osbuild build failed")
} else if len(result.OSBuildOutput.Error) > 0 {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, string(result.OSBuildOutput.Error))
} else if result.KojiError != "" {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, result.KojiError)
}
}
return status, deps, nil
}
func (s *Server) KojiInitJobStatus(id uuid.UUID, result *KojiInitJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, status, deps, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
}
if jobType != "koji-init" {
return nil, nil, fmt.Errorf("expected \"koji-init\", found %q job instead", jobType)
}
if result.JobError == nil && result.KojiError != "" {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, result.KojiError)
}
return status, deps, nil
}
func (s *Server) KojiFinalizeJobStatus(id uuid.UUID, result *KojiFinalizeJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, status, deps, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
}
if jobType != "koji-finalize" {
return nil, nil, fmt.Errorf("expected \"koji-finalize\", found %q job instead", jobType)
}
if result.JobError == nil && result.KojiError != "" {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, result.KojiError)
}
return status, deps, nil
}
func (s *Server) DepsolveJobStatus(id uuid.UUID, result *DepsolveJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, status, deps, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
}
if jobType != "depsolve" {
return nil, nil, fmt.Errorf("expected \"depsolve\", found %q job instead", jobType)
}
if result.JobError == nil && result.Error != "" {
if result.ErrorType == DepsolveErrorType {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDNFDepsolveError, result.Error)
} else {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorRPMMDError, result.Error)
}
}
return status, deps, nil
}
func (s *Server) ManifestByIdJobStatus(id uuid.UUID, result *ManifestJobByIDResult) (*JobStatus, []uuid.UUID, error) {
jobType, status, deps, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
}
if jobType != "manifest-by-id" {
return nil, nil, fmt.Errorf("expected \"koji-init\", found %q job instead", jobType)
}
if result.JobError == nil && result.Error != "" {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, result.Error)
}
return status, deps, nil
}
func (s *Server) jobStatus(id uuid.UUID, result interface{}) (string, *JobStatus, []uuid.UUID, error) {
jobType, rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id)
if err != nil {
return "", nil, nil, err
}
if result != nil && !finished.IsZero() && !canceled {
err = json.Unmarshal(rawResult, result)
if err != nil {
return nil, nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
return "", nil, nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
}
}
switch r := result.(type) {
case *KojiInitJobResult:
if r.JobError == nil && r.KojiError != "" {
r.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, r.KojiError)
}
case *KojiFinalizeJobResult:
if r.JobError == nil && r.KojiError != "" {
r.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, r.KojiError)
}
case *OSBuildKojiJobResult:
if r.JobError == nil && r.KojiError != "" {
r.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, r.KojiError)
}
case *ManifestJobByIDResult:
if r.JobError == nil && r.Error != "" {
r.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, r.Error)
}
case *DepsolveJobResult:
if r.JobError == nil && r.Error != "" {
if r.ErrorType == DepsolveErrorType {
r.JobError = clienterrors.WorkerClientError(clienterrors.ErrorDNFDepsolveError, r.Error)
} else {
r.JobError = clienterrors.WorkerClientError(clienterrors.ErrorRPMMDError, r.Error)
}
}
case *OSBuildJobResult:
if r.JobError == nil && len(r.TargetErrors) > 0 || (r.OSBuildOutput != nil && len(r.OSBuildOutput.Error) > 0) {
if r.OSBuildOutput != nil && len(r.OSBuildOutput.Error) > 0 {
r.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, string(r.OSBuildOutput.Error))
} else if len(r.TargetErrors) > 0 {
r.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, r.TargetErrors[0])
}
}
// For backwards compatibility: OSBuildJobResult didn't use to have a
// top-level `Success` flag. Override it here by looking into the job.
if !r.Success && r.OSBuildOutput != nil {
r.Success = r.OSBuildOutput.Success && r.JobError == nil
}
}
return &JobStatus{
return jobType, &JobStatus{
Queued: queued,
Started: started,
Finished: finished,
@ -184,20 +268,48 @@ func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, []uuid
}, deps, nil
}
// Job provides access to all the parameters of a job.
func (s *Server) Job(id uuid.UUID, job interface{}) (string, json.RawMessage, []uuid.UUID, error) {
jobType, rawArgs, deps, err := s.jobs.Job(id)
// OSBuildJob returns the parameters of an OSBuildJob
func (s *Server) OSBuildJob(id uuid.UUID, job *OSBuildJob) error {
jobType, rawArgs, _, err := s.jobs.Job(id)
if err != nil {
return "", nil, nil, err
return err
}
if job != nil {
if err := json.Unmarshal(rawArgs, job); err != nil {
return "", nil, nil, fmt.Errorf("error unmarshaling arguments for job '%s': %v", id, err)
}
if !strings.HasPrefix(jobType, "osbuild:") { // Build jobs get automatic arch suffix: Check prefix
return fmt.Errorf("expected osbuild:*, found %q job instead for job '%s'", jobType, id)
}
return jobType, rawArgs, deps, nil
if err := json.Unmarshal(rawArgs, job); err != nil {
return fmt.Errorf("error unmarshaling arguments for job '%s': %v", id, err)
}
return nil
}
// OSBuildKojiJob returns the parameters of an OSBuildKojiJob
func (s *Server) OSBuildKojiJob(id uuid.UUID, job *OSBuildKojiJob) error {
jobType, rawArgs, _, err := s.jobs.Job(id)
if err != nil {
return err
}
if !strings.HasPrefix(jobType, "osbuild-koji:") { // Build jobs get automatic arch suffix: Check prefix
return fmt.Errorf("expected osbuild-koji:*, found %q job instead for job '%s'", jobType, id)
}
if err := json.Unmarshal(rawArgs, job); err != nil {
return fmt.Errorf("error unmarshaling arguments for job '%s': %v", id, err)
}
return nil
}
// JobType returns the type of the job
func (s *Server) JobType(id uuid.UUID) (string, error) {
jobType, _, _, err := s.jobs.Job(id)
// the architecture is internally encdode in the job type, but hide that
// from this API
return strings.Split(jobType, ":")[0], err
}
func (s *Server) Cancel(id uuid.UUID) error {
@ -211,7 +323,7 @@ func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error
return nil, 0, errors.New("Artifacts not enabled")
}
status, _, err := s.JobStatus(id, &json.RawMessage{})
_, status, _, err := s.jobStatus(id, nil)
if err != nil {
return nil, 0, err
}
@ -240,7 +352,7 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error {
return errors.New("Artifacts not enabled")
}
status, _, err := s.JobStatus(id, &json.RawMessage{})
_, status, _, err := s.jobStatus(id, nil)
if err != nil {
return err
}
@ -295,7 +407,8 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string,
}
for _, depID := range depIDs {
result, _, _, _, _, _, _ := s.jobs.JobStatus(depID)
// TODO: include type of arguments
_, result, _, _, _, _, _, _ := s.jobs.JobStatus(depID)
dynamicArgs = append(dynamicArgs, result)
}
@ -336,12 +449,6 @@ func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error {
}
}
var jobResult OSBuildJobResult
_, _, err = s.JobStatus(jobId, &jobResult)
if err != nil {
return fmt.Errorf("error finding job status: %v", err)
}
// Move artifacts from the temporary location to the final job
// location. Log any errors, but do not treat them as fatal. The job is
// already finished.
@ -470,7 +577,7 @@ func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error {
h.server.jobs.RefreshHeartbeat(token)
status, _, err := h.server.JobStatus(jobId, &json.RawMessage{})
_, status, _, err := h.server.jobStatus(jobId, nil)
if err != nil {
return api.HTTPErrorWithInternal(api.ErrorRetrievingJobStatus, err)
}

View file

@ -246,12 +246,9 @@ func TestArgs(t *testing.T) {
require.NotNil(t, args)
var jobArgs worker.OSBuildJob
jobType, rawArgs, deps, err := server.Job(jobId, &jobArgs)
err = server.OSBuildJob(jobId, &jobArgs)
require.NoError(t, err)
require.Equal(t, args, rawArgs)
require.Equal(t, job, jobArgs)
require.Equal(t, jobType, "osbuild:"+arch.Name())
require.Equal(t, []uuid.UUID(nil), deps)
}
func TestUpload(t *testing.T) {
@ -489,8 +486,8 @@ func TestMixedOSBuildJob(t *testing.T) {
newJobID, err := server.EnqueueOSBuild("x", &newJob)
require.NoError(err)
oldJobRead := new(worker.OSBuildJob)
_, _, _, err = server.Job(oldJobID, oldJobRead)
var oldJobRead worker.OSBuildJob
err = server.OSBuildJob(oldJobID, &oldJobRead)
require.NoError(err)
require.NotNil(oldJobRead.PipelineNames)
// OldJob gets default pipeline names when read
@ -501,8 +498,8 @@ func TestMixedOSBuildJob(t *testing.T) {
require.NotEqual(oldJob, oldJobRead)
// NewJob the same when read back
newJobRead := new(worker.OSBuildJob)
_, _, _, err = server.Job(newJobID, newJobRead)
var newJobRead worker.OSBuildJob
err = server.OSBuildJob(newJobID, &newJobRead)
require.NoError(err)
require.NotNil(newJobRead.PipelineNames)
require.Equal(newJob.PipelineNames, newJobRead.PipelineNames)
@ -557,7 +554,7 @@ func TestMixedOSBuildJob(t *testing.T) {
require.NoError(err)
oldJobResultRead := new(worker.OSBuildJobResult)
_, _, err = server.JobStatus(oldJobID, oldJobResultRead)
_, _, err = server.OSBuildJobStatus(oldJobID, oldJobResultRead)
require.NoError(err)
// oldJobResultRead should have PipelineNames now
@ -595,7 +592,7 @@ func TestMixedOSBuildJob(t *testing.T) {
require.NoError(err)
newJobResultRead := new(worker.OSBuildJobResult)
_, _, err = server.JobStatus(newJobID, newJobResultRead)
_, _, err = server.OSBuildJobStatus(newJobID, newJobResultRead)
require.NoError(err)
require.Equal(newJobResult, newJobResultRead)
}
@ -637,8 +634,8 @@ func TestMixedOSBuildKojiJob(t *testing.T) {
}
newJobID := enqueueKojiJob(&newJob)
oldJobRead := new(worker.OSBuildKojiJob)
_, _, _, err = server.Job(oldJobID, oldJobRead)
var oldJobRead worker.OSBuildKojiJob
err = server.OSBuildKojiJob(oldJobID, &oldJobRead)
require.NoError(err)
require.NotNil(oldJobRead.PipelineNames)
// OldJob gets default pipeline names when read
@ -649,8 +646,8 @@ func TestMixedOSBuildKojiJob(t *testing.T) {
require.NotEqual(oldJob, oldJobRead)
// NewJob the same when read back
newJobRead := new(worker.OSBuildKojiJob)
_, _, _, err = server.Job(newJobID, newJobRead)
var newJobRead worker.OSBuildKojiJob
err = server.OSBuildKojiJob(newJobID, &newJobRead)
require.NoError(err)
require.NotNil(newJobRead.PipelineNames)
require.Equal(newJob.PipelineNames, newJobRead.PipelineNames)
@ -715,7 +712,7 @@ func TestMixedOSBuildKojiJob(t *testing.T) {
require.NoError(err)
oldJobResultRead := new(worker.OSBuildKojiJobResult)
_, _, err = server.JobStatus(oldJobID, oldJobResultRead)
_, _, err = server.OSBuildKojiJobStatus(oldJobID, oldJobResultRead)
require.NoError(err)
// oldJobResultRead should have PipelineNames now
@ -755,7 +752,7 @@ func TestMixedOSBuildKojiJob(t *testing.T) {
require.NoError(err)
newJobResultRead := new(worker.OSBuildKojiJobResult)
_, _, err = server.JobStatus(newJobID, newJobResultRead)
_, _, err = server.OSBuildKojiJobStatus(newJobID, newJobResultRead)
require.NoError(err)
require.Equal(newJobResult, newJobResultRead)
}
@ -794,7 +791,7 @@ func TestDepsolveLegacyErrorConversion(t *testing.T) {
ErrorType: errType,
}
_, _, err = server.JobStatus(depsolveJobId, &depsolveJobResult)
_, _, err = server.DepsolveJobStatus(depsolveJobId, &depsolveJobResult)
require.NoError(t, err)
require.Equal(t, expectedResult, depsolveJobResult)
}
@ -830,14 +827,14 @@ func TestMixedOSBuildJobErrors(t *testing.T) {
require.NoError(err)
oldJobRead := new(worker.OSBuildJob)
_, _, _, err = server.Job(oldJobID, oldJobRead)
err = server.OSBuildJob(oldJobID, oldJobRead)
require.NoError(err)
// Not entirely equal
require.NotEqual(oldJob, oldJobRead)
// NewJob the same when read back
newJobRead := new(worker.OSBuildJob)
_, _, _, err = server.Job(newJobID, newJobRead)
err = server.OSBuildJob(newJobID, newJobRead)
require.NoError(err)
// Dequeue the jobs (via RequestJob) to get their tokens and update them to
@ -876,7 +873,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) {
require.NoError(err)
oldJobResultRead := new(worker.OSBuildJobResult)
_, _, err = server.JobStatus(oldJobID, oldJobResultRead)
_, _, err = server.OSBuildJobStatus(oldJobID, oldJobResultRead)
require.NoError(err)
require.NotEqual(oldJobResult, oldJobResultRead)
@ -899,7 +896,7 @@ func TestMixedOSBuildJobErrors(t *testing.T) {
require.NoError(err)
newJobResultRead := new(worker.OSBuildJobResult)
_, _, err = server.JobStatus(newJobID, newJobResultRead)
_, _, err = server.OSBuildJobStatus(newJobID, newJobResultRead)
require.NoError(err)
require.Equal(newJobResult, newJobResultRead)
require.Equal(newJobResult.Success, false)
@ -943,14 +940,14 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) {
newJobID := enqueueKojiJob(&newJob)
oldJobRead := new(worker.OSBuildKojiJob)
_, _, _, err = server.Job(oldJobID, oldJobRead)
err = server.OSBuildKojiJob(oldJobID, oldJobRead)
require.NoError(err)
// Not entirely equal
require.NotEqual(oldJob, oldJobRead)
// NewJob the same when read back
newJobRead := new(worker.OSBuildKojiJob)
_, _, _, err = server.Job(newJobID, newJobRead)
err = server.OSBuildKojiJob(newJobID, newJobRead)
require.NoError(err)
// Dequeue the jobs (via RequestJob) to get their tokens and update them to
@ -998,7 +995,7 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) {
require.NoError(err)
oldJobResultRead := new(worker.OSBuildKojiJobResult)
_, _, err = server.JobStatus(oldJobID, oldJobResultRead)
_, _, err = server.OSBuildKojiJobStatus(oldJobID, oldJobResultRead)
require.NoError(err)
// oldJobResultRead should have PipelineNames now
@ -1020,7 +1017,7 @@ func TestMixedOSBuildKojiJobErrors(t *testing.T) {
require.NoError(err)
newJobResultRead := new(worker.OSBuildKojiJobResult)
_, _, err = server.JobStatus(newJobID, newJobResultRead)
_, _, err = server.OSBuildKojiJobStatus(newJobID, newJobResultRead)
require.NoError(err)
require.Equal(newJobResult, newJobResultRead)
}