From b5769add2c4edd3da3ca08dc0974550ade193852 Mon Sep 17 00:00:00 2001 From: Lars Karlitski Date: Sun, 3 May 2020 17:44:22 +0200 Subject: [PATCH] store: move queue out of the store The store is responsible for two things: user state and the compose queue. This is problematic, because the rcm API has slightly different semantics from weldr and only used the queue part of the store. Also, the store is simply too complex. This commit splits the queue part out, using the new jobqueue package in both the weldr and the rcm package. The queue is saved to a new directory `queue/`. The weldr package now also has access to a worker server to enqueue and list jobs. Its store continues to track composes, but the `QueueStatus` for each compose (and image build) is deprecated. The field in `ImageBuild` is kept for backwards compatibility for composes which finished before this change, but a lot of code dealing with it in package compose is dropped. store.PushCompose() is degraded to storing a new compose. It should probably be renamed in the future. store.PopJob() is removed. Job ids are now independent of compose ids. Because of that, the local target gains ComposeId and ImageBuildId fields, because a worker cannot infer those from a job anymore. This also necessitates a change in the worker API: the job routes are changed to expect that instead of a (compose id, image build id) pair. The route that accepts built images keeps that pair, because it reports the image back to weldr. worker.Server() now interacts with a job queue instead of the store. It gains public functions that allow enqueuing an osbuild job and getting its status, because only it knows about the specific argument and result types in the job queue (OSBuildJob and OSBuildJobResult). One oddity remains: it needs to report an uploaded image to weldr. Do this with a function that's passed in for now, so that the dependency to the store can be dropped completely. The rcm API drops its dependencies to package blueprint and store, because it too interacts only with the worker server now. Fixes #342 --- cmd/osbuild-composer/main.go | 28 ++++- cmd/osbuild-worker/main.go | 9 +- internal/client/unit_test.go | 2 +- internal/compose/compose.go | 74 +++---------- internal/compose/compose_test.go | 125 ---------------------- internal/mocks/rpmmd/fixtures.go | 11 ++ internal/mocks/rpmmd/rpmmd_mock.go | 2 + internal/rcm/api.go | 28 +++-- internal/rcm/api_test.go | 40 ++++++- internal/store/store.go | 105 +++++------------- internal/target/local_target.go | 6 +- internal/weldr/api.go | 166 ++++++++++++++++++++--------- internal/weldr/api_test.go | 2 +- internal/weldr/compose.go | 36 ++++--- internal/worker/client.go | 16 ++- internal/worker/json.go | 24 ++++- internal/worker/server.go | 133 ++++++++++++++++------- internal/worker/server_test.go | 51 ++++----- 18 files changed, 415 insertions(+), 443 deletions(-) delete mode 100644 internal/compose/compose_test.go diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index abb903cbe..8a7cbeb32 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -15,6 +15,7 @@ import ( "github.com/osbuild/osbuild-composer/internal/distro/rhel81" "github.com/osbuild/osbuild-composer/internal/distro/rhel82" "github.com/osbuild/osbuild-composer/internal/distro/rhel83" + "github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue" "github.com/osbuild/osbuild-composer/internal/rcm" "github.com/osbuild/osbuild-composer/internal/common" @@ -118,11 +119,28 @@ func main() { store := store.New(&stateDir) - workerAPI := worker.NewServer(logger, store) - weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store) + queueDir := path.Join(stateDir, "jobs") + err = os.Mkdir(queueDir, 0700) + if err != nil && !os.IsExist(err) { + log.Fatalf("cannot create queue directory: %v", err) + } + + jobs, err := fsjobqueue.New(queueDir) + if err != nil { + log.Fatalf("cannot create jobqueue: %v", err) + } + + outputDir := path.Join(stateDir, "outputs") + err = os.Mkdir(outputDir, 0755) + if err != nil && !os.IsExist(err) { + log.Fatalf("cannot create output directory: %v", err) + } + + workers := worker.NewServer(logger, jobs, store.AddImageToImageUpload) + weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers) go func() { - err := workerAPI.Serve(jobListener) + err := workers.Serve(jobListener) common.PanicOnError(err) }() @@ -133,7 +151,7 @@ func main() { log.Fatal("The RCM API socket unit is misconfigured. It should contain only one socket.") } rcmListener := rcmApiListeners[0] - rcmAPI := rcm.New(logger, store, rpm, distros) + rcmAPI := rcm.New(logger, workers, rpm, distros) go func() { err := rcmAPI.Serve(rcmListener) // If the RCM API fails, take down the whole process, not just a single gorutine @@ -158,7 +176,7 @@ func main() { listener := tls.NewListener(listener, tlsConfig) go func() { - err := workerAPI.Serve(listener) + err := workers.Serve(listener) common.PanicOnError(err) }() } diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index 87a8a5caa..ce1c856cf 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -12,6 +12,7 @@ import ( "os" "path" + "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/upload/awsupload" @@ -62,7 +63,7 @@ func (e *TargetsError) Error() string { return errString } -func RunJob(job *worker.Job, uploadFunc func(*worker.Job, io.Reader) error) (*common.ComposeResult, error) { +func RunJob(job *worker.Job, uploadFunc func(uuid.UUID, int, io.Reader) error) (*common.ComposeResult, error) { tmpStore, err := ioutil.TempDir("/var/tmp", "osbuild-store") if err != nil { return nil, fmt.Errorf("error setting up osbuild store: %v", err) @@ -86,7 +87,7 @@ func RunJob(job *worker.Job, uploadFunc func(*worker.Job, io.Reader) error) (*co continue } - err = uploadFunc(job, f) + err = uploadFunc(options.ComposeId, options.ImageBuildId, f) if err != nil { r = append(r, err) continue @@ -101,7 +102,7 @@ func RunJob(job *worker.Job, uploadFunc func(*worker.Job, io.Reader) error) (*co } if options.Key == "" { - options.Key = job.ComposeID.String() + options.Key = job.Id.String() } _, err = a.Upload(path.Join(tmpStore, "refs", result.OutputID, options.Filename), options.Bucket, options.Key) @@ -191,7 +192,7 @@ func main() { log.Fatal(err) } - fmt.Printf("Running job %s\n", job.ComposeID.String()) + fmt.Printf("Running job %s\n", job.Id) var status common.ImageBuildState result, err := RunJob(job, client.UploadImage) diff --git a/internal/client/unit_test.go b/internal/client/unit_test.go index d2230cc50..156f70356 100644 --- a/internal/client/unit_test.go +++ b/internal/client/unit_test.go @@ -47,7 +47,7 @@ func executeTests(m *testing.M) int { } repos := []rpmmd.RepoConfig{{Id: "test-system-repo", BaseURL: "http://example.com/test/os/test_arch"}} logger := log.New(os.Stdout, "", 0) - api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store) + api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers) server := http.Server{Handler: api} defer server.Close() diff --git a/internal/compose/compose.go b/internal/compose/compose.go index d43e1aba0..ac56a09fd 100644 --- a/internal/compose/compose.go +++ b/internal/compose/compose.go @@ -5,6 +5,7 @@ package compose import ( "time" + "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/osbuild" @@ -21,15 +22,20 @@ func (ste *StateTransitionError) Error() string { // ImageBuild represents a single image build inside a compose type ImageBuild struct { - Id int `json:"id"` - QueueStatus common.ImageBuildState `json:"queue_status"` - ImageType common.ImageType `json:"image_type"` - Manifest *osbuild.Manifest `json:"manifest"` - Targets []*target.Target `json:"targets"` - JobCreated time.Time `json:"job_created"` - JobStarted time.Time `json:"job_started"` - JobFinished time.Time `json:"job_finished"` - Size uint64 `json:"size"` + Id int `json:"id"` + ImageType common.ImageType `json:"image_type"` + Manifest *osbuild.Manifest `json:"manifest"` + Targets []*target.Target `json:"targets"` + JobCreated time.Time `json:"job_created"` + JobStarted time.Time `json:"job_started"` + JobFinished time.Time `json:"job_finished"` + Size uint64 `json:"size"` + JobId uuid.UUID `json:"jobid,omitempty"` + + // Kept for backwards compatibility. Image builds which were done + // before the move to the job queue use this to store whether they + // finished successfully. + QueueStatus common.ImageBuildState `json:"queue_status,omitempty"` } // DeepCopy creates a copy of the ImageBuild structure @@ -55,6 +61,7 @@ func (ib *ImageBuild) DeepCopy() ImageBuild { JobStarted: ib.JobStarted, JobFinished: ib.JobFinished, Size: ib.Size, + JobId: ib.JobId, } } @@ -94,55 +101,6 @@ func (c *Compose) DeepCopy() Compose { } } -func anyImageBuild(fn func(common.ImageBuildState) bool, list []common.ImageBuildState) bool { - acc := false - for _, i := range list { - if fn(i) { - acc = true - } - } - return acc -} - -func allImageBuilds(fn func(common.ImageBuildState) bool, list []common.ImageBuildState) bool { - acc := true - for _, i := range list { - if !fn(i) { - acc = false - } - } - return acc -} - -// GetState returns a state of the whole compose which is derived from the states of -// individual image builds inside the compose -func (c *Compose) GetState() common.ComposeState { - var imageBuildsStates []common.ImageBuildState - for _, ib := range c.ImageBuilds { - imageBuildsStates = append(imageBuildsStates, ib.QueueStatus) - } - // In case all states are the same - if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBWaiting }, imageBuildsStates) { - return common.CWaiting - } - if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBFinished }, imageBuildsStates) { - return common.CFinished - } - if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBFailed }, imageBuildsStates) { - return common.CFailed - } - // In case the states are mixed - // TODO: can this condition be removed because it is already covered by the default? - if anyImageBuild(func(ib common.ImageBuildState) bool { return ib == common.IBRunning }, imageBuildsStates) { - return common.CRunning - } - if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBFailed || ib == common.IBFinished }, imageBuildsStates) { - return common.CFailed - } - // Default value - return common.CRunning -} - // UpdateState changes a state of a single image build inside the Compose func (c *Compose) UpdateState(imageBuildId int, newState common.ImageBuildState) error { switch newState { diff --git a/internal/compose/compose_test.go b/internal/compose/compose_test.go deleted file mode 100644 index 21f86440e..000000000 --- a/internal/compose/compose_test.go +++ /dev/null @@ -1,125 +0,0 @@ -package compose - -import ( - "github.com/osbuild/osbuild-composer/internal/common" - "testing" -) - -func TestGetState(t *testing.T) { - cases := []struct { - compose Compose - expecedStatus common.ComposeState - }{ - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBWaiting}, - }, - }, - expecedStatus: common.CWaiting, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBRunning}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CFailed, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFinished}, - }, - }, - expecedStatus: common.CFinished, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBWaiting}, - {QueueStatus: common.IBWaiting}, - }, - }, - expecedStatus: common.CWaiting, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBWaiting}, - {QueueStatus: common.IBRunning}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBRunning}, - {QueueStatus: common.IBRunning}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBRunning}, - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBWaiting}, - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFailed}, - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CFailed, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFinished}, - {QueueStatus: common.IBFinished}, - }, - }, - expecedStatus: common.CFinished, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFinished}, - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CFailed, - }, - } - for n, c := range cases { - got := c.compose.GetState() - wanted := c.expecedStatus - if got != wanted { - t.Error("Compose", n, "should be in", wanted.ToString(), "state, but it is:", got.ToString()) - } - } -} diff --git a/internal/mocks/rpmmd/fixtures.go b/internal/mocks/rpmmd/fixtures.go index e34ebee37..2600af9dd 100644 --- a/internal/mocks/rpmmd/fixtures.go +++ b/internal/mocks/rpmmd/fixtures.go @@ -7,6 +7,8 @@ import ( "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/compose" + "github.com/osbuild/osbuild-composer/internal/jobqueue/testjobqueue" + "github.com/osbuild/osbuild-composer/internal/worker" "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/blueprint" @@ -150,6 +152,10 @@ func createBaseStoreFixture() *store.Store { return s } +func createBaseWorkersFixture() *worker.Server { + return worker.NewServer(nil, testjobqueue.New(), nil) +} + func createBaseDepsolveFixture() []rpmmd.PackageSpec { return []rpmmd.PackageSpec{ { @@ -207,6 +213,7 @@ func BaseFixture() Fixture { nil, }, createBaseStoreFixture(), + createBaseWorkersFixture(), } } @@ -223,6 +230,7 @@ func NoComposesFixture() Fixture { nil, }, createStoreWithoutComposesFixture(), + createBaseWorkersFixture(), } } @@ -242,6 +250,7 @@ func NonExistingPackage() Fixture { }, }, createBaseStoreFixture(), + createBaseWorkersFixture(), } } @@ -261,6 +270,7 @@ func BadDepsolve() Fixture { }, }, createBaseStoreFixture(), + createBaseWorkersFixture(), } } @@ -283,5 +293,6 @@ func BadFetch() Fixture { }, }, createBaseStoreFixture(), + createBaseWorkersFixture(), } } diff --git a/internal/mocks/rpmmd/rpmmd_mock.go b/internal/mocks/rpmmd/rpmmd_mock.go index 3cf3013a0..49049f72c 100644 --- a/internal/mocks/rpmmd/rpmmd_mock.go +++ b/internal/mocks/rpmmd/rpmmd_mock.go @@ -3,6 +3,7 @@ package rpmmd_mock import ( "github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/store" + "github.com/osbuild/osbuild-composer/internal/worker" ) type fetchPackageList struct { @@ -20,6 +21,7 @@ type Fixture struct { fetchPackageList depsolve *store.Store + Workers *worker.Server } type rpmmdMock struct { diff --git a/internal/rcm/api.go b/internal/rcm/api.go index f57f4b377..b677dfd3a 100644 --- a/internal/rcm/api.go +++ b/internal/rcm/api.go @@ -10,19 +10,18 @@ import ( "net/http" "github.com/osbuild/osbuild-composer/internal/rpmmd" + "github.com/osbuild/osbuild-composer/internal/worker" "github.com/google/uuid" "github.com/julienschmidt/httprouter" - "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/distro" - "github.com/osbuild/osbuild-composer/internal/store" ) // API encapsulates RCM-specific API that is exposed over a separate TCP socket type API struct { - logger *log.Logger - store *store.Store - router *httprouter.Router + logger *log.Logger + workers *worker.Server + router *httprouter.Router // rpmMetadata is an interface to dnf-json and we include it here so that we can // mock it in the unit tests rpmMetadata rpmmd.RPMMD @@ -30,10 +29,10 @@ type API struct { } // New creates new RCM API -func New(logger *log.Logger, store *store.Store, rpmMetadata rpmmd.RPMMD, distros *distro.Registry) *API { +func New(logger *log.Logger, workers *worker.Server, rpmMetadata rpmmd.RPMMD, distros *distro.Registry) *API { api := &API{ logger: logger, - store: store, + workers: workers, router: httprouter.New(), rpmMetadata: rpmMetadata, distros: distros, @@ -215,8 +214,7 @@ func (api *API) submit(writer http.ResponseWriter, request *http.Request, _ http return } - // Push the requested compose to the store - composeID, err := api.store.PushCompose(manifest, imageType.Name(), &blueprint.Blueprint{}, 0, nil) + composeID, err := api.workers.Enqueue(manifest, nil) if err != nil { if api.logger != nil { api.logger.Println("RCM API failed to push compose:", err) @@ -254,22 +252,20 @@ func (api *API) status(writer http.ResponseWriter, request *http.Request, params } // Check that the compose exists - compose, exists := api.store.GetCompose(id) - if !exists { + status, _, err := api.workers.JobResult(id) + if err != nil { writer.WriteHeader(http.StatusBadRequest) - errorReason.Error = "Compose UUID does not exist" + errorReason.Error = err.Error() // TODO: handle error _ = json.NewEncoder(writer).Encode(errorReason) return } // JSON structure with success response - var reply struct { + type reply struct { Status string `json:"status"` } - // TODO: return per-job status like Koji does (requires changes in the store) - reply.Status = compose.GetState().ToString() // TODO: handle error - _ = json.NewEncoder(writer).Encode(reply) + _ = json.NewEncoder(writer).Encode(reply{Status: status.ToString()}) } diff --git a/internal/rcm/api_test.go b/internal/rcm/api_test.go index 7f3a9597e..d34cc6a20 100644 --- a/internal/rcm/api_test.go +++ b/internal/rcm/api_test.go @@ -6,15 +6,18 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "os" "regexp" "testing" "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/osbuild/osbuild-composer/internal/jobqueue/testjobqueue" distro_mock "github.com/osbuild/osbuild-composer/internal/mocks/distro" rpmmd_mock "github.com/osbuild/osbuild-composer/internal/mocks/rpmmd" "github.com/osbuild/osbuild-composer/internal/rcm" - "github.com/osbuild/osbuild-composer/internal/store" + "github.com/osbuild/osbuild-composer/internal/worker" ) type API interface { @@ -30,6 +33,21 @@ func internalRequest(api API, method, path, body, contentType string) *http.Resp return resp.Result() } +func newTestWorkerServer(t *testing.T) (*worker.Server, string) { + dir, err := ioutil.TempDir("", "rcm-test-") + require.NoError(t, err) + + w := worker.NewServer(nil, testjobqueue.New(), nil) + require.NotNil(t, w) + + return w, dir +} + +func cleanupTempDir(t *testing.T, dir string) { + err := os.RemoveAll(dir) + require.NoError(t, err) +} + func TestBasicRcmAPI(t *testing.T) { // Test the HTTP API responses // This test mainly focuses on HTTP status codes and JSON structures, not necessarily on their content @@ -47,14 +65,18 @@ func TestBasicRcmAPI(t *testing.T) { {"POST", "/v1/compose", `{"status":"RUNNING"}`, "text/plain", http.StatusBadRequest, ``}, {"POST", "/v1/compose", `{"image_builds":[]}`, "application/json", http.StatusBadRequest, ""}, {"POST", "/v1/compose/111-222-333", `{"status":"RUNNING"}`, "application/json", http.StatusMethodNotAllowed, ``}, - {"GET", "/v1/compose/7802c476-9cd1-41b7-ba81-43c1906bce73", `{"status":"RUNNING"}`, "application/json", http.StatusBadRequest, `{"error_reason":"Compose UUID does not exist"}`}, + {"GET", "/v1/compose/7802c476-9cd1-41b7-ba81-43c1906bce73", `{"status":"RUNNING"}`, "application/json", http.StatusBadRequest, ``}, } registry, err := distro_mock.NewDefaultRegistry() if err != nil { t.Fatal(err) } - api := rcm.New(nil, store.New(nil), rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) + + workers, dir := newTestWorkerServer(t) + defer cleanupTempDir(t, dir) + + api := rcm.New(nil, workers, rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) for _, c := range cases { resp := internalRequest(api, c.Method, c.Path, c.Body, c.ContentType) @@ -79,7 +101,11 @@ func TestSubmit(t *testing.T) { if err != nil { t.Fatal(err) } - api := rcm.New(nil, store.New(nil), rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) + + workers, dir := newTestWorkerServer(t) + defer cleanupTempDir(t, dir) + + api := rcm.New(nil, workers, rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) var submit_reply struct { UUID uuid.UUID `json:"compose_id"` @@ -207,7 +233,11 @@ func TestStatus(t *testing.T) { if err != nil { t.Fatal(err) } - api := rcm.New(nil, store.New(nil), rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) + + workers, dir := newTestWorkerServer(t) + defer cleanupTempDir(t, dir) + + api := rcm.New(nil, workers, rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) var submit_reply struct { UUID uuid.UUID `json:"compose_id"` diff --git a/internal/store/store.go b/internal/store/store.go index 1d7904ca5..a10e1aa79 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -84,14 +84,6 @@ func (e *NotPendingError) Error() string { return e.message } -type NotRunningError struct { - message string -} - -func (e *NotRunningError) Error() string { - return e.message -} - type InvalidRequestError struct { message string } @@ -136,27 +128,23 @@ func New(stateDir *string) *Store { if s.Composes == nil { s.Composes = make(map[uuid.UUID]compose.Compose) } else { + // Backwards compatibility: fail all builds that are queued or + // running. Jobs status is now handled outside of the store + // (and the compose). The fields are kept so that previously + // succeeded builds still show up correctly. for composeID, compose := range s.Composes { if len(compose.ImageBuilds) == 0 { panic("the was a compose with zero image builds, that is forbidden") } for imgID, imgBuild := range compose.ImageBuilds { switch imgBuild.QueueStatus { - case common.IBRunning: - // We do not support resuming an in-flight build + case common.IBRunning, common.IBWaiting: compose.ImageBuilds[imgID].QueueStatus = common.IBFailed s.Composes[composeID] = compose - case common.IBWaiting: - // Push waiting composes back into the pending jobs queue - s.pendingJobs <- Job{ - ComposeID: composeID, - ImageBuildID: imgBuild.Id, - Manifest: imgBuild.Manifest, - Targets: imgBuild.Targets, - } } } } + } if s.Sources == nil { s.Sources = make(map[string]SourceConfig) @@ -501,13 +489,15 @@ func (s *Store) getImageBuildDirectory(composeID uuid.UUID, imageBuildID int) st return fmt.Sprintf("%s/%d", s.getComposeDirectory(composeID), imageBuildID) } -func (s *Store) PushCompose(manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target) (uuid.UUID, error) { +func (s *Store) PushCompose(composeID uuid.UUID, manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, jobId uuid.UUID) error { + if _, exists := s.GetCompose(composeID); exists { + panic("a compose with this id already exists") + } + if targets == nil { targets = []*target.Target{} } - composeID := uuid.New() - // Compatibility layer for image types in Weldr API v0 imageTypeCommon, exists := common.ImageTypeFromCompatString(imageType.Name()) if !exists { @@ -519,7 +509,7 @@ func (s *Store) PushCompose(manifest *osbuild.Manifest, imageType distro.ImageTy err := os.MkdirAll(outputDir, 0755) if err != nil { - return uuid.Nil, fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) + return fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) } } @@ -529,37 +519,28 @@ func (s *Store) PushCompose(manifest *osbuild.Manifest, imageType distro.ImageTy Blueprint: bp, ImageBuilds: []compose.ImageBuild{ { - QueueStatus: common.IBWaiting, - Manifest: manifest, - ImageType: imageTypeCommon, - Targets: targets, - JobCreated: time.Now(), - Size: size, + Manifest: manifest, + ImageType: imageTypeCommon, + Targets: targets, + JobCreated: time.Now(), + Size: size, + JobId: jobId, }, }, } return nil }) - s.pendingJobs <- Job{ - ComposeID: composeID, - ImageBuildID: 0, - Manifest: manifest, - Targets: targets, - } - - return composeID, nil + return nil } // PushTestCompose is used for testing // Set testSuccess to create a fake successful compose, otherwise it will create a failed compose // It does not actually run a compose job -func (s *Store) PushTestCompose(manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, testSuccess bool) (uuid.UUID, error) { +func (s *Store) PushTestCompose(composeID uuid.UUID, manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, testSuccess bool) error { if targets == nil { targets = []*target.Target{} } - composeID := uuid.New() - // Compatibility layer for image types in Weldr API v0 imageTypeCommon, exists := common.ImageTypeFromCompatString(imageType.Name()) if !exists { @@ -571,7 +552,7 @@ func (s *Store) PushTestCompose(manifest *osbuild.Manifest, imageType distro.Ima err := os.MkdirAll(outputDir, 0755) if err != nil { - return uuid.Nil, fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) + return fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) } } @@ -607,33 +588,20 @@ func (s *Store) PushTestCompose(manifest *osbuild.Manifest, imageType distro.Ima // Instead of starting the job, immediately set a final status err := s.UpdateImageBuildInCompose(composeID, 0, status, &result) if err != nil { - return uuid.Nil, err + return err } - return composeID, nil + return nil } // DeleteCompose deletes the compose from the state file and also removes all files on disk that are // associated with this compose func (s *Store) DeleteCompose(id uuid.UUID) error { return s.change(func() error { - compose, exists := s.Composes[id] - - if !exists { + if _, exists := s.Composes[id]; !exists { return &NotFoundError{} } - // If any of the image builds have build artifacts, remove them - invalidRequest := true - for _, imageBuild := range compose.ImageBuilds { - if imageBuild.QueueStatus == common.IBFinished || imageBuild.QueueStatus == common.IBFailed { - invalidRequest = false - } - } - if invalidRequest { - return &InvalidRequestError{fmt.Sprintf("Compose %s is not in FINISHED or FAILED.", id)} - } - delete(s.Composes, id) var err error @@ -648,31 +616,6 @@ func (s *Store) DeleteCompose(id uuid.UUID) error { }) } -// PopJob returns a job from the job queue and changes the status of the corresponding image build to running -func (s *Store) PopJob() Job { - job := <-s.pendingJobs - // FIXME: handle or comment this possible error - _ = s.change(func() error { - // Get the compose from the map - compose, exists := s.Composes[job.ComposeID] - // Check that it exists - if !exists { - panic("Invalid job in queue.") - } - // Change queue status to running for the image build as well as for the targets - compose.ImageBuilds[job.ImageBuildID].QueueStatus = common.IBRunning - compose.ImageBuilds[job.ImageBuildID].JobStarted = time.Now() - for m := range compose.ImageBuilds[job.ImageBuildID].Targets { - compose.ImageBuilds[job.ImageBuildID].Targets[m].Status = common.IBRunning - } - // Replace the compose struct with the new one - // TODO: I'm not sure this is needed, but I don't know what is the golang semantics in this case - s.Composes[job.ComposeID] = compose - return nil - }) - return job -} - // UpdateImageBuildInCompose sets the status and optionally also the final image. func (s *Store) UpdateImageBuildInCompose(composeID uuid.UUID, imageBuildID int, status common.ImageBuildState, result *common.ComposeResult) error { return s.change(func() error { diff --git a/internal/target/local_target.go b/internal/target/local_target.go index 48f25ccb2..6e57f5d20 100644 --- a/internal/target/local_target.go +++ b/internal/target/local_target.go @@ -1,7 +1,11 @@ package target +import "github.com/google/uuid" + type LocalTargetOptions struct { - Filename string `json:"filename"` + ComposeId uuid.UUID `json:"compose_id"` + ImageBuildId int `json:"image_build_id"` + Filename string `json:"filename"` } func (LocalTargetOptions) isTargetOptions() {} diff --git a/internal/weldr/api.go b/internal/weldr/api.go index a0fa422ec..f080493f4 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -14,6 +14,7 @@ import ( "sort" "strconv" "strings" + "time" "github.com/BurntSushi/toml" "github.com/google/uuid" @@ -21,14 +22,17 @@ import ( "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/common" + "github.com/osbuild/osbuild-composer/internal/compose" "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/store" "github.com/osbuild/osbuild-composer/internal/target" + "github.com/osbuild/osbuild-composer/internal/worker" ) type API struct { - store *store.Store + store *store.Store + workers *worker.Server rpmmd rpmmd.RPMMD arch distro.Arch @@ -39,14 +43,15 @@ type API struct { router *httprouter.Router } -func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store) *API { +func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server) *API { api := &API{ - store: store, - rpmmd: rpmmd, - arch: arch, - distro: distro, - repos: repos, - logger: logger, + store: store, + workers: workers, + rpmmd: rpmmd, + arch: arch, + distro: distro, + repos: repos, + logger: logger, } api.router = httprouter.New() @@ -135,6 +140,40 @@ func (api *API) ServeHTTP(writer http.ResponseWriter, request *http.Request) { api.router.ServeHTTP(writer, request) } +// Returns the state of the image in `compose` and the times the job was +// queued, started, and finished. Assumes that there's only one image in the +// compose. Returns CWaiting on error. +func (api *API) getComposeState(compose compose.Compose) (state common.ComposeState, queued, started, finished time.Time) { + if len(compose.ImageBuilds) == 0 { + return + } + + jobId := compose.ImageBuilds[0].JobId + + // backwards compatibility: composes that were around before splitting + // the job queue from the store still contain their valid status and + // times. Return those here as a fallback. + if jobId == uuid.Nil { + switch compose.ImageBuilds[0].QueueStatus { + case common.IBWaiting: + state = common.CWaiting + case common.IBRunning: + state = common.CRunning + case common.IBFinished: + state = common.CFinished + case common.IBFailed: + state = common.CFailed + } + queued = compose.ImageBuilds[0].JobCreated + started = compose.ImageBuilds[0].JobStarted + finished = compose.ImageBuilds[0].JobFinished + return + } + + state, queued, started, finished, _ = api.workers.JobStatus(jobId) + return +} + func verifyRequestVersion(writer http.ResponseWriter, params httprouter.Params, minVersion uint) bool { versionString := params.ByName("version") @@ -1433,6 +1472,8 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request return } + composeID := uuid.New() + var targets []*target.Target if isRequestVersionAtLeast(params, 1) && cr.Upload != nil { t := uploadRequestToTarget(*cr.Upload, imageType) @@ -1441,7 +1482,9 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request targets = append(targets, target.NewLocalTarget( &target.LocalTargetOptions{ - Filename: imageType.Filename(), + ComposeId: composeID, + ImageBuildId: 0, + Filename: imageType.Filename(), }, )) @@ -1465,8 +1508,6 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request return } - var composeID uuid.UUID - // Check for test parameter q, err := url.ParseQuery(request.URL.RawQuery) if err != nil { @@ -1492,12 +1533,17 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request testMode := q.Get("test") if testMode == "1" { // Create a failed compose - composeID, err = api.store.PushTestCompose(manifest, imageType, bp, size, targets, false) + err = api.store.PushTestCompose(composeID, manifest, imageType, bp, size, targets, false) } else if testMode == "2" { // Create a successful compose - composeID, err = api.store.PushTestCompose(manifest, imageType, bp, size, targets, true) + err = api.store.PushTestCompose(composeID, manifest, imageType, bp, size, targets, true) } else { - composeID, err = api.store.PushCompose(manifest, imageType, bp, size, targets) + var jobId uuid.UUID + + jobId, err = api.workers.Enqueue(manifest, targets) + if err == nil { + err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId) + } } // TODO: we should probably do some kind of blueprint validation in future @@ -1549,29 +1595,34 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R continue } - err = api.store.DeleteCompose(id) - - if err != nil { - switch err.(type) { - case *store.NotFoundError: - errors = append(errors, composeDeleteError{ - "UnknownUUID", - fmt.Sprintf("compose %s doesn't exist", id), - }) - case *store.InvalidRequestError: - errors = append(errors, composeDeleteError{ - "BuildInWrongState", - err.Error(), - }) - default: - errors = append(errors, composeDeleteError{ - "ComposeError", - fmt.Sprintf("%s: %s", id, err.Error()), - }) - } - } else { - results = append(results, composeDeleteStatus{id, true}) + compose, exists := api.store.GetCompose(id) + if !exists { + errors = append(errors, composeDeleteError{ + "UnknownUUID", + fmt.Sprintf("compose %s doesn't exist", id), + }) + continue } + + state, _, _, _ := api.getComposeState(compose) + if state != common.CFinished && state != common.CFailed { + errors = append(errors, composeDeleteError{ + "BuildInWrongState", + fmt.Sprintf("Compose %s is not in FINISHED or FAILED.", id), + }) + continue + } + + err = api.store.DeleteCompose(id) + if err != nil { + errors = append(errors, composeDeleteError{ + "ComposeError", + fmt.Sprintf("%s: %s", id, err.Error()), + }) + continue + } + + results = append(results, composeDeleteStatus{id, true}) } reply := struct { @@ -1614,13 +1665,16 @@ func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Re Run []*ComposeEntry `json:"run"` }{[]*ComposeEntry{}, []*ComposeEntry{}} + includeUploads := isRequestVersionAtLeast(params, 1) + composes := api.store.GetAllComposes() for id, compose := range composes { - switch compose.GetState() { + state, queued, started, finished := api.getComposeState(compose) + switch state { case common.CWaiting: - reply.New = append(reply.New, composeToComposeEntry(id, compose, isRequestVersionAtLeast(params, 1))) + reply.New = append(reply.New, composeToComposeEntry(id, compose, common.CWaiting, queued, started, finished, includeUploads)) case common.CRunning: - reply.Run = append(reply.Run, composeToComposeEntry(id, compose, isRequestVersionAtLeast(params, 1))) + reply.Run = append(reply.Run, composeToComposeEntry(id, compose, common.CRunning, queued, started, finished, includeUploads)) } } @@ -1682,9 +1736,10 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R if !exists { continue } + state, _, _, _ := api.getComposeState(compose) if filterBlueprint != "" && compose.Blueprint.Name != filterBlueprint { continue - } else if filterStatus != "" && compose.ImageBuilds[0].QueueStatus.ToString() != filterStatus { + } else if filterStatus != "" && state.ToString() != filterStatus { continue } else if filterImageTypeExists && compose.ImageBuilds[0].ImageType != filterImageType { continue @@ -1696,7 +1751,8 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for _, id := range filteredUUIDs { if compose, exists := composes[id]; exists { - reply.UUIDs = append(reply.UUIDs, composeToComposeEntry(id, compose, includeUploads)) + state, queued, started, finished := api.getComposeState(compose) + reply.UUIDs = append(reply.UUIDs, composeToComposeEntry(id, compose, state, queued, started, finished, includeUploads)) } } sortComposeEntries(reply.UUIDs) @@ -1755,8 +1811,9 @@ func (api *API) composeInfoHandler(writer http.ResponseWriter, request *http.Req } // Weldr API assumes only one image build per compose, that's why only the // 1st build is considered + state, _, _, _ := api.getComposeState(compose) reply.ComposeType, _ = compose.ImageBuilds[0].ImageType.ToCompatString() - reply.QueueStatus = compose.GetState().ToString() + reply.QueueStatus = state.ToString() reply.ImageSize = compose.ImageBuilds[0].Size if isRequestVersionAtLeast(params, 1) { @@ -1793,10 +1850,11 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re return } - if compose.GetState() != common.CFinished { + state, _, _, _ := api.getComposeState(compose) + if state != common.CFinished { errors := responseError{ ID: "BuildInWrongState", - Msg: fmt.Sprintf("Build %s is in wrong state: %s", uuidString, compose.GetState().ToString()), + Msg: fmt.Sprintf("Build %s is in wrong state: %s", uuidString, state.ToString()), } statusResponseError(writer, http.StatusBadRequest, errors) return @@ -1863,7 +1921,8 @@ func (api *API) composeLogsHandler(writer http.ResponseWriter, request *http.Req return } - if compose.GetState() != common.CFinished && compose.GetState() != common.CFailed { + state, _, _, _ := api.getComposeState(compose) + if state != common.CFinished && state != common.CFailed { errors := responseError{ ID: "BuildInWrongState", Msg: fmt.Sprintf("Build %s not in FINISHED or FAILED state.", uuidString), @@ -1950,7 +2009,8 @@ func (api *API) composeLogHandler(writer http.ResponseWriter, request *http.Requ return } - if compose.GetState() == common.CWaiting { + state, _, _, _ := api.getComposeState(compose) + if state == common.CWaiting { errors := responseError{ ID: "BuildInWrongState", Msg: fmt.Sprintf("Build %s has not started yet. No logs to view.", uuidString), @@ -1959,7 +2019,7 @@ func (api *API) composeLogHandler(writer http.ResponseWriter, request *http.Requ return } - if compose.GetState() == common.CRunning { + if state == common.CRunning { fmt.Fprintf(writer, "Running...\n") return } @@ -2004,9 +2064,11 @@ func (api *API) composeFinishedHandler(writer http.ResponseWriter, request *http includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - if compose.ImageBuilds[0].QueueStatus == common.IBFinished { - reply.Finished = append(reply.Finished, composeToComposeEntry(id, compose, includeUploads)) + state, queued, started, finished := api.getComposeState(compose) + if state != common.CFinished { + continue } + reply.Finished = append(reply.Finished, composeToComposeEntry(id, compose, common.CFinished, queued, started, finished, includeUploads)) } sortComposeEntries(reply.Finished) @@ -2025,9 +2087,11 @@ func (api *API) composeFailedHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - if compose.ImageBuilds[0].QueueStatus == common.IBFailed { - reply.Failed = append(reply.Failed, composeToComposeEntry(id, compose, includeUploads)) + state, queued, started, finished := api.getComposeState(compose) + if state != common.CFailed { + continue } + reply.Failed = append(reply.Failed, composeToComposeEntry(id, compose, common.CFailed, queued, started, finished, includeUploads)) } sortComposeEntries(reply.Failed) diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index 749fd8802..b33bc1db9 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -38,7 +38,7 @@ func createWeldrAPI(fixtureGenerator rpmmd_mock.FixtureGenerator) (*API, *store. panic(err) } - return New(rpm, arch, d, repos, nil, fixture.Store), fixture.Store + return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers), fixture.Store } func TestBasic(t *testing.T) { diff --git a/internal/weldr/compose.go b/internal/weldr/compose.go index c2649c0ef..ca0f7f6a3 100644 --- a/internal/weldr/compose.go +++ b/internal/weldr/compose.go @@ -2,6 +2,7 @@ package weldr import ( "sort" + "time" "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/common" @@ -21,37 +22,40 @@ type ComposeEntry struct { Uploads []uploadResponse `json:"uploads,omitempty"` } -func composeToComposeEntry(id uuid.UUID, compose compose.Compose, includeUploads bool) *ComposeEntry { +func composeToComposeEntry(id uuid.UUID, compose compose.Compose, state common.ComposeState, queued, started, finished time.Time, includeUploads bool) *ComposeEntry { var composeEntry ComposeEntry composeEntry.ID = id composeEntry.Blueprint = compose.Blueprint.Name composeEntry.Version = compose.Blueprint.Version composeEntry.ComposeType = compose.ImageBuilds[0].ImageType - composeEntry.QueueStatus = compose.ImageBuilds[0].QueueStatus if includeUploads { composeEntry.Uploads = targetsToUploadResponses(compose.ImageBuilds[0].Targets) } - switch compose.ImageBuilds[0].QueueStatus { - case common.IBWaiting: - composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 + switch state { + case common.CWaiting: + composeEntry.QueueStatus = common.IBWaiting + composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000 - case common.IBRunning: - composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 - composeEntry.JobStarted = float64(compose.ImageBuilds[0].JobStarted.UnixNano()) / 1000000000 + case common.CRunning: + composeEntry.QueueStatus = common.IBRunning + composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000 + composeEntry.JobStarted = float64(started.UnixNano()) / 1000000000 - case common.IBFinished: + case common.CFinished: + composeEntry.QueueStatus = common.IBFinished composeEntry.ImageSize = compose.ImageBuilds[0].Size - composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 - composeEntry.JobStarted = float64(compose.ImageBuilds[0].JobStarted.UnixNano()) / 1000000000 - composeEntry.JobFinished = float64(compose.ImageBuilds[0].JobFinished.UnixNano()) / 1000000000 + composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000 + composeEntry.JobStarted = float64(started.UnixNano()) / 1000000000 + composeEntry.JobFinished = float64(finished.UnixNano()) / 1000000000 - case common.IBFailed: - composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 - composeEntry.JobStarted = float64(compose.ImageBuilds[0].JobStarted.UnixNano()) / 1000000000 - composeEntry.JobFinished = float64(compose.ImageBuilds[0].JobFinished.UnixNano()) / 1000000000 + case common.CFailed: + composeEntry.QueueStatus = common.IBFailed + composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000 + composeEntry.JobStarted = float64(started.UnixNano()) / 1000000000 + composeEntry.JobFinished = float64(finished.UnixNano()) / 1000000000 default: panic("invalid compose state") } diff --git a/internal/worker/client.go b/internal/worker/client.go index 385ea9141..e325280d1 100644 --- a/internal/worker/client.go +++ b/internal/worker/client.go @@ -25,10 +25,9 @@ type Client struct { } type Job struct { - ComposeID uuid.UUID - ImageBuildID int - Manifest *osbuild.Manifest - Targets []*target.Target + Id uuid.UUID + Manifest *osbuild.Manifest + Targets []*target.Target } func NewClient(address string, conf *tls.Config) *Client { @@ -86,8 +85,7 @@ func (c *Client) AddJob() (*Job, error) { } return &Job{ - jr.ComposeID, - jr.ImageBuildID, + jr.Id, jr.Manifest, jr.Targets, }, nil @@ -99,7 +97,7 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *comm if err != nil { panic(err) } - urlPath := fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d", job.ComposeID.String(), job.ImageBuildID) + urlPath := fmt.Sprintf("/job-queue/v1/jobs/%s", job.Id) url := c.createURL(urlPath) req, err := http.NewRequest("PATCH", url, &b) if err != nil { @@ -120,9 +118,9 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *comm return nil } -func (c *Client) UploadImage(job *Job, reader io.Reader) error { +func (c *Client) UploadImage(composeId uuid.UUID, imageBuildId int, reader io.Reader) error { // content type doesn't really matter - url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", job.ComposeID.String(), job.ImageBuildID)) + url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", composeId, imageBuildId)) _, err := c.client.Post(url, "application/octet-stream", reader) return err diff --git a/internal/worker/json.go b/internal/worker/json.go index 561910221..7fdc8f8f9 100644 --- a/internal/worker/json.go +++ b/internal/worker/json.go @@ -8,6 +8,23 @@ import ( "github.com/osbuild/osbuild-composer/internal/target" ) +// +// JSON-serializable types for the jobqueue +// + +type OSBuildJob struct { + Manifest *osbuild.Manifest `json:"manifest"` + Targets []*target.Target `json:"targets,omitempty"` +} + +type OSBuildJobResult struct { + OSBuildOutput *common.ComposeResult `json:"osbuild_output,omitempty"` +} + +// +// JSON-serializable types for the HTTP API +// + type errorResponse struct { Message string `json:"message"` } @@ -16,10 +33,9 @@ type addJobRequest struct { } type addJobResponse struct { - ComposeID uuid.UUID `json:"compose_id"` - ImageBuildID int `json:"image_build_id"` - Manifest *osbuild.Manifest `json:"manifest"` - Targets []*target.Target `json:"targets"` + Id uuid.UUID `json:"id"` + Manifest *osbuild.Manifest `json:"manifest"` + Targets []*target.Target `json:"targets,omitempty"` } type updateJobRequest struct { diff --git a/internal/worker/server.go b/internal/worker/server.go index 8630c5516..b13345c8e 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -3,27 +3,37 @@ package worker import ( "encoding/json" "fmt" + "io" + "io/ioutil" "log" "net" "net/http" "strconv" + "time" "github.com/google/uuid" "github.com/julienschmidt/httprouter" - "github.com/osbuild/osbuild-composer/internal/store" + "github.com/osbuild/osbuild-composer/internal/common" + "github.com/osbuild/osbuild-composer/internal/jobqueue" + "github.com/osbuild/osbuild-composer/internal/osbuild" + "github.com/osbuild/osbuild-composer/internal/target" ) type Server struct { - logger *log.Logger - store *store.Store - router *httprouter.Router + logger *log.Logger + jobs jobqueue.JobQueue + router *httprouter.Router + imageWriter WriteImageFunc } -func NewServer(logger *log.Logger, store *store.Store) *Server { +type WriteImageFunc func(composeID uuid.UUID, imageBuildID int, reader io.Reader) error + +func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, imageWriter WriteImageFunc) *Server { s := &Server{ - logger: logger, - store: store, + logger: logger, + jobs: jobs, + imageWriter: imageWriter, } s.router = httprouter.New() @@ -33,7 +43,7 @@ func NewServer(logger *log.Logger, store *store.Store) *Server { s.router.NotFound = http.HandlerFunc(notFoundHandler) s.router.POST("/job-queue/v1/jobs", s.addJobHandler) - s.router.PATCH("/job-queue/v1/jobs/:job_id/builds/:build_id", s.updateJobHandler) + s.router.PATCH("/job-queue/v1/jobs/:job_id", s.updateJobHandler) s.router.POST("/job-queue/v1/jobs/:job_id/builds/:build_id/image", s.addJobImageHandler) return s @@ -59,6 +69,38 @@ func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) { s.router.ServeHTTP(writer, request) } +func (s *Server) Enqueue(manifest *osbuild.Manifest, targets []*target.Target) (uuid.UUID, error) { + job := OSBuildJob{ + Manifest: manifest, + Targets: targets, + } + + return s.jobs.Enqueue("osbuild", job, nil) +} + +func (s *Server) JobStatus(id uuid.UUID) (state common.ComposeState, queued, started, finished time.Time, err error) { + var result OSBuildJobResult + var status jobqueue.JobStatus + + status, queued, started, finished, err = s.jobs.JobStatus(id, &result) + if err != nil { + return + } + + state = composeStateFromJobStatus(status, result.OSBuildOutput) + return +} + +func (s *Server) JobResult(id uuid.UUID) (common.ComposeState, *common.ComposeResult, error) { + var result OSBuildJobResult + status, _, _, _, err := s.jobs.JobStatus(id, &result) + if err != nil { + return common.CWaiting, nil, err + } + + return composeStateFromJobStatus(status, result.OSBuildOutput), result.OSBuildOutput, nil +} + // jsonErrorf() is similar to http.Error(), but returns the message in a json // object with a "message" field. func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) { @@ -92,15 +134,19 @@ func (s *Server) addJobHandler(writer http.ResponseWriter, request *http.Request return } - nextJob := s.store.PopJob() + var job OSBuildJob + id, err := s.jobs.Dequeue(request.Context(), []string{"osbuild"}, &job) + if err != nil { + jsonErrorf(writer, http.StatusInternalServerError, "%v", err) + return + } writer.WriteHeader(http.StatusCreated) // FIXME: handle or comment this possible error _ = json.NewEncoder(writer).Encode(addJobResponse{ - ComposeID: nextJob.ComposeID, - ImageBuildID: nextJob.ImageBuildID, - Manifest: nextJob.Manifest, - Targets: nextJob.Targets, + Id: id, + Manifest: job.Manifest, + Targets: job.Targets, }) } @@ -117,13 +163,6 @@ func (s *Server) updateJobHandler(writer http.ResponseWriter, request *http.Requ return } - imageBuildId, err := strconv.Atoi(params.ByName("build_id")) - - if err != nil { - jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err) - return - } - var body updateJobRequest err = json.NewDecoder(request.Body).Decode(&body) if err != nil { @@ -131,13 +170,21 @@ func (s *Server) updateJobHandler(writer http.ResponseWriter, request *http.Requ return } - err = s.store.UpdateImageBuildInCompose(id, imageBuildId, body.Status, body.Result) + // The jobqueue doesn't support setting the status before a job is + // finished. This branch should never be hit, because the worker + // doesn't attempt this. Change the API to remove this awkwardness. + if body.Status != common.IBFinished && body.Status != common.IBFailed { + jsonErrorf(writer, http.StatusBadRequest, "setting status of a job to waiting or running is not supported") + return + } + + err = s.jobs.FinishJob(id, OSBuildJobResult{OSBuildOutput: body.Result}) if err != nil { - switch err.(type) { - case *store.NotFoundError, *store.NotPendingError: - jsonErrorf(writer, http.StatusNotFound, "%v", err) - case *store.NotRunningError, *store.InvalidRequestError: - jsonErrorf(writer, http.StatusBadRequest, "%v", err) + switch err { + case jobqueue.ErrNotExist: + jsonErrorf(writer, http.StatusNotFound, "job does not exist: %s", id) + case jobqueue.ErrNotRunning: + jsonErrorf(writer, http.StatusBadRequest, "job is not running: %s", id) default: jsonErrorf(writer, http.StatusInternalServerError, "%v", err) } @@ -155,23 +202,33 @@ func (s *Server) addJobImageHandler(writer http.ResponseWriter, request *http.Re } imageBuildId, err := strconv.Atoi(params.ByName("build_id")) - if err != nil { jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err) return } - err = s.store.AddImageToImageUpload(id, imageBuildId, request.Body) - + if s.imageWriter == nil { + _, err = io.Copy(ioutil.Discard, request.Body) + } else { + err = s.imageWriter(id, imageBuildId, request.Body) + } if err != nil { - switch err.(type) { - case *store.NotFoundError: - jsonErrorf(writer, http.StatusNotFound, "%v", err) - case *store.NoLocalTargetError: - jsonErrorf(writer, http.StatusBadRequest, "%v", err) - default: - jsonErrorf(writer, http.StatusInternalServerError, "%v", err) - } - return + jsonErrorf(writer, http.StatusInternalServerError, "%v", err) } } + +func composeStateFromJobStatus(status jobqueue.JobStatus, output *common.ComposeResult) common.ComposeState { + switch status { + case jobqueue.JobPending: + return common.CWaiting + case jobqueue.JobRunning: + return common.CRunning + case jobqueue.JobFinished: + if output.Success { + return common.CFinished + } else { + return common.CFailed + } + } + return common.CWaiting +} diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 5a28d54ac..494d0b9d7 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -5,10 +5,10 @@ import ( "testing" "github.com/google/uuid" + "github.com/stretchr/testify/require" - "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/distro/fedoratest" - "github.com/osbuild/osbuild-composer/internal/store" + "github.com/osbuild/osbuild-composer/internal/jobqueue/testjobqueue" "github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/worker" ) @@ -27,15 +27,15 @@ func TestErrors(t *testing.T) { // Wrong method {"GET", "/job-queue/v1/jobs", ``, http.StatusMethodNotAllowed}, // Update job with invalid ID - {"PATCH", "/job-queue/v1/jobs/foo/builds/0", `{"status":"RUNNING"}`, http.StatusBadRequest}, + {"PATCH", "/job-queue/v1/jobs/foo", `{"status":"FINISHED"}`, http.StatusBadRequest}, // Update job that does not exist, with invalid body - {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/builds/0", ``, http.StatusBadRequest}, + {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", ``, http.StatusBadRequest}, // Update job that does not exist - {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/builds/0", `{"status":"RUNNING"}`, http.StatusNotFound}, + {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"FINISHED"}`, http.StatusNotFound}, } for _, c := range cases { - server := worker.NewServer(nil, store.New(nil)) + server := worker.NewServer(nil, testjobqueue.New(), nil) test.TestRoute(t, server, false, c.Method, c.Path, c.Body, c.ExpectedStatus, "{}", "message") } } @@ -50,21 +50,18 @@ func TestCreate(t *testing.T) { if err != nil { t.Fatalf("error getting image type from arch") } - store := store.New(nil) - server := worker.NewServer(nil, store) + server := worker.NewServer(nil, testjobqueue.New(), nil) manifest, err := imageType.Manifest(nil, nil, nil, nil, imageType.Size(0)) if err != nil { t.Fatalf("error creating osbuild manifest") } - id, err := store.PushCompose(manifest, imageType.Name(), &blueprint.Blueprint{}, 0, nil) - if err != nil { - t.Fatalf("error pushing compose: %v", err) - } + id, err := server.Enqueue(manifest, nil) + require.NoError(t, err) test.TestRoute(t, server, false, "POST", "/job-queue/v1/jobs", `{}`, http.StatusCreated, - `{"compose_id":"`+id.String()+`","image_build_id":0,"manifest":{"sources":{},"pipeline":{}},"targets":[]}`, "created") + `{"id":"`+id.String()+`","manifest":{"sources":{},"pipeline":{}}}`, "created") } func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { @@ -77,8 +74,7 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { if err != nil { t.Fatalf("error getting image type from arch") } - store := store.New(nil) - server := worker.NewServer(nil, store) + server := worker.NewServer(nil, testjobqueue.New(), nil) id := uuid.Nil if from != "VOID" { @@ -87,20 +83,18 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { t.Fatalf("error creating osbuild manifest") } - id, err = store.PushCompose(manifest, imageType.Name(), &blueprint.Blueprint{}, 0, nil) - if err != nil { - t.Fatalf("error pushing compose: %v", err) - } + id, err = server.Enqueue(manifest, nil) + require.NoError(t, err) if from != "WAITING" { test.SendHTTP(server, false, "POST", "/job-queue/v1/jobs", `{}`) if from != "RUNNING" { - test.SendHTTP(server, false, "PATCH", "/job-queue/v1/jobs/"+id.String()+"/builds/0", `{"status":"`+from+`"}`) + test.SendHTTP(server, false, "PATCH", "/job-queue/v1/jobs/"+id.String(), `{"status":"`+from+`"}`) } } } - test.TestRoute(t, server, false, "PATCH", "/job-queue/v1/jobs/"+id.String()+"/builds/0", `{"status":"`+to+`"}`, expectedStatus, "{}", "message") + test.TestRoute(t, server, false, "PATCH", "/job-queue/v1/jobs/"+id.String(), `{"status":"`+to+`"}`, expectedStatus, "{}", "message") } func TestUpdate(t *testing.T) { @@ -109,16 +103,16 @@ func TestUpdate(t *testing.T) { To string ExpectedStatus int }{ - {"VOID", "WAITING", http.StatusNotFound}, - {"VOID", "RUNNING", http.StatusNotFound}, + {"VOID", "WAITING", http.StatusBadRequest}, + {"VOID", "RUNNING", http.StatusBadRequest}, {"VOID", "FINISHED", http.StatusNotFound}, {"VOID", "FAILED", http.StatusNotFound}, - {"WAITING", "WAITING", http.StatusNotFound}, - {"WAITING", "RUNNING", http.StatusNotFound}, - {"WAITING", "FINISHED", http.StatusNotFound}, - {"WAITING", "FAILED", http.StatusNotFound}, + {"WAITING", "WAITING", http.StatusBadRequest}, + {"WAITING", "RUNNING", http.StatusBadRequest}, + {"WAITING", "FINISHED", http.StatusBadRequest}, + {"WAITING", "FAILED", http.StatusBadRequest}, {"RUNNING", "WAITING", http.StatusBadRequest}, - {"RUNNING", "RUNNING", http.StatusOK}, + {"RUNNING", "RUNNING", http.StatusBadRequest}, {"RUNNING", "FINISHED", http.StatusOK}, {"RUNNING", "FAILED", http.StatusOK}, {"FINISHED", "WAITING", http.StatusBadRequest}, @@ -132,6 +126,7 @@ func TestUpdate(t *testing.T) { } for _, c := range cases { + t.Log(c) testUpdateTransition(t, c.From, c.To, c.ExpectedStatus) } }