worker/server: job status struct

The number of return values from the `jobStatus`
function was growing and getting out of hand. Not
all return values were being used in all cases
and so returning a single struct with the information
and status of a job makes more sense. Then in each case
the resulting fields can be used as needed.
This commit is contained in:
Gianluca Zuccarelli 2022-07-22 13:18:47 +01:00 committed by Tomáš Hozza
parent 9f4e765657
commit 967ac1c35e
7 changed files with 128 additions and 115 deletions

View file

@ -50,6 +50,13 @@ type JobStatus struct {
Canceled bool
}
type JobInfo struct {
JobType string
Channel string
JobStatus *JobStatus
Deps []uuid.UUID
}
var ErrInvalidToken = errors.New("token does not exist")
var ErrJobNotRunning = errors.New("job isn't running")
var ErrInvalidJobType = errors.New("job has invalid type")
@ -161,11 +168,11 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er
}
var jobResult *JobResult
var jobDeps []uuid.UUID
var jobInfo *JobInfo
switch jobType {
case JobTypeOSBuild:
var osbuildJR OSBuildJobResult
_, jobDeps, err = s.OSBuildJobStatus(id, &osbuildJR)
jobInfo, err = s.OSBuildJobStatus(id, &osbuildJR)
if err != nil {
return nil, err
}
@ -173,7 +180,7 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er
case JobTypeDepsolve:
var depsolveJR DepsolveJobResult
_, jobDeps, err = s.DepsolveJobStatus(id, &depsolveJR)
jobInfo, err = s.DepsolveJobStatus(id, &depsolveJR)
if err != nil {
return nil, err
}
@ -181,7 +188,7 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er
case JobTypeManifestIDOnly:
var manifestJR ManifestJobByIDResult
_, jobDeps, err = s.ManifestJobStatus(id, &manifestJR)
jobInfo, err = s.ManifestJobStatus(id, &manifestJR)
if err != nil {
return nil, err
}
@ -189,7 +196,7 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er
case JobTypeKojiInit:
var kojiInitJR KojiInitJobResult
_, jobDeps, err = s.KojiInitJobStatus(id, &kojiInitJR)
jobInfo, err = s.KojiInitJobStatus(id, &kojiInitJR)
if err != nil {
return nil, err
}
@ -197,7 +204,7 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er
case JobTypeKojiFinalize:
var kojiFinalizeJR KojiFinalizeJobResult
_, jobDeps, err = s.KojiFinalizeJobStatus(id, &kojiFinalizeJR)
jobInfo, err = s.KojiFinalizeJobStatus(id, &kojiFinalizeJR)
if err != nil {
return nil, err
}
@ -205,7 +212,7 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er
case JobTypeContainerResolve:
var containerResolveJR ContainerResolveJobResult
_, jobDeps, err = s.ContainerResolveJobStatus(id, &containerResolveJR)
jobInfo, err = s.ContainerResolveJobStatus(id, &containerResolveJR)
if err != nil {
return nil, err
}
@ -219,7 +226,7 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er
depErrors := []*clienterrors.Error{}
if jobError.IsDependencyError() {
// check job's dependencies
for _, dep := range jobDeps {
for _, dep := range jobInfo.Deps {
depError, err := s.JobDependencyChainErrors(dep)
if err != nil {
return nil, err
@ -240,17 +247,17 @@ func (s *Server) JobDependencyChainErrors(id uuid.UUID) (*clienterrors.Error, er
return nil, nil
}
func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, _, status, deps, err := s.jobStatus(id, result)
func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobInfo, error) {
jobInfo, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
return nil, err
}
if !strings.HasPrefix(jobType, JobTypeOSBuild+":") { // Build jobs get automatic arch suffix: Check prefix
return nil, nil, fmt.Errorf("expected \"%s:*\", found %q job instead", JobTypeOSBuild, jobType)
if !strings.HasPrefix(jobInfo.JobType, JobTypeOSBuild+":") { // Build jobs get automatic arch suffix: Check prefix
return nil, fmt.Errorf("expected \"%s:*\", found %q job instead", JobTypeOSBuild, jobInfo.JobType)
}
if result.JobError == nil && !status.Finished.IsZero() {
if result.JobError == nil && !jobInfo.JobStatus.Finished.IsZero() {
if result.OSBuildOutput == nil {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorBuildJob, "osbuild build failed")
} else if len(result.OSBuildOutput.Error) > 0 {
@ -265,51 +272,51 @@ func (s *Server) OSBuildJobStatus(id uuid.UUID, result *OSBuildJobResult) (*JobS
result.Success = result.OSBuildOutput.Success && result.JobError == nil
}
return status, deps, nil
return jobInfo, nil
}
func (s *Server) KojiInitJobStatus(id uuid.UUID, result *KojiInitJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, _, status, deps, err := s.jobStatus(id, result)
func (s *Server) KojiInitJobStatus(id uuid.UUID, result *KojiInitJobResult) (*JobInfo, error) {
jobInfo, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
return nil, err
}
if jobType != JobTypeKojiInit {
return nil, nil, fmt.Errorf("expected %q, found %q job instead", JobTypeKojiInit, jobType)
if jobInfo.JobType != JobTypeKojiInit {
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeKojiInit, jobInfo.JobType)
}
if result.JobError == nil && result.KojiError != "" {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, result.KojiError)
}
return status, deps, nil
return jobInfo, nil
}
func (s *Server) KojiFinalizeJobStatus(id uuid.UUID, result *KojiFinalizeJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, _, status, deps, err := s.jobStatus(id, result)
func (s *Server) KojiFinalizeJobStatus(id uuid.UUID, result *KojiFinalizeJobResult) (*JobInfo, error) {
jobInfo, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
return nil, err
}
if jobType != JobTypeKojiFinalize {
return nil, nil, fmt.Errorf("expected %q, found %q job instead", JobTypeKojiFinalize, jobType)
if jobInfo.JobType != JobTypeKojiFinalize {
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeKojiFinalize, jobInfo.JobType)
}
if result.JobError == nil && result.KojiError != "" {
result.JobError = clienterrors.WorkerClientError(clienterrors.ErrorOldResultCompatible, result.KojiError)
}
return status, deps, nil
return jobInfo, nil
}
func (s *Server) DepsolveJobStatus(id uuid.UUID, result *DepsolveJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, _, status, deps, err := s.jobStatus(id, result)
func (s *Server) DepsolveJobStatus(id uuid.UUID, result *DepsolveJobResult) (*JobInfo, error) {
jobInfo, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
return nil, err
}
if jobType != JobTypeDepsolve {
return nil, nil, fmt.Errorf("expected %q, found %q job instead", JobTypeDepsolve, jobType)
if jobInfo.JobType != JobTypeDepsolve {
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeDepsolve, jobInfo.JobType)
}
if result.JobError == nil && result.Error != "" {
@ -320,55 +327,60 @@ func (s *Server) DepsolveJobStatus(id uuid.UUID, result *DepsolveJobResult) (*Jo
}
}
return status, deps, nil
return jobInfo, nil
}
func (s *Server) ManifestJobStatus(id uuid.UUID, result *ManifestJobByIDResult) (*JobStatus, []uuid.UUID, error) {
jobType, _, status, deps, err := s.jobStatus(id, result)
func (s *Server) ManifestJobStatus(id uuid.UUID, result *ManifestJobByIDResult) (*JobInfo, error) {
jobInfo, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
return nil, err
}
if jobType != JobTypeManifestIDOnly {
return nil, nil, fmt.Errorf("expected %q, found %q job instead", JobTypeManifestIDOnly, jobType)
if jobInfo.JobType != JobTypeManifestIDOnly {
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeManifestIDOnly, jobInfo.JobType)
}
return status, deps, nil
return jobInfo, nil
}
func (s *Server) ContainerResolveJobStatus(id uuid.UUID, result *ContainerResolveJobResult) (*JobStatus, []uuid.UUID, error) {
jobType, _, status, deps, err := s.jobStatus(id, result)
func (s *Server) ContainerResolveJobStatus(id uuid.UUID, result *ContainerResolveJobResult) (*JobInfo, error) {
jobInfo, err := s.jobStatus(id, result)
if err != nil {
return nil, nil, err
return nil, err
}
if jobType != JobTypeContainerResolve {
return nil, nil, fmt.Errorf("expected %q, found %q job instead", JobTypeDepsolve, jobType)
if jobInfo.JobType != JobTypeContainerResolve {
return nil, fmt.Errorf("expected %q, found %q job instead", JobTypeDepsolve, jobInfo.JobType)
}
return status, deps, nil
return jobInfo, nil
}
func (s *Server) jobStatus(id uuid.UUID, result interface{}) (string, string, *JobStatus, []uuid.UUID, error) {
func (s *Server) jobStatus(id uuid.UUID, result interface{}) (*JobInfo, error) {
jobType, channel, rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id)
if err != nil {
return "", "", nil, nil, err
return nil, err
}
if result != nil && !finished.IsZero() && !canceled {
err = json.Unmarshal(rawResult, result)
if err != nil {
return "", "", nil, nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
return nil, fmt.Errorf("error unmarshaling result for job '%s': %v", id, err)
}
}
return jobType, channel, &JobStatus{
Queued: queued,
Started: started,
Finished: finished,
Canceled: canceled,
}, deps, nil
return &JobInfo{
JobType: strings.Split(jobType, ":")[0],
Channel: channel,
JobStatus: &JobStatus{
Queued: queued,
Started: started,
Finished: finished,
Canceled: canceled,
},
Deps: deps,
}, nil
}
// OSBuildJob returns the parameters of an OSBuildJob
@ -403,11 +415,11 @@ func (s *Server) JobType(id uuid.UUID) (string, error) {
}
func (s *Server) Cancel(id uuid.UUID) error {
jobType, channel, status, _, err := s.jobStatus(id, nil)
jobInfo, err := s.jobStatus(id, nil)
if err != nil {
logrus.Errorf("error getting job status: %v", err)
} else {
prometheus.CancelJobMetrics(status.Started, jobType, channel)
prometheus.CancelJobMetrics(jobInfo.JobStatus.Started, jobInfo.JobType, jobInfo.Channel)
}
return s.jobs.CancelJob(id)
}
@ -419,12 +431,12 @@ func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error
return nil, 0, errors.New("Artifacts not enabled")
}
_, _, status, _, err := s.jobStatus(id, nil)
jobInfo, err := s.jobStatus(id, nil)
if err != nil {
return nil, 0, err
}
if status.Finished.IsZero() {
if jobInfo.JobStatus.Finished.IsZero() {
return nil, 0, fmt.Errorf("Cannot access artifacts before job is finished: %s", id)
}
@ -448,12 +460,12 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error {
return errors.New("Artifacts not enabled")
}
_, _, status, _, err := s.jobStatus(id, nil)
jobInfo, err := s.jobStatus(id, nil)
if err != nil {
return err
}
if status.Finished.IsZero() {
if jobInfo.JobStatus.Finished.IsZero() {
return fmt.Errorf("Cannot delete artifacts before job is finished: %s", id)
}
@ -502,7 +514,7 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string,
return
}
jobType, channel, status, _, err := s.jobStatus(jobId, nil)
jobInfo, err := s.jobStatus(jobId, nil)
if err != nil {
logrus.Errorf("error retrieving job status: %v", err)
}
@ -511,7 +523,8 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string,
// long it has been queued for, in case it has no dependencies, or
// how long it has been since all its dependencies finished, if it
// has any.
pending := status.Queued
pending := jobInfo.JobStatus.Queued
jobType = jobInfo.JobType
for _, depID := range depIDs {
// TODO: include type of arguments
@ -536,7 +549,7 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string,
// TODO: Drop the ':$architecture' for metrics too, first prometheus queries for alerts and
// dashboards need to be adjusted.
prometheus.DequeueJobMetrics(pending, status.Started, jobType, channel)
prometheus.DequeueJobMetrics(pending, jobInfo.JobStatus.Started, jobType, jobInfo.Channel)
if jobType == JobTypeOSBuild+":"+arch {
jobType = JobTypeOSBuild
}
@ -566,13 +579,13 @@ func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error {
}
var jobResult JobResult
jobType, channel, status, _, err := s.jobStatus(jobId, &jobResult)
jobInfo, err := s.jobStatus(jobId, &jobResult)
if err != nil {
logrus.Errorf("error finding job status: %v", err)
} else {
statusCode := clienterrors.GetStatusCode(jobResult.JobError)
arch := getBuildJobArch(jobType, result)
prometheus.FinishJobMetrics(status.Started, status.Finished, status.Canceled, jobType, channel, arch, statusCode)
arch := getBuildJobArch(jobInfo.JobType, result)
prometheus.FinishJobMetrics(jobInfo.JobStatus.Started, jobInfo.JobStatus.Finished, jobInfo.JobStatus.Canceled, jobInfo.JobType, jobInfo.Channel, arch, statusCode)
}
// Move artifacts from the temporary location to the final job
@ -723,7 +736,7 @@ func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error {
h.server.jobs.RefreshHeartbeat(token)
_, _, status, _, err := h.server.jobStatus(jobId, nil)
jobInfo, err := h.server.jobStatus(jobId, nil)
if err != nil {
return api.HTTPErrorWithInternal(api.ErrorRetrievingJobStatus, err)
}
@ -734,7 +747,7 @@ func (h *apiHandlers) GetJob(ctx echo.Context, tokenstr string) error {
Id: token.String(),
Kind: "JobStatus",
},
Canceled: status.Canceled,
Canceled: jobInfo.JobStatus.Canceled,
})
}

View file

@ -516,7 +516,7 @@ func TestMixedOSBuildJob(t *testing.T) {
require.NoError(err)
oldJobResultRead := new(worker.OSBuildJobResult)
_, _, err = server.OSBuildJobStatus(oldJobID, oldJobResultRead)
_, err = server.OSBuildJobStatus(oldJobID, oldJobResultRead)
require.NoError(err)
// oldJobResultRead should have PipelineNames now
@ -554,7 +554,7 @@ func TestMixedOSBuildJob(t *testing.T) {
require.NoError(err)
newJobResultRead := new(worker.OSBuildJobResult)
_, _, err = server.OSBuildJobStatus(newJobID, newJobResultRead)
_, err = server.OSBuildJobStatus(newJobID, newJobResultRead)
require.NoError(err)
require.Equal(newJobResult, newJobResultRead)
}
@ -589,7 +589,7 @@ func TestDepsolveLegacyErrorConversion(t *testing.T) {
ErrorType: errType,
}
_, _, err = server.DepsolveJobStatus(depsolveJobId, &depsolveJobResult)
_, err = server.DepsolveJobStatus(depsolveJobId, &depsolveJobResult)
require.NoError(t, err)
require.Equal(t, expectedResult, depsolveJobResult)
}