From a1cf3984dc9cc9bd5acdbde87e1f5753b2676576 Mon Sep 17 00:00:00 2001 From: Lars Karlitski Date: Tue, 26 May 2020 00:55:47 +0200 Subject: [PATCH] worker: introduce job artifact directory The `jobs/:job_id/builds/:build_id/image` route was awkward: the `:jobid` was actually weldr's compose id and `:build_id` was always `0`. Change it to `jobs/:job_id/artifacts/:name`, where `:job_id` is now a job id, and `:name` is the name of the artifact to upload. In the future, it could support uploading more than one artifact. This allows removing outputs from `store`, which is now back to being a pure JSON-store. Take care that `weldr` returns (and deletes) images from the new (or for backwards compatibility, the old) location. The `org.osbuild.local` target continues to exist as a marker for the worker to know whether it should upload artifacts. --- cmd/osbuild-composer/main.go | 12 ++-- cmd/osbuild-worker/main.go | 4 +- internal/client/unit_test.go | 9 ++- internal/mocks/rpmmd/fixtures.go | 2 +- internal/rcm/api_test.go | 2 +- internal/store/store.go | 98 +------------------------------- internal/weldr/api.go | 65 ++++++++++++++++----- internal/weldr/api_test.go | 9 ++- internal/worker/client.go | 5 +- internal/worker/server.go | 56 +++++++++++------- internal/worker/server_test.go | 6 +- 11 files changed, 122 insertions(+), 146 deletions(-) diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index 264d1be76..96022ffd5 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -128,14 +128,16 @@ func main() { log.Fatalf("cannot create jobqueue: %v", err) } - outputDir := path.Join(stateDir, "outputs") - err = os.Mkdir(outputDir, 0755) + artifactsDir := path.Join(stateDir, "artifacts") + err = os.Mkdir(artifactsDir, 0755) if err != nil && !os.IsExist(err) { - log.Fatalf("cannot create output directory: %v", err) + log.Fatalf("cannot create artifacts directory: %v", err) } - workers := worker.NewServer(logger, jobs, store.AddImageToImageUpload) - weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers) + compatOutputDir := path.Join(stateDir, "outputs") + + workers := worker.NewServer(logger, jobs, artifactsDir) + weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers, artifactsDir, compatOutputDir) go func() { err := workers.Serve(jobListener) diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index d80cf65f7..5c6a40ece 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -63,7 +63,7 @@ func (e *TargetsError) Error() string { return errString } -func RunJob(job *worker.Job, uploadFunc func(uuid.UUID, int, io.Reader) error) (*common.ComposeResult, error) { +func RunJob(job *worker.Job, uploadFunc func(uuid.UUID, string, io.Reader) error) (*common.ComposeResult, error) { tmpOutput, err := ioutil.TempDir("/var/tmp", "osbuild-output-*") if err != nil { return nil, fmt.Errorf("error setting up osbuild output directory: %v", err) @@ -87,7 +87,7 @@ func RunJob(job *worker.Job, uploadFunc func(uuid.UUID, int, io.Reader) error) ( continue } - err = uploadFunc(options.ComposeId, options.ImageBuildId, f) + err = uploadFunc(job.Id, options.Filename, f) if err != nil { r = append(r, err) continue diff --git a/internal/client/unit_test.go b/internal/client/unit_test.go index 880a8680a..75dc24966 100644 --- a/internal/client/unit_test.go +++ b/internal/client/unit_test.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "os" + "path" "testing" "github.com/osbuild/osbuild-composer/internal/distro/fedoratest" @@ -36,6 +37,12 @@ func executeTests(m *testing.M) int { panic(err) } + artifactsDir := path.Join(tmpdir, "artifacts") + err = os.Mkdir(artifactsDir, 0755) + if err != nil { + panic(err) + } + // Create a mock API server listening on the temporary socket fixture := rpmmd_mock.BaseFixture() rpm := rpmmd_mock.NewRPMMDMock(fixture) @@ -46,7 +53,7 @@ func executeTests(m *testing.M) int { } repos := []rpmmd.RepoConfig{{Id: "test-system-repo", BaseURL: "http://example.com/test/os/test_arch"}} logger := log.New(os.Stdout, "", 0) - api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers) + api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers, artifactsDir, "") server := http.Server{Handler: api} defer server.Close() diff --git a/internal/mocks/rpmmd/fixtures.go b/internal/mocks/rpmmd/fixtures.go index 5858db288..edd07e993 100644 --- a/internal/mocks/rpmmd/fixtures.go +++ b/internal/mocks/rpmmd/fixtures.go @@ -53,7 +53,7 @@ func generatePackageList() rpmmd.PackageList { } func createBaseWorkersFixture() *worker.Server { - return worker.NewServer(nil, testjobqueue.New(), nil) + return worker.NewServer(nil, testjobqueue.New(), "") } func createBaseDepsolveFixture() []rpmmd.PackageSpec { diff --git a/internal/rcm/api_test.go b/internal/rcm/api_test.go index d34cc6a20..d19edd9b8 100644 --- a/internal/rcm/api_test.go +++ b/internal/rcm/api_test.go @@ -37,7 +37,7 @@ func newTestWorkerServer(t *testing.T) (*worker.Server, string) { dir, err := ioutil.TempDir("", "rcm-test-") require.NoError(t, err) - w := worker.NewServer(nil, testjobqueue.New(), nil) + w := worker.NewServer(nil, testjobqueue.New(), "") require.NotNil(t, w) return w, dir diff --git a/internal/store/store.go b/internal/store/store.go index 8e81202af..4994ff4b5 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -8,9 +8,7 @@ import ( "encoding/hex" "errors" "fmt" - "io" "log" - "os" "sort" "sync" "time" @@ -75,13 +73,8 @@ func New(stateDir *string, arch distro.Arch, log *log.Logger) *Store { var db *jsondb.JSONDatabase if stateDir != nil { - err := os.Mkdir(*stateDir+"/"+"outputs", 0700) - if err != nil && !os.IsExist(err) { - log.Fatalf("cannot create output directory") - } - db = jsondb.New(*stateDir, 0600) - _, err = db.Read(StoreDBName, &storeStruct) + _, err := db.Read(StoreDBName, &storeStruct) if err != nil { log.Fatalf("cannot read state: %v", err) } @@ -339,45 +332,6 @@ func (s *Store) GetAllComposes() map[uuid.UUID]Compose { return composes } -func (s *Store) GetImageBuildImage(composeId uuid.UUID) (io.ReadCloser, int64, error) { - c, ok := s.composes[composeId] - - if !ok { - return nil, 0, &NotFoundError{"compose does not exist"} - } - - localTargetOptions := c.ImageBuild.GetLocalTargetOptions() - if localTargetOptions == nil { - return nil, 0, &NoLocalTargetError{"compose does not have local target"} - } - - path := fmt.Sprintf("%s/%s", s.getImageBuildDirectory(composeId), localTargetOptions.Filename) - - f, err := os.Open(path) - - if err != nil { - return nil, 0, err - } - - fileInfo, err := f.Stat() - - if err != nil { - return nil, 0, err - } - - return f, fileInfo.Size(), err - -} - -func (s *Store) getComposeDirectory(composeID uuid.UUID) string { - return fmt.Sprintf("%s/outputs/%s", *s.stateDir, composeID.String()) -} - -func (s *Store) getImageBuildDirectory(composeID uuid.UUID) string { - // only one image build is supported per compose - return fmt.Sprintf("%s/0", s.getComposeDirectory(composeID)) -} - func (s *Store) PushCompose(composeID uuid.UUID, manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, jobId uuid.UUID) error { if _, exists := s.GetCompose(composeID); exists { panic("a compose with this id already exists") @@ -387,15 +341,6 @@ func (s *Store) PushCompose(composeID uuid.UUID, manifest *osbuild.Manifest, ima targets = []*target.Target{} } - if s.stateDir != nil { - outputDir := s.getImageBuildDirectory(composeID) - - err := os.MkdirAll(outputDir, 0755) - if err != nil { - return fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) - } - } - // FIXME: handle or comment this possible error _ = s.change(func() error { s.composes[composeID] = Compose{ @@ -459,49 +404,10 @@ func (s *Store) DeleteCompose(id uuid.UUID) error { delete(s.composes, id) - var err error - if s.stateDir != nil { - err = os.RemoveAll(s.getComposeDirectory(id)) - if err != nil { - return err - } - } - - return err + return nil }) } -func (s *Store) AddImageToImageUpload(composeID uuid.UUID, imageBuildID int, reader io.Reader) error { - if imageBuildID != 0 { - return &NotFoundError{"image build does not exist"} - } - - currentCompose, exists := s.composes[composeID] - if !exists { - return &NotFoundError{"compose does not exist"} - } - - localTargetOptions := currentCompose.ImageBuild.GetLocalTargetOptions() - if localTargetOptions == nil { - return &NoLocalTargetError{fmt.Sprintf("image upload requested for compse %s, but it has no local target", composeID.String())} - } - - path := fmt.Sprintf("%s/%s", s.getImageBuildDirectory(composeID), localTargetOptions.Filename) - f, err := os.Create(path) - - if err != nil { - return err - } - - _, err = io.Copy(f, reader) - - if err != nil { - return err - } - - return nil -} - func (s *Store) PushSource(source SourceConfig) { // FIXME: handle or comment this possible error _ = s.change(func() error { diff --git a/internal/weldr/api.go b/internal/weldr/api.go index bd5d4a864..475e54847 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -11,6 +11,8 @@ import ( "net" "net/http" "net/url" + "os" + "path" "regexp" "sort" "strconv" @@ -41,19 +43,24 @@ type API struct { logger *log.Logger router *httprouter.Router + + artifactsDir string + compatOutputDir string } var ValidBlueprintName = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`) -func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server) *API { +func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server, artifactsDir, compatOutputDir string) *API { api := &API{ - store: store, - workers: workers, - rpmmd: rpmmd, - arch: arch, - distro: distro, - repos: repos, - logger: logger, + store: store, + workers: workers, + rpmmd: rpmmd, + arch: arch, + distro: distro, + repos: repos, + logger: logger, + artifactsDir: artifactsDir, + compatOutputDir: compatOutputDir, } api.router = httprouter.New() @@ -191,6 +198,32 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus { } } +// Opens the image file for `compose`. This looks under `{artifacts}/{jobId}` +// first, and then under `{outputs}/{composeId}/{imageBuildId}` for backwards +// compatibility. +func (api *API) openImageFile(composeId uuid.UUID, compose store.Compose) (io.Reader, int64, error) { + p := path.Join(api.artifactsDir, compose.ImageBuild.JobID.String(), compose.ImageBuild.ImageType.Filename()) + + f, err := os.Open(p) + if err != nil { + if api.compatOutputDir == "" || !os.IsNotExist(err) { + return nil, 0, err + } + p = path.Join(api.compatOutputDir, composeId.String(), "0") + f, err = os.Open(p) + if err != nil { + return nil, 0, err + } + } + + info, err := f.Stat() + if err != nil { + return nil, 0, err + } + + return f, info.Size(), nil +} + func verifyRequestVersion(writer http.ResponseWriter, params httprouter.Params, minVersion uint) bool { versionString := params.ByName("version") @@ -1721,6 +1754,14 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R continue } + // Delete artifacts from jobs and the compat output dir. Ignore + // errors, because there's no point of reporting them to the + // client after the compose itself has already been deleted. + _ = os.RemoveAll(path.Join(api.artifactsDir, compose.ImageBuild.JobID.String())) + if api.compatOutputDir != "" { + _ = os.RemoveAll(path.Join(api.compatOutputDir, id.String())) + } + results = append(results, composeDeleteStatus{id, true}) } @@ -1966,13 +2007,11 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re imageName := compose.ImageBuild.ImageType.Filename() imageMime := compose.ImageBuild.ImageType.MIMEType() - reader, fileSize, err := api.store.GetImageBuildImage(uuid) - - // TODO: this might return misleading error + reader, fileSize, err := api.openImageFile(uuid, compose) if err != nil { errors := responseError{ - ID: "BuildMissingFile", - Msg: fmt.Sprintf("Build %s is missing file %s!", uuidString, imageName), + ID: "InternalServerError", + Msg: fmt.Sprintf("Error accessing image file for compose %s: %v", uuid, err), } statusResponseError(writer, http.StatusBadRequest, errors) return diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index 0eb5ac703..c5ac0d474 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -4,6 +4,7 @@ import ( "archive/tar" "bytes" "io" + "io/ioutil" "math/rand" "net/http" "net/http/httptest" @@ -37,7 +38,13 @@ func createWeldrAPI(fixtureGenerator rpmmd_mock.FixtureGenerator) (*API, *store. panic(err) } - return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers), fixture.Store + artifactsDir, err := ioutil.TempDir("", "client_test-") + if err != nil { + panic(err) + } + defer os.RemoveAll(artifactsDir) + + return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers, artifactsDir, ""), fixture.Store } func TestBasic(t *testing.T) { diff --git a/internal/worker/client.go b/internal/worker/client.go index e325280d1..7e2f440a0 100644 --- a/internal/worker/client.go +++ b/internal/worker/client.go @@ -118,9 +118,8 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *comm return nil } -func (c *Client) UploadImage(composeId uuid.UUID, imageBuildId int, reader io.Reader) error { - // content type doesn't really matter - url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", composeId, imageBuildId)) +func (c *Client) UploadImage(job uuid.UUID, name string, reader io.Reader) error { + url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/artifacts/%s", job, name)) _, err := c.client.Post(url, "application/octet-stream", reader) return err diff --git a/internal/worker/server.go b/internal/worker/server.go index 9e6eaac1a..1c6d5bf49 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -8,7 +8,8 @@ import ( "log" "net" "net/http" - "strconv" + "os" + "path" "time" "github.com/google/uuid" @@ -21,10 +22,10 @@ import ( ) type Server struct { - logger *log.Logger - jobs jobqueue.JobQueue - router *httprouter.Router - imageWriter WriteImageFunc + logger *log.Logger + jobs jobqueue.JobQueue + router *httprouter.Router + artifactsDir string } type JobStatus struct { @@ -35,13 +36,11 @@ type JobStatus struct { Result OSBuildJobResult } -type WriteImageFunc func(composeID uuid.UUID, imageBuildID int, reader io.Reader) error - -func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, imageWriter WriteImageFunc) *Server { +func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string) *Server { s := &Server{ - logger: logger, - jobs: jobs, - imageWriter: imageWriter, + logger: logger, + jobs: jobs, + artifactsDir: artifactsDir, } s.router = httprouter.New() @@ -52,7 +51,7 @@ func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, imageWriter WriteImag s.router.POST("/job-queue/v1/jobs", s.addJobHandler) s.router.PATCH("/job-queue/v1/jobs/:job_id", s.updateJobHandler) - s.router.POST("/job-queue/v1/jobs/:job_id/builds/:build_id/image", s.addJobImageHandler) + s.router.POST("/job-queue/v1/jobs/:job_id/artifacts/:name", s.addJobImageHandler) return s } @@ -216,18 +215,35 @@ func (s *Server) addJobImageHandler(writer http.ResponseWriter, request *http.Re return } - imageBuildId, err := strconv.Atoi(params.ByName("build_id")) - if err != nil { - jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err) + name := params.ByName("name") + if name == "" { + jsonErrorf(writer, http.StatusBadRequest, "invalid artifact name") return } - if s.imageWriter == nil { - _, err = io.Copy(ioutil.Discard, request.Body) - } else { - err = s.imageWriter(id, imageBuildId, request.Body) + if s.artifactsDir == "" { + _, err := io.Copy(ioutil.Discard, request.Body) + if err != nil { + jsonErrorf(writer, http.StatusInternalServerError, "error discarding artifact: %v", err) + } + return } + + err = os.Mkdir(path.Join(s.artifactsDir, id.String()), 0700) if err != nil { - jsonErrorf(writer, http.StatusInternalServerError, "%v", err) + jsonErrorf(writer, http.StatusInternalServerError, "cannot create artifact directory: %v", err) + return + } + + f, err := os.Create(path.Join(s.artifactsDir, id.String(), name)) + if err != nil { + jsonErrorf(writer, http.StatusInternalServerError, "cannot create artifact file: %v", err) + return + } + + _, err = io.Copy(f, request.Body) + if err != nil { + jsonErrorf(writer, http.StatusInternalServerError, "error writing artifact file: %v", err) + return } } diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 63f0f0492..1ed3d254e 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -36,7 +36,7 @@ func TestErrors(t *testing.T) { } for _, c := range cases { - server := worker.NewServer(nil, testjobqueue.New(), nil) + server := worker.NewServer(nil, testjobqueue.New(), "") test.TestRoute(t, server, false, c.Method, c.Path, c.Body, c.ExpectedStatus, "{}", "message") } } @@ -51,7 +51,7 @@ func TestCreate(t *testing.T) { if err != nil { t.Fatalf("error getting image type from arch") } - server := worker.NewServer(nil, testjobqueue.New(), nil) + server := worker.NewServer(nil, testjobqueue.New(), "") manifest, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, nil, nil) if err != nil { @@ -75,7 +75,7 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { if err != nil { t.Fatalf("error getting image type from arch") } - server := worker.NewServer(nil, testjobqueue.New(), nil) + server := worker.NewServer(nil, testjobqueue.New(), "") id := uuid.Nil if from != "VOID" {