worker: introduce job artifact directory

The `jobs/:job_id/builds/:build_id/image` route was awkward: the
`:jobid` was actually weldr's compose id and `:build_id` was always `0`.

Change it to `jobs/:job_id/artifacts/:name`, where `:job_id` is now a
job id, and `:name` is the name of the artifact to upload. In the
future, it could support uploading more than one artifact.

This allows removing outputs from `store`, which is now back to being a
pure JSON-store. Take care that `weldr` returns (and deletes) images
from the new (or for backwards compatibility, the old) location.

The `org.osbuild.local` target continues to exist as a marker for the
worker to know whether it should upload artifacts.
This commit is contained in:
Lars Karlitski 2020-05-26 00:55:47 +02:00 committed by Tom Gundersen
parent 8f7a9b3439
commit a1cf3984dc
11 changed files with 122 additions and 146 deletions

View file

@ -128,14 +128,16 @@ func main() {
log.Fatalf("cannot create jobqueue: %v", err)
}
outputDir := path.Join(stateDir, "outputs")
err = os.Mkdir(outputDir, 0755)
artifactsDir := path.Join(stateDir, "artifacts")
err = os.Mkdir(artifactsDir, 0755)
if err != nil && !os.IsExist(err) {
log.Fatalf("cannot create output directory: %v", err)
log.Fatalf("cannot create artifacts directory: %v", err)
}
workers := worker.NewServer(logger, jobs, store.AddImageToImageUpload)
weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers)
compatOutputDir := path.Join(stateDir, "outputs")
workers := worker.NewServer(logger, jobs, artifactsDir)
weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers, artifactsDir, compatOutputDir)
go func() {
err := workers.Serve(jobListener)

View file

@ -63,7 +63,7 @@ func (e *TargetsError) Error() string {
return errString
}
func RunJob(job *worker.Job, uploadFunc func(uuid.UUID, int, io.Reader) error) (*common.ComposeResult, error) {
func RunJob(job *worker.Job, uploadFunc func(uuid.UUID, string, io.Reader) error) (*common.ComposeResult, error) {
tmpOutput, err := ioutil.TempDir("/var/tmp", "osbuild-output-*")
if err != nil {
return nil, fmt.Errorf("error setting up osbuild output directory: %v", err)
@ -87,7 +87,7 @@ func RunJob(job *worker.Job, uploadFunc func(uuid.UUID, int, io.Reader) error) (
continue
}
err = uploadFunc(options.ComposeId, options.ImageBuildId, f)
err = uploadFunc(job.Id, options.Filename, f)
if err != nil {
r = append(r, err)
continue

View file

@ -11,6 +11,7 @@ import (
"net"
"net/http"
"os"
"path"
"testing"
"github.com/osbuild/osbuild-composer/internal/distro/fedoratest"
@ -36,6 +37,12 @@ func executeTests(m *testing.M) int {
panic(err)
}
artifactsDir := path.Join(tmpdir, "artifacts")
err = os.Mkdir(artifactsDir, 0755)
if err != nil {
panic(err)
}
// Create a mock API server listening on the temporary socket
fixture := rpmmd_mock.BaseFixture()
rpm := rpmmd_mock.NewRPMMDMock(fixture)
@ -46,7 +53,7 @@ func executeTests(m *testing.M) int {
}
repos := []rpmmd.RepoConfig{{Id: "test-system-repo", BaseURL: "http://example.com/test/os/test_arch"}}
logger := log.New(os.Stdout, "", 0)
api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers)
api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers, artifactsDir, "")
server := http.Server{Handler: api}
defer server.Close()

View file

@ -53,7 +53,7 @@ func generatePackageList() rpmmd.PackageList {
}
func createBaseWorkersFixture() *worker.Server {
return worker.NewServer(nil, testjobqueue.New(), nil)
return worker.NewServer(nil, testjobqueue.New(), "")
}
func createBaseDepsolveFixture() []rpmmd.PackageSpec {

View file

@ -37,7 +37,7 @@ func newTestWorkerServer(t *testing.T) (*worker.Server, string) {
dir, err := ioutil.TempDir("", "rcm-test-")
require.NoError(t, err)
w := worker.NewServer(nil, testjobqueue.New(), nil)
w := worker.NewServer(nil, testjobqueue.New(), "")
require.NotNil(t, w)
return w, dir

View file

@ -8,9 +8,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"log"
"os"
"sort"
"sync"
"time"
@ -75,13 +73,8 @@ func New(stateDir *string, arch distro.Arch, log *log.Logger) *Store {
var db *jsondb.JSONDatabase
if stateDir != nil {
err := os.Mkdir(*stateDir+"/"+"outputs", 0700)
if err != nil && !os.IsExist(err) {
log.Fatalf("cannot create output directory")
}
db = jsondb.New(*stateDir, 0600)
_, err = db.Read(StoreDBName, &storeStruct)
_, err := db.Read(StoreDBName, &storeStruct)
if err != nil {
log.Fatalf("cannot read state: %v", err)
}
@ -339,45 +332,6 @@ func (s *Store) GetAllComposes() map[uuid.UUID]Compose {
return composes
}
func (s *Store) GetImageBuildImage(composeId uuid.UUID) (io.ReadCloser, int64, error) {
c, ok := s.composes[composeId]
if !ok {
return nil, 0, &NotFoundError{"compose does not exist"}
}
localTargetOptions := c.ImageBuild.GetLocalTargetOptions()
if localTargetOptions == nil {
return nil, 0, &NoLocalTargetError{"compose does not have local target"}
}
path := fmt.Sprintf("%s/%s", s.getImageBuildDirectory(composeId), localTargetOptions.Filename)
f, err := os.Open(path)
if err != nil {
return nil, 0, err
}
fileInfo, err := f.Stat()
if err != nil {
return nil, 0, err
}
return f, fileInfo.Size(), err
}
func (s *Store) getComposeDirectory(composeID uuid.UUID) string {
return fmt.Sprintf("%s/outputs/%s", *s.stateDir, composeID.String())
}
func (s *Store) getImageBuildDirectory(composeID uuid.UUID) string {
// only one image build is supported per compose
return fmt.Sprintf("%s/0", s.getComposeDirectory(composeID))
}
func (s *Store) PushCompose(composeID uuid.UUID, manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, jobId uuid.UUID) error {
if _, exists := s.GetCompose(composeID); exists {
panic("a compose with this id already exists")
@ -387,15 +341,6 @@ func (s *Store) PushCompose(composeID uuid.UUID, manifest *osbuild.Manifest, ima
targets = []*target.Target{}
}
if s.stateDir != nil {
outputDir := s.getImageBuildDirectory(composeID)
err := os.MkdirAll(outputDir, 0755)
if err != nil {
return fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err)
}
}
// FIXME: handle or comment this possible error
_ = s.change(func() error {
s.composes[composeID] = Compose{
@ -459,47 +404,8 @@ func (s *Store) DeleteCompose(id uuid.UUID) error {
delete(s.composes, id)
var err error
if s.stateDir != nil {
err = os.RemoveAll(s.getComposeDirectory(id))
if err != nil {
return err
}
}
return err
})
}
func (s *Store) AddImageToImageUpload(composeID uuid.UUID, imageBuildID int, reader io.Reader) error {
if imageBuildID != 0 {
return &NotFoundError{"image build does not exist"}
}
currentCompose, exists := s.composes[composeID]
if !exists {
return &NotFoundError{"compose does not exist"}
}
localTargetOptions := currentCompose.ImageBuild.GetLocalTargetOptions()
if localTargetOptions == nil {
return &NoLocalTargetError{fmt.Sprintf("image upload requested for compse %s, but it has no local target", composeID.String())}
}
path := fmt.Sprintf("%s/%s", s.getImageBuildDirectory(composeID), localTargetOptions.Filename)
f, err := os.Create(path)
if err != nil {
return err
}
_, err = io.Copy(f, reader)
if err != nil {
return err
}
return nil
})
}
func (s *Store) PushSource(source SourceConfig) {

View file

@ -11,6 +11,8 @@ import (
"net"
"net/http"
"net/url"
"os"
"path"
"regexp"
"sort"
"strconv"
@ -41,11 +43,14 @@ type API struct {
logger *log.Logger
router *httprouter.Router
artifactsDir string
compatOutputDir string
}
var ValidBlueprintName = regexp.MustCompile(`^[a-zA-Z0-9._-]+$`)
func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server) *API {
func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server, artifactsDir, compatOutputDir string) *API {
api := &API{
store: store,
workers: workers,
@ -54,6 +59,8 @@ func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmm
distro: distro,
repos: repos,
logger: logger,
artifactsDir: artifactsDir,
compatOutputDir: compatOutputDir,
}
api.router = httprouter.New()
@ -191,6 +198,32 @@ func (api *API) getComposeStatus(compose store.Compose) *composeStatus {
}
}
// Opens the image file for `compose`. This looks under `{artifacts}/{jobId}`
// first, and then under `{outputs}/{composeId}/{imageBuildId}` for backwards
// compatibility.
func (api *API) openImageFile(composeId uuid.UUID, compose store.Compose) (io.Reader, int64, error) {
p := path.Join(api.artifactsDir, compose.ImageBuild.JobID.String(), compose.ImageBuild.ImageType.Filename())
f, err := os.Open(p)
if err != nil {
if api.compatOutputDir == "" || !os.IsNotExist(err) {
return nil, 0, err
}
p = path.Join(api.compatOutputDir, composeId.String(), "0")
f, err = os.Open(p)
if err != nil {
return nil, 0, err
}
}
info, err := f.Stat()
if err != nil {
return nil, 0, err
}
return f, info.Size(), nil
}
func verifyRequestVersion(writer http.ResponseWriter, params httprouter.Params, minVersion uint) bool {
versionString := params.ByName("version")
@ -1721,6 +1754,14 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R
continue
}
// Delete artifacts from jobs and the compat output dir. Ignore
// errors, because there's no point of reporting them to the
// client after the compose itself has already been deleted.
_ = os.RemoveAll(path.Join(api.artifactsDir, compose.ImageBuild.JobID.String()))
if api.compatOutputDir != "" {
_ = os.RemoveAll(path.Join(api.compatOutputDir, id.String()))
}
results = append(results, composeDeleteStatus{id, true})
}
@ -1966,13 +2007,11 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re
imageName := compose.ImageBuild.ImageType.Filename()
imageMime := compose.ImageBuild.ImageType.MIMEType()
reader, fileSize, err := api.store.GetImageBuildImage(uuid)
// TODO: this might return misleading error
reader, fileSize, err := api.openImageFile(uuid, compose)
if err != nil {
errors := responseError{
ID: "BuildMissingFile",
Msg: fmt.Sprintf("Build %s is missing file %s!", uuidString, imageName),
ID: "InternalServerError",
Msg: fmt.Sprintf("Error accessing image file for compose %s: %v", uuid, err),
}
statusResponseError(writer, http.StatusBadRequest, errors)
return

View file

@ -4,6 +4,7 @@ import (
"archive/tar"
"bytes"
"io"
"io/ioutil"
"math/rand"
"net/http"
"net/http/httptest"
@ -37,7 +38,13 @@ func createWeldrAPI(fixtureGenerator rpmmd_mock.FixtureGenerator) (*API, *store.
panic(err)
}
return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers), fixture.Store
artifactsDir, err := ioutil.TempDir("", "client_test-")
if err != nil {
panic(err)
}
defer os.RemoveAll(artifactsDir)
return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers, artifactsDir, ""), fixture.Store
}
func TestBasic(t *testing.T) {

View file

@ -118,9 +118,8 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *comm
return nil
}
func (c *Client) UploadImage(composeId uuid.UUID, imageBuildId int, reader io.Reader) error {
// content type doesn't really matter
url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", composeId, imageBuildId))
func (c *Client) UploadImage(job uuid.UUID, name string, reader io.Reader) error {
url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/artifacts/%s", job, name))
_, err := c.client.Post(url, "application/octet-stream", reader)
return err

View file

@ -8,7 +8,8 @@ import (
"log"
"net"
"net/http"
"strconv"
"os"
"path"
"time"
"github.com/google/uuid"
@ -24,7 +25,7 @@ type Server struct {
logger *log.Logger
jobs jobqueue.JobQueue
router *httprouter.Router
imageWriter WriteImageFunc
artifactsDir string
}
type JobStatus struct {
@ -35,13 +36,11 @@ type JobStatus struct {
Result OSBuildJobResult
}
type WriteImageFunc func(composeID uuid.UUID, imageBuildID int, reader io.Reader) error
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, imageWriter WriteImageFunc) *Server {
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string) *Server {
s := &Server{
logger: logger,
jobs: jobs,
imageWriter: imageWriter,
artifactsDir: artifactsDir,
}
s.router = httprouter.New()
@ -52,7 +51,7 @@ func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, imageWriter WriteImag
s.router.POST("/job-queue/v1/jobs", s.addJobHandler)
s.router.PATCH("/job-queue/v1/jobs/:job_id", s.updateJobHandler)
s.router.POST("/job-queue/v1/jobs/:job_id/builds/:build_id/image", s.addJobImageHandler)
s.router.POST("/job-queue/v1/jobs/:job_id/artifacts/:name", s.addJobImageHandler)
return s
}
@ -216,18 +215,35 @@ func (s *Server) addJobImageHandler(writer http.ResponseWriter, request *http.Re
return
}
imageBuildId, err := strconv.Atoi(params.ByName("build_id"))
if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err)
name := params.ByName("name")
if name == "" {
jsonErrorf(writer, http.StatusBadRequest, "invalid artifact name")
return
}
if s.imageWriter == nil {
_, err = io.Copy(ioutil.Discard, request.Body)
} else {
err = s.imageWriter(id, imageBuildId, request.Body)
}
if s.artifactsDir == "" {
_, err := io.Copy(ioutil.Discard, request.Body)
if err != nil {
jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
jsonErrorf(writer, http.StatusInternalServerError, "error discarding artifact: %v", err)
}
return
}
err = os.Mkdir(path.Join(s.artifactsDir, id.String()), 0700)
if err != nil {
jsonErrorf(writer, http.StatusInternalServerError, "cannot create artifact directory: %v", err)
return
}
f, err := os.Create(path.Join(s.artifactsDir, id.String(), name))
if err != nil {
jsonErrorf(writer, http.StatusInternalServerError, "cannot create artifact file: %v", err)
return
}
_, err = io.Copy(f, request.Body)
if err != nil {
jsonErrorf(writer, http.StatusInternalServerError, "error writing artifact file: %v", err)
return
}
}

View file

@ -36,7 +36,7 @@ func TestErrors(t *testing.T) {
}
for _, c := range cases {
server := worker.NewServer(nil, testjobqueue.New(), nil)
server := worker.NewServer(nil, testjobqueue.New(), "")
test.TestRoute(t, server, false, c.Method, c.Path, c.Body, c.ExpectedStatus, "{}", "message")
}
}
@ -51,7 +51,7 @@ func TestCreate(t *testing.T) {
if err != nil {
t.Fatalf("error getting image type from arch")
}
server := worker.NewServer(nil, testjobqueue.New(), nil)
server := worker.NewServer(nil, testjobqueue.New(), "")
manifest, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, nil, nil)
if err != nil {
@ -75,7 +75,7 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) {
if err != nil {
t.Fatalf("error getting image type from arch")
}
server := worker.NewServer(nil, testjobqueue.New(), nil)
server := worker.NewServer(nil, testjobqueue.New(), "")
id := uuid.Nil
if from != "VOID" {