From e06076ed8c1e2729cb5bba3edea730d245bfbe82 Mon Sep 17 00:00:00 2001 From: Lars Karlitski Date: Tue, 26 May 2020 20:40:14 +0200 Subject: [PATCH] worker: add JobArtifact() and DeleteJobArtifacts() This allows removing the `artifactsDir` from `weldr.API`. It makes more sense to deal with that directory in one place only. --- cmd/osbuild-composer/main.go | 2 +- internal/client/unit_test.go | 9 +------ internal/weldr/api.go | 48 +++++++++++++++++++----------------- internal/weldr/api_test.go | 9 +------ internal/worker/server.go | 40 ++++++++++++++++++++++++++++++ 5 files changed, 69 insertions(+), 39 deletions(-) diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index 96022ffd5..f6dcde882 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -137,7 +137,7 @@ func main() { compatOutputDir := path.Join(stateDir, "outputs") workers := worker.NewServer(logger, jobs, artifactsDir) - weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers, artifactsDir, compatOutputDir) + weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers, compatOutputDir) go func() { err := workers.Serve(jobListener) diff --git a/internal/client/unit_test.go b/internal/client/unit_test.go index 75dc24966..255b2704e 100644 --- a/internal/client/unit_test.go +++ b/internal/client/unit_test.go @@ -11,7 +11,6 @@ import ( "net" "net/http" "os" - "path" "testing" "github.com/osbuild/osbuild-composer/internal/distro/fedoratest" @@ -37,12 +36,6 @@ func executeTests(m *testing.M) int { panic(err) } - artifactsDir := path.Join(tmpdir, "artifacts") - err = os.Mkdir(artifactsDir, 0755) - if err != nil { - panic(err) - } - // Create a mock API server listening on the temporary socket fixture := rpmmd_mock.BaseFixture() rpm := rpmmd_mock.NewRPMMDMock(fixture) @@ -53,7 +46,7 @@ func executeTests(m *testing.M) int { } repos := []rpmmd.RepoConfig{{Id: "test-system-repo", BaseURL: "http://example.com/test/os/test_arch"}} logger := log.New(os.Stdout, "", 0) - api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers, artifactsDir, "") + api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers, "") server := http.Server{Handler: api} defer server.Close() diff --git a/internal/weldr/api.go b/internal/weldr/api.go index 475e54847..f41f37cc0 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -26,6 +26,7 @@ import ( "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/distro" + "github.com/osbuild/osbuild-composer/internal/jobqueue" "github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/store" "github.com/osbuild/osbuild-composer/internal/target" @@ -44,13 +45,12 @@ type API struct { logger *log.Logger router *httprouter.Router - artifactsDir string compatOutputDir string } var ValidBlueprintName = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`) -func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server, artifactsDir, compatOutputDir string) *API { +func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server, compatOutputDir string) *API { api := &API{ store: store, workers: workers, @@ -59,7 +59,6 @@ func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmm distro: distro, repos: repos, logger: logger, - artifactsDir: artifactsDir, compatOutputDir: compatOutputDir, } @@ -198,30 +197,34 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus { } } -// Opens the image file for `compose`. This looks under `{artifacts}/{jobId}` -// first, and then under `{outputs}/{composeId}/{imageBuildId}` for backwards -// compatibility. +// Opens the image file for `compose`. This asks the worker server for the +// artifact first, and then falls back to looking in +// `{outputs}/{composeId}/{imageBuildId}` for backwards compatibility. func (api *API) openImageFile(composeId uuid.UUID, compose store.Compose) (io.Reader, int64, error) { - p := path.Join(api.artifactsDir, compose.ImageBuild.JobID.String(), compose.ImageBuild.ImageType.Filename()) + name := compose.ImageBuild.ImageType.Filename() - f, err := os.Open(p) + reader, size, err := api.workers.JobArtifact(compose.ImageBuild.JobID, name) if err != nil { - if api.compatOutputDir == "" || !os.IsNotExist(err) { + if api.compatOutputDir == "" || err != jobqueue.ErrNotExist { return nil, 0, err } - p = path.Join(api.compatOutputDir, composeId.String(), "0") - f, err = os.Open(p) + + p := path.Join(api.compatOutputDir, composeId.String(), "0", name) + f, err := os.Open(p) if err != nil { return nil, 0, err } + + info, err := f.Stat() + if err != nil { + return nil, 0, err + } + + reader = f + size = info.Size() } - info, err := f.Stat() - if err != nil { - return nil, 0, err - } - - return f, info.Size(), nil + return reader, size, nil } func verifyRequestVersion(writer http.ResponseWriter, params httprouter.Params, minVersion uint) bool { @@ -1754,11 +1757,12 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R continue } - // Delete artifacts from jobs and the compat output dir. Ignore - // errors, because there's no point of reporting them to the - // client after the compose itself has already been deleted. - _ = os.RemoveAll(path.Join(api.artifactsDir, compose.ImageBuild.JobID.String())) - if api.compatOutputDir != "" { + // Delete artifacts from the worker server or — if that doesn't + // have this job — the compat output dir. Ignore errors, + // because there's no point of reporting them to the client + // after the compose itself has already been deleted. + err = api.workers.DeleteArtifacts(compose.ImageBuild.JobID) + if err == jobqueue.ErrNotExist && api.compatOutputDir != "" { _ = os.RemoveAll(path.Join(api.compatOutputDir, id.String())) } diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index c5ac0d474..e99f24df4 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -4,7 +4,6 @@ import ( "archive/tar" "bytes" "io" - "io/ioutil" "math/rand" "net/http" "net/http/httptest" @@ -38,13 +37,7 @@ func createWeldrAPI(fixtureGenerator rpmmd_mock.FixtureGenerator) (*API, *store. panic(err) } - artifactsDir, err := ioutil.TempDir("", "client_test-") - if err != nil { - panic(err) - } - defer os.RemoveAll(artifactsDir) - - return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers, artifactsDir, ""), fixture.Store + return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers, ""), fixture.Store } func TestBasic(t *testing.T) { diff --git a/internal/worker/server.go b/internal/worker/server.go index 00f9ba418..9e5f1e8d2 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -119,6 +119,46 @@ func (s *Server) JobStatus(id uuid.UUID) (*JobStatus, error) { }, nil } +// Provides access to artifacts of a job. Returns an io.Reader for the artifact +// and the artifact's size. +func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) { + status, err := s.JobStatus(id) + if err != nil { + return nil, 0, err + } + + if status.Finished.IsZero() { + return nil, 0, fmt.Errorf("Cannot access artifacts before job is finished: %s", id) + } + + p := path.Join(s.artifactsDir, id.String(), name) + f, err := os.Open(p) + if err != nil { + return nil, 0, fmt.Errorf("Error accessing artifact %s for job %s: %v", name, id, err) + } + + info, err := f.Stat() + if err != nil { + return nil, 0, fmt.Errorf("Error getting size of artifact %s for job %s: %v", name, id, err) + } + + return f, info.Size(), nil +} + +// Deletes all artifacts for job `id`. +func (s *Server) DeleteArtifacts(id uuid.UUID) error { + status, err := s.JobStatus(id) + if err != nil { + return err + } + + if status.Finished.IsZero() { + return fmt.Errorf("Cannot delete artifacts before job is finished: %s", id) + } + + return os.RemoveAll(path.Join(s.artifactsDir, id.String())) +} + // jsonErrorf() is similar to http.Error(), but returns the message in a json // object with a "message" field. func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) {