From aca748bc143bae65f6e8d97527f7480b41244d19 Mon Sep 17 00:00:00 2001 From: "Brian C. Lane" Date: Mon, 6 Nov 2023 13:21:18 -0800 Subject: [PATCH] Don't Panic in getComposeStatus and skip invalid jobs in fsjobqueue New This handles corrupt job json files by skipping them. They still exist, and errors are logged, but the system keeps working. If one or more of the json files in /var/lib/osbuild-composer/jobs/ becomes corrupt they can stop the osbuild-composer service from starting, or stop commands like 'composer-cli compose status' from working because they quit on the first error and miss any job that aren't broken. --- internal/jobqueue/fsjobqueue/fsjobqueue.go | 3 +- .../jobqueue/fsjobqueue/fsjobqueue_test.go | 14 ++ internal/mocks/rpmmd/fixtures.go | 20 +++ internal/store/fixtures.go | 149 ++++++++++++++++++ internal/weldr/api.go | 120 ++++++++++++-- internal/weldr/api_test.go | 3 + 6 files changed, 291 insertions(+), 18 deletions(-) diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index 18c81e4c7..57252cb61 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -106,7 +106,8 @@ func New(dir string) (*fsJobQueue, error) { } j, err := q.readJob(jobId) if err != nil { - return nil, err + // Skip invalid jobs, leaving them in place for later examination + continue } // If a job is running, and not cancelled, track the token diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go index aba51cc1e..fe2ce2c3d 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue_test.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue_test.go @@ -1,6 +1,8 @@ package fsjobqueue_test import ( + "os" + "path" "testing" "github.com/osbuild/osbuild-composer/pkg/jobqueue" @@ -28,3 +30,15 @@ func TestNonExistant(t *testing.T) { require.Error(t, err) require.Nil(t, q) } + +func TestJobQueueBadJSON(t *testing.T) { + dir := t.TempDir() + + // Write a purposfully invalid JSON file into the queue + err := os.WriteFile(path.Join(dir, "/4f1cf5f8-525d-46b7-aef4-33c6a919c038.json"), []byte("{invalid json content"), 0600) + require.Nil(t, err) + + q, err := fsjobqueue.New(dir) + require.Nil(t, err) + require.NotNil(t, q) +} diff --git a/internal/mocks/rpmmd/fixtures.go b/internal/mocks/rpmmd/fixtures.go index 964448904..f5c8896fb 100644 --- a/internal/mocks/rpmmd/fixtures.go +++ b/internal/mocks/rpmmd/fixtures.go @@ -1,6 +1,9 @@ package rpmmd_mock import ( + "os" + "path" + "github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue" dnfjson_mock "github.com/osbuild/osbuild-composer/internal/mocks/dnfjson" "github.com/osbuild/osbuild-composer/internal/store" @@ -64,3 +67,20 @@ func OldChangesFixture(tmpdir string) Fixture { dnfjson_mock.Base, } } + +func BadJobJSONFixture(tmpdir string) Fixture { + err := os.Mkdir(path.Join(tmpdir, "/jobs"), 0755) + if err != nil { + panic(err) + } + err = os.WriteFile(path.Join(tmpdir, "/jobs/30000000-0000-0000-0000-000000000005.json"), []byte("{invalid json content"), 0600) + if err != nil { + panic(err) + } + + return Fixture{ + store.FixtureJobs(), + createBaseWorkersFixture(path.Join(tmpdir, "/jobs")), + dnfjson_mock.Base, + } +} diff --git a/internal/store/fixtures.go b/internal/store/fixtures.go index 9e6e92b51..59f76687c 100644 --- a/internal/store/fixtures.go +++ b/internal/store/fixtures.go @@ -354,3 +354,152 @@ func FixtureOldChanges() *Store { return s } + +// Fixture to use for checking job queue files +func FixtureJobs() *Store { + var bName = "test" + var b = blueprint.Blueprint{ + Name: bName, + Version: "0.0.0", + Packages: []blueprint.Package{}, + Modules: []blueprint.Package{}, + Groups: []blueprint.Group{}, + Customizations: nil, + } + + var date = time.Date(2019, 11, 27, 13, 19, 0, 0, time.FixedZone("UTC+1", 60*60)) + + var awsTarget = &target.Target{ + Uuid: uuid.MustParse("10000000-0000-0000-0000-000000000000"), + Name: target.TargetNameAWS, + ImageName: "awsimage", + Created: date, + Status: common.IBWaiting, + Options: &target.AWSTargetOptions{ + Region: "frankfurt", + AccessKeyID: "accesskey", + SecretAccessKey: "secretkey", + Bucket: "clay", + Key: "imagekey", + }, + } + + dr := test_distro.NewRegistry() + d := dr.FromHost() + arch, err := d.GetArch(test_distro.TestArchName) + if err != nil { + panic(fmt.Sprintf("failed to get architecture %s for a test distro: %v", test_distro.TestArchName, err)) + } + imgType, err := arch.GetImageType(test_distro.TestImageTypeName) + if err != nil { + panic(fmt.Sprintf("failed to get image type %s for a test distro architecture: %v", test_distro.TestImageTypeName, err)) + } + manifest, _, err := imgType.Manifest(nil, distro.ImageOptions{}, nil, 0) + if err != nil { + panic(fmt.Sprintf("failed to create a manifest: %v", err)) + } + + mf, err := manifest.Serialize(nil, nil, nil) + if err != nil { + panic(fmt.Sprintf("failed to create a manifest: %v", err)) + } + + s := New(nil, dr, nil) + + pkgs := []rpmmd.PackageSpec{ + { + Name: "test1", + Epoch: 0, + Version: "2.11.2", + Release: "1.fc35", + Arch: test_distro.TestArchName, + }, { + Name: "test2", + Epoch: 3, + Version: "4.2.2", + Release: "1.fc35", + Arch: test_distro.TestArchName, + }} + + s.blueprints[bName] = b + s.composes = map[uuid.UUID]Compose{ + uuid.MustParse("30000000-0000-0000-0000-000000000000"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBWaiting, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000001"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBRunning, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{}, + JobCreated: date, + JobStarted: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000002"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFinished, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000003"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFailed, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + }, + Packages: []rpmmd.PackageSpec{}, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000004"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFinished, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + }, + Packages: pkgs, + }, + uuid.MustParse("30000000-0000-0000-0000-000000000005"): { + Blueprint: &b, + ImageBuild: ImageBuild{ + QueueStatus: common.IBFinished, + ImageType: imgType, + Manifest: mf, + Targets: []*target.Target{awsTarget}, + JobCreated: date, + JobStarted: date, + JobFinished: date, + JobID: uuid.MustParse("30000000-0000-0000-0000-000000000005"), + }, + Packages: pkgs, + }, + } + + return s +} diff --git a/internal/weldr/api.go b/internal/weldr/api.go index 70fadc53e..379a7dbfd 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -364,7 +364,7 @@ func composeStateFromJobStatus(js *worker.JobStatus, result *worker.OSBuildJobRe // Returns the state of the image in `compose` and the times the job was // queued, started, and finished. Assumes that there's only one image in the // compose. -func (api *API) getComposeStatus(compose store.Compose) *composeStatus { +func (api *API) getComposeStatus(compose store.Compose) (*composeStatus, error) { jobId := compose.ImageBuild.JobID // backwards compatibility: composes that were around before splitting @@ -388,14 +388,14 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus { Started: compose.ImageBuild.JobStarted, Finished: compose.ImageBuild.JobFinished, Result: &osbuild.Result{}, - } + }, nil } // All jobs are "osbuild" jobs. var result worker.OSBuildJobResult jobInfo, err := api.workers.OSBuildJobInfo(jobId, &result) if err != nil { - panic(err) + return nil, err } return &composeStatus{ @@ -404,7 +404,7 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus { Started: jobInfo.JobStatus.Started, Finished: jobInfo.JobStatus.Finished, Result: result.OSBuildOutput, - } + }, nil } // Opens the image file for `compose`. This asks the worker server for the @@ -2659,7 +2659,14 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R continue } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors = append(errors, composeDeleteError{ + "ComposeStatusError", + fmt.Sprintf("Error getting status of compose %s: %s", id, err), + }) + continue + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors = append(errors, composeDeleteError{ "BuildInWrongState", @@ -2724,7 +2731,15 @@ func (api *API) composeCancelHandler(writer http.ResponseWriter, request *http.R return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeWaiting && composeStatus.State != ComposeRunning { errors := responseError{ ID: "BuildInWrongState", @@ -2828,7 +2843,11 @@ func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Re composes := api.store.GetAllComposes() for id, compose := range composes { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } switch composeStatus.State { case ComposeWaiting: reply.New = append(reply.New, composeToComposeEntry(id, compose, composeStatus, includeUploads)) @@ -2899,7 +2918,12 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R if !exists { continue } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } + if filterBlueprint != "" && compose.Blueprint.Name != filterBlueprint { continue } else if filterStatus != "" && composeStatus.State.ToString() != filterStatus { @@ -2914,7 +2938,12 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for _, id := range filteredUUIDs { if compose, exists := composes[id]; exists { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } + reply.UUIDs = append(reply.UUIDs, composeToComposeEntry(id, compose, composeStatus, includeUploads)) } } @@ -2969,7 +2998,16 @@ func (api *API) composeInfoHandler(writer http.ResponseWriter, request *http.Req reply.Blueprint = compose.Blueprint // Weldr API assumes only one image build per compose, that's why only the // 1st build is considered - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } + reply.ComposeType = compose.ImageBuild.ImageType.Name() reply.QueueStatus = composeStatus.State.ToString() reply.ImageSize = compose.ImageBuild.Size @@ -3016,7 +3054,15 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", uuid, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished { errors := responseError{ ID: "BuildInWrongState", @@ -3074,7 +3120,15 @@ func (api *API) composeMetadataHandler(writer http.ResponseWriter, request *http return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", uuid, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors := responseError{ ID: "BuildInWrongState", @@ -3135,7 +3189,15 @@ func (api *API) composeResultsHandler(writer http.ResponseWriter, request *http. return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", uuid, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors := responseError{ ID: "BuildInWrongState", @@ -3226,7 +3288,15 @@ func (api *API) composeLogsHandler(writer http.ResponseWriter, request *http.Req return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State != ComposeFinished && composeStatus.State != ComposeFailed { errors := responseError{ ID: "BuildInWrongState", @@ -3290,7 +3360,15 @@ func (api *API) composeLogHandler(writer http.ResponseWriter, request *http.Requ return } - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + errors := responseError{ + ID: "ComposeStatusError", + Msg: fmt.Sprintf("Error getting status of compose %s: %s", id, err), + } + statusResponseError(writer, http.StatusInternalServerError, errors) + return + } if composeStatus.State == ComposeWaiting { errors := responseError{ ID: "BuildInWrongState", @@ -3320,7 +3398,11 @@ func (api *API) composeFinishedHandler(writer http.ResponseWriter, request *http includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } if composeStatus.State != ComposeFinished { continue } @@ -3343,7 +3425,11 @@ func (api *API) composeFailedHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - composeStatus := api.getComposeStatus(compose) + composeStatus, err := api.getComposeStatus(compose) + if err != nil { + log.Printf("Error getting status of compose %s: %s", id, err) + continue + } if composeStatus.State != ComposeFailed { continue } diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index 9b83eb6ae..aab08bbc2 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -1373,6 +1373,8 @@ func TestComposeStatus(t *testing.T) { {rpmmd_mock.BaseFixture, "GET", "/api/v0/compose/status/*?status=FINISHED", ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000002","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000004","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.BaseFixture, "GET", fmt.Sprintf("/api/v0/compose/status/*?type=%s", test_distro.TestImageTypeName), ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000000","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","job_created":1574857140},{"id":"30000000-0000-0000-0000-000000000001","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING","job_created":1574857140,"job_started":1574857140},{"id":"30000000-0000-0000-0000-000000000002","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000003","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FAILED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000004","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.BaseFixture, "GET", "/api/v1/compose/status/30000000-0000-0000-0000-000000000000", ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000000","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","job_created":1574857140,"uploads":[{"uuid":"10000000-0000-0000-0000-000000000000","status":"WAITING","provider_name":"aws","image_name":"awsimage","creation_time":1574857140,"settings":{"region":"frankfurt","bucket":"clay","key":"imagekey"}}]}]}`, test_distro.TestImageTypeName)}, + {rpmmd_mock.BadJobJSONFixture, "GET", "/api/v0/compose/status/*", ``, http.StatusOK, fmt.Sprintf(`{"uuids":[{"id":"30000000-0000-0000-0000-000000000000","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","job_created":1574857140},{"id":"30000000-0000-0000-0000-000000000001","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING","job_created":1574857140,"job_started":1574857140},{"id":"30000000-0000-0000-0000-000000000002","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000003","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FAILED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140},{"id":"30000000-0000-0000-0000-000000000004","blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"FINISHED","job_created":1574857140,"job_started":1574857140,"job_finished":1574857140}]}`, test_distro.TestImageTypeName)}, + {rpmmd_mock.BadJobJSONFixture, "GET", "/api/v0/compose/status/30000000-0000-0000-0000-000000000005", ``, http.StatusOK, `{"uuids":[]}`}, } if len(os.Getenv("OSBUILD_COMPOSER_TEST_EXTERNAL")) > 0 { @@ -1524,6 +1526,7 @@ func TestComposeQueue(t *testing.T) { {rpmmd_mock.BaseFixture, "GET", "/api/v0/compose/queue", ``, http.StatusOK, fmt.Sprintf(`{"new":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING"}],"run":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING"}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.BaseFixture, "GET", "/api/v1/compose/queue", ``, http.StatusOK, fmt.Sprintf(`{"new":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING","uploads":[{"uuid":"10000000-0000-0000-0000-000000000000","status":"WAITING","provider_name":"aws","image_name":"awsimage","creation_time":1574857140,"settings":{"region":"frankfurt","bucket":"clay","key":"imagekey"}}]}],"run":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING"}]}`, test_distro.TestImageTypeName)}, {rpmmd_mock.NoComposesFixture, "GET", "/api/v0/compose/queue", ``, http.StatusOK, `{"new":[],"run":[]}`}, + {rpmmd_mock.BadJobJSONFixture, "GET", "/api/v0/compose/queue", ``, http.StatusOK, fmt.Sprintf(`{"new":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"WAITING"}],"run":[{"blueprint":"test","version":"0.0.0","compose_type":"%[1]s","image_size":0,"queue_status":"RUNNING"}]}`, test_distro.TestImageTypeName)}, } if len(os.Getenv("OSBUILD_COMPOSER_TEST_EXTERNAL")) > 0 {