worker: add JobArtifact() and DeleteJobArtifacts()

This allows removing the `artifactsDir` from `weldr.API`. It makes more
sense to deal with that directory in one place only.
This commit is contained in:
Lars Karlitski 2020-05-26 20:40:14 +02:00 committed by Ondřej Budai
parent b452a31eed
commit e06076ed8c
5 changed files with 69 additions and 39 deletions

View file

@ -137,7 +137,7 @@ func main() {
compatOutputDir := path.Join(stateDir, "outputs") compatOutputDir := path.Join(stateDir, "outputs")
workers := worker.NewServer(logger, jobs, artifactsDir) workers := worker.NewServer(logger, jobs, artifactsDir)
weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers, artifactsDir, compatOutputDir) weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers, compatOutputDir)
go func() { go func() {
err := workers.Serve(jobListener) err := workers.Serve(jobListener)

View file

@ -11,7 +11,6 @@ import (
"net" "net"
"net/http" "net/http"
"os" "os"
"path"
"testing" "testing"
"github.com/osbuild/osbuild-composer/internal/distro/fedoratest" "github.com/osbuild/osbuild-composer/internal/distro/fedoratest"
@ -37,12 +36,6 @@ func executeTests(m *testing.M) int {
panic(err) panic(err)
} }
artifactsDir := path.Join(tmpdir, "artifacts")
err = os.Mkdir(artifactsDir, 0755)
if err != nil {
panic(err)
}
// Create a mock API server listening on the temporary socket // Create a mock API server listening on the temporary socket
fixture := rpmmd_mock.BaseFixture() fixture := rpmmd_mock.BaseFixture()
rpm := rpmmd_mock.NewRPMMDMock(fixture) rpm := rpmmd_mock.NewRPMMDMock(fixture)
@ -53,7 +46,7 @@ func executeTests(m *testing.M) int {
} }
repos := []rpmmd.RepoConfig{{Id: "test-system-repo", BaseURL: "http://example.com/test/os/test_arch"}} repos := []rpmmd.RepoConfig{{Id: "test-system-repo", BaseURL: "http://example.com/test/os/test_arch"}}
logger := log.New(os.Stdout, "", 0) logger := log.New(os.Stdout, "", 0)
api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers, artifactsDir, "") api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers, "")
server := http.Server{Handler: api} server := http.Server{Handler: api}
defer server.Close() defer server.Close()

View file

@ -26,6 +26,7 @@ import (
"github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
"github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/store" "github.com/osbuild/osbuild-composer/internal/store"
"github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/target"
@ -44,13 +45,12 @@ type API struct {
logger *log.Logger logger *log.Logger
router *httprouter.Router router *httprouter.Router
artifactsDir string
compatOutputDir string compatOutputDir string
} }
var ValidBlueprintName = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`) var ValidBlueprintName = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`)
func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server, artifactsDir, compatOutputDir string) *API { func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server, compatOutputDir string) *API {
api := &API{ api := &API{
store: store, store: store,
workers: workers, workers: workers,
@ -59,7 +59,6 @@ func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmm
distro: distro, distro: distro,
repos: repos, repos: repos,
logger: logger, logger: logger,
artifactsDir: artifactsDir,
compatOutputDir: compatOutputDir, compatOutputDir: compatOutputDir,
} }
@ -198,30 +197,34 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus {
} }
} }
// Opens the image file for `compose`. This looks under `{artifacts}/{jobId}` // Opens the image file for `compose`. This asks the worker server for the
// first, and then under `{outputs}/{composeId}/{imageBuildId}` for backwards // artifact first, and then falls back to looking in
// compatibility. // `{outputs}/{composeId}/{imageBuildId}` for backwards compatibility.
func (api *API) openImageFile(composeId uuid.UUID, compose store.Compose) (io.Reader, int64, error) { func (api *API) openImageFile(composeId uuid.UUID, compose store.Compose) (io.Reader, int64, error) {
p := path.Join(api.artifactsDir, compose.ImageBuild.JobID.String(), compose.ImageBuild.ImageType.Filename()) name := compose.ImageBuild.ImageType.Filename()
f, err := os.Open(p) reader, size, err := api.workers.JobArtifact(compose.ImageBuild.JobID, name)
if err != nil { if err != nil {
if api.compatOutputDir == "" || !os.IsNotExist(err) { if api.compatOutputDir == "" || err != jobqueue.ErrNotExist {
return nil, 0, err return nil, 0, err
} }
p = path.Join(api.compatOutputDir, composeId.String(), "0")
f, err = os.Open(p) p := path.Join(api.compatOutputDir, composeId.String(), "0", name)
f, err := os.Open(p)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
info, err := f.Stat()
if err != nil {
return nil, 0, err
}
reader = f
size = info.Size()
} }
info, err := f.Stat() return reader, size, nil
if err != nil {
return nil, 0, err
}
return f, info.Size(), nil
} }
func verifyRequestVersion(writer http.ResponseWriter, params httprouter.Params, minVersion uint) bool { func verifyRequestVersion(writer http.ResponseWriter, params httprouter.Params, minVersion uint) bool {
@ -1754,11 +1757,12 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R
continue continue
} }
// Delete artifacts from jobs and the compat output dir. Ignore // Delete artifacts from the worker server or — if that doesn't
// errors, because there's no point of reporting them to the // have this job — the compat output dir. Ignore errors,
// client after the compose itself has already been deleted. // because there's no point of reporting them to the client
_ = os.RemoveAll(path.Join(api.artifactsDir, compose.ImageBuild.JobID.String())) // after the compose itself has already been deleted.
if api.compatOutputDir != "" { err = api.workers.DeleteArtifacts(compose.ImageBuild.JobID)
if err == jobqueue.ErrNotExist && api.compatOutputDir != "" {
_ = os.RemoveAll(path.Join(api.compatOutputDir, id.String())) _ = os.RemoveAll(path.Join(api.compatOutputDir, id.String()))
} }

View file

@ -4,7 +4,6 @@ import (
"archive/tar" "archive/tar"
"bytes" "bytes"
"io" "io"
"io/ioutil"
"math/rand" "math/rand"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -38,13 +37,7 @@ func createWeldrAPI(fixtureGenerator rpmmd_mock.FixtureGenerator) (*API, *store.
panic(err) panic(err)
} }
artifactsDir, err := ioutil.TempDir("", "client_test-") return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers, ""), fixture.Store
if err != nil {
panic(err)
}
defer os.RemoveAll(artifactsDir)
return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers, artifactsDir, ""), fixture.Store
} }
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {

View file

@ -119,6 +119,46 @@ func (s *Server) JobStatus(id uuid.UUID) (*JobStatus, error) {
}, nil }, nil
} }
// Provides access to artifacts of a job. Returns an io.Reader for the artifact
// and the artifact's size.
func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
status, err := s.JobStatus(id)
if err != nil {
return nil, 0, err
}
if status.Finished.IsZero() {
return nil, 0, fmt.Errorf("Cannot access artifacts before job is finished: %s", id)
}
p := path.Join(s.artifactsDir, id.String(), name)
f, err := os.Open(p)
if err != nil {
return nil, 0, fmt.Errorf("Error accessing artifact %s for job %s: %v", name, id, err)
}
info, err := f.Stat()
if err != nil {
return nil, 0, fmt.Errorf("Error getting size of artifact %s for job %s: %v", name, id, err)
}
return f, info.Size(), nil
}
// Deletes all artifacts for job `id`.
func (s *Server) DeleteArtifacts(id uuid.UUID) error {
status, err := s.JobStatus(id)
if err != nil {
return err
}
if status.Finished.IsZero() {
return fmt.Errorf("Cannot delete artifacts before job is finished: %s", id)
}
return os.RemoveAll(path.Join(s.artifactsDir, id.String()))
}
// jsonErrorf() is similar to http.Error(), but returns the message in a json // jsonErrorf() is similar to http.Error(), but returns the message in a json
// object with a "message" field. // object with a "message" field.
func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) { func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) {