diff --git a/cmd/osbuild-composer/main.go b/cmd/osbuild-composer/main.go index abb903cbe..8a7cbeb32 100644 --- a/cmd/osbuild-composer/main.go +++ b/cmd/osbuild-composer/main.go @@ -15,6 +15,7 @@ import ( "github.com/osbuild/osbuild-composer/internal/distro/rhel81" "github.com/osbuild/osbuild-composer/internal/distro/rhel82" "github.com/osbuild/osbuild-composer/internal/distro/rhel83" + "github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue" "github.com/osbuild/osbuild-composer/internal/rcm" "github.com/osbuild/osbuild-composer/internal/common" @@ -118,11 +119,28 @@ func main() { store := store.New(&stateDir) - workerAPI := worker.NewServer(logger, store) - weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store) + queueDir := path.Join(stateDir, "jobs") + err = os.Mkdir(queueDir, 0700) + if err != nil && !os.IsExist(err) { + log.Fatalf("cannot create queue directory: %v", err) + } + + jobs, err := fsjobqueue.New(queueDir) + if err != nil { + log.Fatalf("cannot create jobqueue: %v", err) + } + + outputDir := path.Join(stateDir, "outputs") + err = os.Mkdir(outputDir, 0755) + if err != nil && !os.IsExist(err) { + log.Fatalf("cannot create output directory: %v", err) + } + + workers := worker.NewServer(logger, jobs, store.AddImageToImageUpload) + weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers) go func() { - err := workerAPI.Serve(jobListener) + err := workers.Serve(jobListener) common.PanicOnError(err) }() @@ -133,7 +151,7 @@ func main() { log.Fatal("The RCM API socket unit is misconfigured. It should contain only one socket.") } rcmListener := rcmApiListeners[0] - rcmAPI := rcm.New(logger, store, rpm, distros) + rcmAPI := rcm.New(logger, workers, rpm, distros) go func() { err := rcmAPI.Serve(rcmListener) // If the RCM API fails, take down the whole process, not just a single gorutine @@ -158,7 +176,7 @@ func main() { listener := tls.NewListener(listener, tlsConfig) go func() { - err := workerAPI.Serve(listener) + err := workers.Serve(listener) common.PanicOnError(err) }() } diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index 87a8a5caa..ce1c856cf 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -12,6 +12,7 @@ import ( "os" "path" + "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/upload/awsupload" @@ -62,7 +63,7 @@ func (e *TargetsError) Error() string { return errString } -func RunJob(job *worker.Job, uploadFunc func(*worker.Job, io.Reader) error) (*common.ComposeResult, error) { +func RunJob(job *worker.Job, uploadFunc func(uuid.UUID, int, io.Reader) error) (*common.ComposeResult, error) { tmpStore, err := ioutil.TempDir("/var/tmp", "osbuild-store") if err != nil { return nil, fmt.Errorf("error setting up osbuild store: %v", err) @@ -86,7 +87,7 @@ func RunJob(job *worker.Job, uploadFunc func(*worker.Job, io.Reader) error) (*co continue } - err = uploadFunc(job, f) + err = uploadFunc(options.ComposeId, options.ImageBuildId, f) if err != nil { r = append(r, err) continue @@ -101,7 +102,7 @@ func RunJob(job *worker.Job, uploadFunc func(*worker.Job, io.Reader) error) (*co } if options.Key == "" { - options.Key = job.ComposeID.String() + options.Key = job.Id.String() } _, err = a.Upload(path.Join(tmpStore, "refs", result.OutputID, options.Filename), options.Bucket, options.Key) @@ -191,7 +192,7 @@ func main() { log.Fatal(err) } - fmt.Printf("Running job %s\n", job.ComposeID.String()) + fmt.Printf("Running job %s\n", job.Id) var status common.ImageBuildState result, err := RunJob(job, client.UploadImage) diff --git a/internal/client/unit_test.go b/internal/client/unit_test.go index d2230cc50..156f70356 100644 --- a/internal/client/unit_test.go +++ b/internal/client/unit_test.go @@ -47,7 +47,7 @@ func executeTests(m *testing.M) int { } repos := []rpmmd.RepoConfig{{Id: "test-system-repo", BaseURL: "http://example.com/test/os/test_arch"}} logger := log.New(os.Stdout, "", 0) - api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store) + api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers) server := http.Server{Handler: api} defer server.Close() diff --git a/internal/compose/compose.go b/internal/compose/compose.go index d43e1aba0..ac56a09fd 100644 --- a/internal/compose/compose.go +++ b/internal/compose/compose.go @@ -5,6 +5,7 @@ package compose import ( "time" + "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/osbuild" @@ -21,15 +22,20 @@ func (ste *StateTransitionError) Error() string { // ImageBuild represents a single image build inside a compose type ImageBuild struct { - Id int `json:"id"` - QueueStatus common.ImageBuildState `json:"queue_status"` - ImageType common.ImageType `json:"image_type"` - Manifest *osbuild.Manifest `json:"manifest"` - Targets []*target.Target `json:"targets"` - JobCreated time.Time `json:"job_created"` - JobStarted time.Time `json:"job_started"` - JobFinished time.Time `json:"job_finished"` - Size uint64 `json:"size"` + Id int `json:"id"` + ImageType common.ImageType `json:"image_type"` + Manifest *osbuild.Manifest `json:"manifest"` + Targets []*target.Target `json:"targets"` + JobCreated time.Time `json:"job_created"` + JobStarted time.Time `json:"job_started"` + JobFinished time.Time `json:"job_finished"` + Size uint64 `json:"size"` + JobId uuid.UUID `json:"jobid,omitempty"` + + // Kept for backwards compatibility. Image builds which were done + // before the move to the job queue use this to store whether they + // finished successfully. + QueueStatus common.ImageBuildState `json:"queue_status,omitempty"` } // DeepCopy creates a copy of the ImageBuild structure @@ -55,6 +61,7 @@ func (ib *ImageBuild) DeepCopy() ImageBuild { JobStarted: ib.JobStarted, JobFinished: ib.JobFinished, Size: ib.Size, + JobId: ib.JobId, } } @@ -94,55 +101,6 @@ func (c *Compose) DeepCopy() Compose { } } -func anyImageBuild(fn func(common.ImageBuildState) bool, list []common.ImageBuildState) bool { - acc := false - for _, i := range list { - if fn(i) { - acc = true - } - } - return acc -} - -func allImageBuilds(fn func(common.ImageBuildState) bool, list []common.ImageBuildState) bool { - acc := true - for _, i := range list { - if !fn(i) { - acc = false - } - } - return acc -} - -// GetState returns a state of the whole compose which is derived from the states of -// individual image builds inside the compose -func (c *Compose) GetState() common.ComposeState { - var imageBuildsStates []common.ImageBuildState - for _, ib := range c.ImageBuilds { - imageBuildsStates = append(imageBuildsStates, ib.QueueStatus) - } - // In case all states are the same - if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBWaiting }, imageBuildsStates) { - return common.CWaiting - } - if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBFinished }, imageBuildsStates) { - return common.CFinished - } - if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBFailed }, imageBuildsStates) { - return common.CFailed - } - // In case the states are mixed - // TODO: can this condition be removed because it is already covered by the default? - if anyImageBuild(func(ib common.ImageBuildState) bool { return ib == common.IBRunning }, imageBuildsStates) { - return common.CRunning - } - if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBFailed || ib == common.IBFinished }, imageBuildsStates) { - return common.CFailed - } - // Default value - return common.CRunning -} - // UpdateState changes a state of a single image build inside the Compose func (c *Compose) UpdateState(imageBuildId int, newState common.ImageBuildState) error { switch newState { diff --git a/internal/compose/compose_test.go b/internal/compose/compose_test.go deleted file mode 100644 index 21f86440e..000000000 --- a/internal/compose/compose_test.go +++ /dev/null @@ -1,125 +0,0 @@ -package compose - -import ( - "github.com/osbuild/osbuild-composer/internal/common" - "testing" -) - -func TestGetState(t *testing.T) { - cases := []struct { - compose Compose - expecedStatus common.ComposeState - }{ - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBWaiting}, - }, - }, - expecedStatus: common.CWaiting, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBRunning}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CFailed, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFinished}, - }, - }, - expecedStatus: common.CFinished, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBWaiting}, - {QueueStatus: common.IBWaiting}, - }, - }, - expecedStatus: common.CWaiting, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBWaiting}, - {QueueStatus: common.IBRunning}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBRunning}, - {QueueStatus: common.IBRunning}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBRunning}, - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBWaiting}, - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CRunning, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFailed}, - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CFailed, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFinished}, - {QueueStatus: common.IBFinished}, - }, - }, - expecedStatus: common.CFinished, - }, - { - compose: Compose{ - ImageBuilds: []ImageBuild{ - {QueueStatus: common.IBFinished}, - {QueueStatus: common.IBFailed}, - }, - }, - expecedStatus: common.CFailed, - }, - } - for n, c := range cases { - got := c.compose.GetState() - wanted := c.expecedStatus - if got != wanted { - t.Error("Compose", n, "should be in", wanted.ToString(), "state, but it is:", got.ToString()) - } - } -} diff --git a/internal/mocks/rpmmd/fixtures.go b/internal/mocks/rpmmd/fixtures.go index e34ebee37..2600af9dd 100644 --- a/internal/mocks/rpmmd/fixtures.go +++ b/internal/mocks/rpmmd/fixtures.go @@ -7,6 +7,8 @@ import ( "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/compose" + "github.com/osbuild/osbuild-composer/internal/jobqueue/testjobqueue" + "github.com/osbuild/osbuild-composer/internal/worker" "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/blueprint" @@ -150,6 +152,10 @@ func createBaseStoreFixture() *store.Store { return s } +func createBaseWorkersFixture() *worker.Server { + return worker.NewServer(nil, testjobqueue.New(), nil) +} + func createBaseDepsolveFixture() []rpmmd.PackageSpec { return []rpmmd.PackageSpec{ { @@ -207,6 +213,7 @@ func BaseFixture() Fixture { nil, }, createBaseStoreFixture(), + createBaseWorkersFixture(), } } @@ -223,6 +230,7 @@ func NoComposesFixture() Fixture { nil, }, createStoreWithoutComposesFixture(), + createBaseWorkersFixture(), } } @@ -242,6 +250,7 @@ func NonExistingPackage() Fixture { }, }, createBaseStoreFixture(), + createBaseWorkersFixture(), } } @@ -261,6 +270,7 @@ func BadDepsolve() Fixture { }, }, createBaseStoreFixture(), + createBaseWorkersFixture(), } } @@ -283,5 +293,6 @@ func BadFetch() Fixture { }, }, createBaseStoreFixture(), + createBaseWorkersFixture(), } } diff --git a/internal/mocks/rpmmd/rpmmd_mock.go b/internal/mocks/rpmmd/rpmmd_mock.go index 3cf3013a0..49049f72c 100644 --- a/internal/mocks/rpmmd/rpmmd_mock.go +++ b/internal/mocks/rpmmd/rpmmd_mock.go @@ -3,6 +3,7 @@ package rpmmd_mock import ( "github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/store" + "github.com/osbuild/osbuild-composer/internal/worker" ) type fetchPackageList struct { @@ -20,6 +21,7 @@ type Fixture struct { fetchPackageList depsolve *store.Store + Workers *worker.Server } type rpmmdMock struct { diff --git a/internal/rcm/api.go b/internal/rcm/api.go index f57f4b377..b677dfd3a 100644 --- a/internal/rcm/api.go +++ b/internal/rcm/api.go @@ -10,19 +10,18 @@ import ( "net/http" "github.com/osbuild/osbuild-composer/internal/rpmmd" + "github.com/osbuild/osbuild-composer/internal/worker" "github.com/google/uuid" "github.com/julienschmidt/httprouter" - "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/distro" - "github.com/osbuild/osbuild-composer/internal/store" ) // API encapsulates RCM-specific API that is exposed over a separate TCP socket type API struct { - logger *log.Logger - store *store.Store - router *httprouter.Router + logger *log.Logger + workers *worker.Server + router *httprouter.Router // rpmMetadata is an interface to dnf-json and we include it here so that we can // mock it in the unit tests rpmMetadata rpmmd.RPMMD @@ -30,10 +29,10 @@ type API struct { } // New creates new RCM API -func New(logger *log.Logger, store *store.Store, rpmMetadata rpmmd.RPMMD, distros *distro.Registry) *API { +func New(logger *log.Logger, workers *worker.Server, rpmMetadata rpmmd.RPMMD, distros *distro.Registry) *API { api := &API{ logger: logger, - store: store, + workers: workers, router: httprouter.New(), rpmMetadata: rpmMetadata, distros: distros, @@ -215,8 +214,7 @@ func (api *API) submit(writer http.ResponseWriter, request *http.Request, _ http return } - // Push the requested compose to the store - composeID, err := api.store.PushCompose(manifest, imageType.Name(), &blueprint.Blueprint{}, 0, nil) + composeID, err := api.workers.Enqueue(manifest, nil) if err != nil { if api.logger != nil { api.logger.Println("RCM API failed to push compose:", err) @@ -254,22 +252,20 @@ func (api *API) status(writer http.ResponseWriter, request *http.Request, params } // Check that the compose exists - compose, exists := api.store.GetCompose(id) - if !exists { + status, _, err := api.workers.JobResult(id) + if err != nil { writer.WriteHeader(http.StatusBadRequest) - errorReason.Error = "Compose UUID does not exist" + errorReason.Error = err.Error() // TODO: handle error _ = json.NewEncoder(writer).Encode(errorReason) return } // JSON structure with success response - var reply struct { + type reply struct { Status string `json:"status"` } - // TODO: return per-job status like Koji does (requires changes in the store) - reply.Status = compose.GetState().ToString() // TODO: handle error - _ = json.NewEncoder(writer).Encode(reply) + _ = json.NewEncoder(writer).Encode(reply{Status: status.ToString()}) } diff --git a/internal/rcm/api_test.go b/internal/rcm/api_test.go index 7f3a9597e..d34cc6a20 100644 --- a/internal/rcm/api_test.go +++ b/internal/rcm/api_test.go @@ -6,15 +6,18 @@ import ( "io/ioutil" "net/http" "net/http/httptest" + "os" "regexp" "testing" "github.com/google/uuid" + "github.com/stretchr/testify/require" + "github.com/osbuild/osbuild-composer/internal/jobqueue/testjobqueue" distro_mock "github.com/osbuild/osbuild-composer/internal/mocks/distro" rpmmd_mock "github.com/osbuild/osbuild-composer/internal/mocks/rpmmd" "github.com/osbuild/osbuild-composer/internal/rcm" - "github.com/osbuild/osbuild-composer/internal/store" + "github.com/osbuild/osbuild-composer/internal/worker" ) type API interface { @@ -30,6 +33,21 @@ func internalRequest(api API, method, path, body, contentType string) *http.Resp return resp.Result() } +func newTestWorkerServer(t *testing.T) (*worker.Server, string) { + dir, err := ioutil.TempDir("", "rcm-test-") + require.NoError(t, err) + + w := worker.NewServer(nil, testjobqueue.New(), nil) + require.NotNil(t, w) + + return w, dir +} + +func cleanupTempDir(t *testing.T, dir string) { + err := os.RemoveAll(dir) + require.NoError(t, err) +} + func TestBasicRcmAPI(t *testing.T) { // Test the HTTP API responses // This test mainly focuses on HTTP status codes and JSON structures, not necessarily on their content @@ -47,14 +65,18 @@ func TestBasicRcmAPI(t *testing.T) { {"POST", "/v1/compose", `{"status":"RUNNING"}`, "text/plain", http.StatusBadRequest, ``}, {"POST", "/v1/compose", `{"image_builds":[]}`, "application/json", http.StatusBadRequest, ""}, {"POST", "/v1/compose/111-222-333", `{"status":"RUNNING"}`, "application/json", http.StatusMethodNotAllowed, ``}, - {"GET", "/v1/compose/7802c476-9cd1-41b7-ba81-43c1906bce73", `{"status":"RUNNING"}`, "application/json", http.StatusBadRequest, `{"error_reason":"Compose UUID does not exist"}`}, + {"GET", "/v1/compose/7802c476-9cd1-41b7-ba81-43c1906bce73", `{"status":"RUNNING"}`, "application/json", http.StatusBadRequest, ``}, } registry, err := distro_mock.NewDefaultRegistry() if err != nil { t.Fatal(err) } - api := rcm.New(nil, store.New(nil), rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) + + workers, dir := newTestWorkerServer(t) + defer cleanupTempDir(t, dir) + + api := rcm.New(nil, workers, rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) for _, c := range cases { resp := internalRequest(api, c.Method, c.Path, c.Body, c.ContentType) @@ -79,7 +101,11 @@ func TestSubmit(t *testing.T) { if err != nil { t.Fatal(err) } - api := rcm.New(nil, store.New(nil), rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) + + workers, dir := newTestWorkerServer(t) + defer cleanupTempDir(t, dir) + + api := rcm.New(nil, workers, rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) var submit_reply struct { UUID uuid.UUID `json:"compose_id"` @@ -207,7 +233,11 @@ func TestStatus(t *testing.T) { if err != nil { t.Fatal(err) } - api := rcm.New(nil, store.New(nil), rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) + + workers, dir := newTestWorkerServer(t) + defer cleanupTempDir(t, dir) + + api := rcm.New(nil, workers, rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry) var submit_reply struct { UUID uuid.UUID `json:"compose_id"` diff --git a/internal/store/store.go b/internal/store/store.go index 1d7904ca5..a10e1aa79 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -84,14 +84,6 @@ func (e *NotPendingError) Error() string { return e.message } -type NotRunningError struct { - message string -} - -func (e *NotRunningError) Error() string { - return e.message -} - type InvalidRequestError struct { message string } @@ -136,27 +128,23 @@ func New(stateDir *string) *Store { if s.Composes == nil { s.Composes = make(map[uuid.UUID]compose.Compose) } else { + // Backwards compatibility: fail all builds that are queued or + // running. Jobs status is now handled outside of the store + // (and the compose). The fields are kept so that previously + // succeeded builds still show up correctly. for composeID, compose := range s.Composes { if len(compose.ImageBuilds) == 0 { panic("the was a compose with zero image builds, that is forbidden") } for imgID, imgBuild := range compose.ImageBuilds { switch imgBuild.QueueStatus { - case common.IBRunning: - // We do not support resuming an in-flight build + case common.IBRunning, common.IBWaiting: compose.ImageBuilds[imgID].QueueStatus = common.IBFailed s.Composes[composeID] = compose - case common.IBWaiting: - // Push waiting composes back into the pending jobs queue - s.pendingJobs <- Job{ - ComposeID: composeID, - ImageBuildID: imgBuild.Id, - Manifest: imgBuild.Manifest, - Targets: imgBuild.Targets, - } } } } + } if s.Sources == nil { s.Sources = make(map[string]SourceConfig) @@ -501,13 +489,15 @@ func (s *Store) getImageBuildDirectory(composeID uuid.UUID, imageBuildID int) st return fmt.Sprintf("%s/%d", s.getComposeDirectory(composeID), imageBuildID) } -func (s *Store) PushCompose(manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target) (uuid.UUID, error) { +func (s *Store) PushCompose(composeID uuid.UUID, manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, jobId uuid.UUID) error { + if _, exists := s.GetCompose(composeID); exists { + panic("a compose with this id already exists") + } + if targets == nil { targets = []*target.Target{} } - composeID := uuid.New() - // Compatibility layer for image types in Weldr API v0 imageTypeCommon, exists := common.ImageTypeFromCompatString(imageType.Name()) if !exists { @@ -519,7 +509,7 @@ func (s *Store) PushCompose(manifest *osbuild.Manifest, imageType distro.ImageTy err := os.MkdirAll(outputDir, 0755) if err != nil { - return uuid.Nil, fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) + return fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) } } @@ -529,37 +519,28 @@ func (s *Store) PushCompose(manifest *osbuild.Manifest, imageType distro.ImageTy Blueprint: bp, ImageBuilds: []compose.ImageBuild{ { - QueueStatus: common.IBWaiting, - Manifest: manifest, - ImageType: imageTypeCommon, - Targets: targets, - JobCreated: time.Now(), - Size: size, + Manifest: manifest, + ImageType: imageTypeCommon, + Targets: targets, + JobCreated: time.Now(), + Size: size, + JobId: jobId, }, }, } return nil }) - s.pendingJobs <- Job{ - ComposeID: composeID, - ImageBuildID: 0, - Manifest: manifest, - Targets: targets, - } - - return composeID, nil + return nil } // PushTestCompose is used for testing // Set testSuccess to create a fake successful compose, otherwise it will create a failed compose // It does not actually run a compose job -func (s *Store) PushTestCompose(manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, testSuccess bool) (uuid.UUID, error) { +func (s *Store) PushTestCompose(composeID uuid.UUID, manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, testSuccess bool) error { if targets == nil { targets = []*target.Target{} } - composeID := uuid.New() - // Compatibility layer for image types in Weldr API v0 imageTypeCommon, exists := common.ImageTypeFromCompatString(imageType.Name()) if !exists { @@ -571,7 +552,7 @@ func (s *Store) PushTestCompose(manifest *osbuild.Manifest, imageType distro.Ima err := os.MkdirAll(outputDir, 0755) if err != nil { - return uuid.Nil, fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) + return fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) } } @@ -607,33 +588,20 @@ func (s *Store) PushTestCompose(manifest *osbuild.Manifest, imageType distro.Ima // Instead of starting the job, immediately set a final status err := s.UpdateImageBuildInCompose(composeID, 0, status, &result) if err != nil { - return uuid.Nil, err + return err } - return composeID, nil + return nil } // DeleteCompose deletes the compose from the state file and also removes all files on disk that are // associated with this compose func (s *Store) DeleteCompose(id uuid.UUID) error { return s.change(func() error { - compose, exists := s.Composes[id] - - if !exists { + if _, exists := s.Composes[id]; !exists { return &NotFoundError{} } - // If any of the image builds have build artifacts, remove them - invalidRequest := true - for _, imageBuild := range compose.ImageBuilds { - if imageBuild.QueueStatus == common.IBFinished || imageBuild.QueueStatus == common.IBFailed { - invalidRequest = false - } - } - if invalidRequest { - return &InvalidRequestError{fmt.Sprintf("Compose %s is not in FINISHED or FAILED.", id)} - } - delete(s.Composes, id) var err error @@ -648,31 +616,6 @@ func (s *Store) DeleteCompose(id uuid.UUID) error { }) } -// PopJob returns a job from the job queue and changes the status of the corresponding image build to running -func (s *Store) PopJob() Job { - job := <-s.pendingJobs - // FIXME: handle or comment this possible error - _ = s.change(func() error { - // Get the compose from the map - compose, exists := s.Composes[job.ComposeID] - // Check that it exists - if !exists { - panic("Invalid job in queue.") - } - // Change queue status to running for the image build as well as for the targets - compose.ImageBuilds[job.ImageBuildID].QueueStatus = common.IBRunning - compose.ImageBuilds[job.ImageBuildID].JobStarted = time.Now() - for m := range compose.ImageBuilds[job.ImageBuildID].Targets { - compose.ImageBuilds[job.ImageBuildID].Targets[m].Status = common.IBRunning - } - // Replace the compose struct with the new one - // TODO: I'm not sure this is needed, but I don't know what is the golang semantics in this case - s.Composes[job.ComposeID] = compose - return nil - }) - return job -} - // UpdateImageBuildInCompose sets the status and optionally also the final image. func (s *Store) UpdateImageBuildInCompose(composeID uuid.UUID, imageBuildID int, status common.ImageBuildState, result *common.ComposeResult) error { return s.change(func() error { diff --git a/internal/target/local_target.go b/internal/target/local_target.go index 48f25ccb2..6e57f5d20 100644 --- a/internal/target/local_target.go +++ b/internal/target/local_target.go @@ -1,7 +1,11 @@ package target +import "github.com/google/uuid" + type LocalTargetOptions struct { - Filename string `json:"filename"` + ComposeId uuid.UUID `json:"compose_id"` + ImageBuildId int `json:"image_build_id"` + Filename string `json:"filename"` } func (LocalTargetOptions) isTargetOptions() {} diff --git a/internal/weldr/api.go b/internal/weldr/api.go index a0fa422ec..f080493f4 100644 --- a/internal/weldr/api.go +++ b/internal/weldr/api.go @@ -14,6 +14,7 @@ import ( "sort" "strconv" "strings" + "time" "github.com/BurntSushi/toml" "github.com/google/uuid" @@ -21,14 +22,17 @@ import ( "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/common" + "github.com/osbuild/osbuild-composer/internal/compose" "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/store" "github.com/osbuild/osbuild-composer/internal/target" + "github.com/osbuild/osbuild-composer/internal/worker" ) type API struct { - store *store.Store + store *store.Store + workers *worker.Server rpmmd rpmmd.RPMMD arch distro.Arch @@ -39,14 +43,15 @@ type API struct { router *httprouter.Router } -func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store) *API { +func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server) *API { api := &API{ - store: store, - rpmmd: rpmmd, - arch: arch, - distro: distro, - repos: repos, - logger: logger, + store: store, + workers: workers, + rpmmd: rpmmd, + arch: arch, + distro: distro, + repos: repos, + logger: logger, } api.router = httprouter.New() @@ -135,6 +140,40 @@ func (api *API) ServeHTTP(writer http.ResponseWriter, request *http.Request) { api.router.ServeHTTP(writer, request) } +// Returns the state of the image in `compose` and the times the job was +// queued, started, and finished. Assumes that there's only one image in the +// compose. Returns CWaiting on error. +func (api *API) getComposeState(compose compose.Compose) (state common.ComposeState, queued, started, finished time.Time) { + if len(compose.ImageBuilds) == 0 { + return + } + + jobId := compose.ImageBuilds[0].JobId + + // backwards compatibility: composes that were around before splitting + // the job queue from the store still contain their valid status and + // times. Return those here as a fallback. + if jobId == uuid.Nil { + switch compose.ImageBuilds[0].QueueStatus { + case common.IBWaiting: + state = common.CWaiting + case common.IBRunning: + state = common.CRunning + case common.IBFinished: + state = common.CFinished + case common.IBFailed: + state = common.CFailed + } + queued = compose.ImageBuilds[0].JobCreated + started = compose.ImageBuilds[0].JobStarted + finished = compose.ImageBuilds[0].JobFinished + return + } + + state, queued, started, finished, _ = api.workers.JobStatus(jobId) + return +} + func verifyRequestVersion(writer http.ResponseWriter, params httprouter.Params, minVersion uint) bool { versionString := params.ByName("version") @@ -1433,6 +1472,8 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request return } + composeID := uuid.New() + var targets []*target.Target if isRequestVersionAtLeast(params, 1) && cr.Upload != nil { t := uploadRequestToTarget(*cr.Upload, imageType) @@ -1441,7 +1482,9 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request targets = append(targets, target.NewLocalTarget( &target.LocalTargetOptions{ - Filename: imageType.Filename(), + ComposeId: composeID, + ImageBuildId: 0, + Filename: imageType.Filename(), }, )) @@ -1465,8 +1508,6 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request return } - var composeID uuid.UUID - // Check for test parameter q, err := url.ParseQuery(request.URL.RawQuery) if err != nil { @@ -1492,12 +1533,17 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request testMode := q.Get("test") if testMode == "1" { // Create a failed compose - composeID, err = api.store.PushTestCompose(manifest, imageType, bp, size, targets, false) + err = api.store.PushTestCompose(composeID, manifest, imageType, bp, size, targets, false) } else if testMode == "2" { // Create a successful compose - composeID, err = api.store.PushTestCompose(manifest, imageType, bp, size, targets, true) + err = api.store.PushTestCompose(composeID, manifest, imageType, bp, size, targets, true) } else { - composeID, err = api.store.PushCompose(manifest, imageType, bp, size, targets) + var jobId uuid.UUID + + jobId, err = api.workers.Enqueue(manifest, targets) + if err == nil { + err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId) + } } // TODO: we should probably do some kind of blueprint validation in future @@ -1549,29 +1595,34 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R continue } - err = api.store.DeleteCompose(id) - - if err != nil { - switch err.(type) { - case *store.NotFoundError: - errors = append(errors, composeDeleteError{ - "UnknownUUID", - fmt.Sprintf("compose %s doesn't exist", id), - }) - case *store.InvalidRequestError: - errors = append(errors, composeDeleteError{ - "BuildInWrongState", - err.Error(), - }) - default: - errors = append(errors, composeDeleteError{ - "ComposeError", - fmt.Sprintf("%s: %s", id, err.Error()), - }) - } - } else { - results = append(results, composeDeleteStatus{id, true}) + compose, exists := api.store.GetCompose(id) + if !exists { + errors = append(errors, composeDeleteError{ + "UnknownUUID", + fmt.Sprintf("compose %s doesn't exist", id), + }) + continue } + + state, _, _, _ := api.getComposeState(compose) + if state != common.CFinished && state != common.CFailed { + errors = append(errors, composeDeleteError{ + "BuildInWrongState", + fmt.Sprintf("Compose %s is not in FINISHED or FAILED.", id), + }) + continue + } + + err = api.store.DeleteCompose(id) + if err != nil { + errors = append(errors, composeDeleteError{ + "ComposeError", + fmt.Sprintf("%s: %s", id, err.Error()), + }) + continue + } + + results = append(results, composeDeleteStatus{id, true}) } reply := struct { @@ -1614,13 +1665,16 @@ func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Re Run []*ComposeEntry `json:"run"` }{[]*ComposeEntry{}, []*ComposeEntry{}} + includeUploads := isRequestVersionAtLeast(params, 1) + composes := api.store.GetAllComposes() for id, compose := range composes { - switch compose.GetState() { + state, queued, started, finished := api.getComposeState(compose) + switch state { case common.CWaiting: - reply.New = append(reply.New, composeToComposeEntry(id, compose, isRequestVersionAtLeast(params, 1))) + reply.New = append(reply.New, composeToComposeEntry(id, compose, common.CWaiting, queued, started, finished, includeUploads)) case common.CRunning: - reply.Run = append(reply.Run, composeToComposeEntry(id, compose, isRequestVersionAtLeast(params, 1))) + reply.Run = append(reply.Run, composeToComposeEntry(id, compose, common.CRunning, queued, started, finished, includeUploads)) } } @@ -1682,9 +1736,10 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R if !exists { continue } + state, _, _, _ := api.getComposeState(compose) if filterBlueprint != "" && compose.Blueprint.Name != filterBlueprint { continue - } else if filterStatus != "" && compose.ImageBuilds[0].QueueStatus.ToString() != filterStatus { + } else if filterStatus != "" && state.ToString() != filterStatus { continue } else if filterImageTypeExists && compose.ImageBuilds[0].ImageType != filterImageType { continue @@ -1696,7 +1751,8 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for _, id := range filteredUUIDs { if compose, exists := composes[id]; exists { - reply.UUIDs = append(reply.UUIDs, composeToComposeEntry(id, compose, includeUploads)) + state, queued, started, finished := api.getComposeState(compose) + reply.UUIDs = append(reply.UUIDs, composeToComposeEntry(id, compose, state, queued, started, finished, includeUploads)) } } sortComposeEntries(reply.UUIDs) @@ -1755,8 +1811,9 @@ func (api *API) composeInfoHandler(writer http.ResponseWriter, request *http.Req } // Weldr API assumes only one image build per compose, that's why only the // 1st build is considered + state, _, _, _ := api.getComposeState(compose) reply.ComposeType, _ = compose.ImageBuilds[0].ImageType.ToCompatString() - reply.QueueStatus = compose.GetState().ToString() + reply.QueueStatus = state.ToString() reply.ImageSize = compose.ImageBuilds[0].Size if isRequestVersionAtLeast(params, 1) { @@ -1793,10 +1850,11 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re return } - if compose.GetState() != common.CFinished { + state, _, _, _ := api.getComposeState(compose) + if state != common.CFinished { errors := responseError{ ID: "BuildInWrongState", - Msg: fmt.Sprintf("Build %s is in wrong state: %s", uuidString, compose.GetState().ToString()), + Msg: fmt.Sprintf("Build %s is in wrong state: %s", uuidString, state.ToString()), } statusResponseError(writer, http.StatusBadRequest, errors) return @@ -1863,7 +1921,8 @@ func (api *API) composeLogsHandler(writer http.ResponseWriter, request *http.Req return } - if compose.GetState() != common.CFinished && compose.GetState() != common.CFailed { + state, _, _, _ := api.getComposeState(compose) + if state != common.CFinished && state != common.CFailed { errors := responseError{ ID: "BuildInWrongState", Msg: fmt.Sprintf("Build %s not in FINISHED or FAILED state.", uuidString), @@ -1950,7 +2009,8 @@ func (api *API) composeLogHandler(writer http.ResponseWriter, request *http.Requ return } - if compose.GetState() == common.CWaiting { + state, _, _, _ := api.getComposeState(compose) + if state == common.CWaiting { errors := responseError{ ID: "BuildInWrongState", Msg: fmt.Sprintf("Build %s has not started yet. No logs to view.", uuidString), @@ -1959,7 +2019,7 @@ func (api *API) composeLogHandler(writer http.ResponseWriter, request *http.Requ return } - if compose.GetState() == common.CRunning { + if state == common.CRunning { fmt.Fprintf(writer, "Running...\n") return } @@ -2004,9 +2064,11 @@ func (api *API) composeFinishedHandler(writer http.ResponseWriter, request *http includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - if compose.ImageBuilds[0].QueueStatus == common.IBFinished { - reply.Finished = append(reply.Finished, composeToComposeEntry(id, compose, includeUploads)) + state, queued, started, finished := api.getComposeState(compose) + if state != common.CFinished { + continue } + reply.Finished = append(reply.Finished, composeToComposeEntry(id, compose, common.CFinished, queued, started, finished, includeUploads)) } sortComposeEntries(reply.Finished) @@ -2025,9 +2087,11 @@ func (api *API) composeFailedHandler(writer http.ResponseWriter, request *http.R includeUploads := isRequestVersionAtLeast(params, 1) for id, compose := range api.store.GetAllComposes() { - if compose.ImageBuilds[0].QueueStatus == common.IBFailed { - reply.Failed = append(reply.Failed, composeToComposeEntry(id, compose, includeUploads)) + state, queued, started, finished := api.getComposeState(compose) + if state != common.CFailed { + continue } + reply.Failed = append(reply.Failed, composeToComposeEntry(id, compose, common.CFailed, queued, started, finished, includeUploads)) } sortComposeEntries(reply.Failed) diff --git a/internal/weldr/api_test.go b/internal/weldr/api_test.go index 749fd8802..b33bc1db9 100644 --- a/internal/weldr/api_test.go +++ b/internal/weldr/api_test.go @@ -38,7 +38,7 @@ func createWeldrAPI(fixtureGenerator rpmmd_mock.FixtureGenerator) (*API, *store. panic(err) } - return New(rpm, arch, d, repos, nil, fixture.Store), fixture.Store + return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers), fixture.Store } func TestBasic(t *testing.T) { diff --git a/internal/weldr/compose.go b/internal/weldr/compose.go index c2649c0ef..ca0f7f6a3 100644 --- a/internal/weldr/compose.go +++ b/internal/weldr/compose.go @@ -2,6 +2,7 @@ package weldr import ( "sort" + "time" "github.com/google/uuid" "github.com/osbuild/osbuild-composer/internal/common" @@ -21,37 +22,40 @@ type ComposeEntry struct { Uploads []uploadResponse `json:"uploads,omitempty"` } -func composeToComposeEntry(id uuid.UUID, compose compose.Compose, includeUploads bool) *ComposeEntry { +func composeToComposeEntry(id uuid.UUID, compose compose.Compose, state common.ComposeState, queued, started, finished time.Time, includeUploads bool) *ComposeEntry { var composeEntry ComposeEntry composeEntry.ID = id composeEntry.Blueprint = compose.Blueprint.Name composeEntry.Version = compose.Blueprint.Version composeEntry.ComposeType = compose.ImageBuilds[0].ImageType - composeEntry.QueueStatus = compose.ImageBuilds[0].QueueStatus if includeUploads { composeEntry.Uploads = targetsToUploadResponses(compose.ImageBuilds[0].Targets) } - switch compose.ImageBuilds[0].QueueStatus { - case common.IBWaiting: - composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 + switch state { + case common.CWaiting: + composeEntry.QueueStatus = common.IBWaiting + composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000 - case common.IBRunning: - composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 - composeEntry.JobStarted = float64(compose.ImageBuilds[0].JobStarted.UnixNano()) / 1000000000 + case common.CRunning: + composeEntry.QueueStatus = common.IBRunning + composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000 + composeEntry.JobStarted = float64(started.UnixNano()) / 1000000000 - case common.IBFinished: + case common.CFinished: + composeEntry.QueueStatus = common.IBFinished composeEntry.ImageSize = compose.ImageBuilds[0].Size - composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 - composeEntry.JobStarted = float64(compose.ImageBuilds[0].JobStarted.UnixNano()) / 1000000000 - composeEntry.JobFinished = float64(compose.ImageBuilds[0].JobFinished.UnixNano()) / 1000000000 + composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000 + composeEntry.JobStarted = float64(started.UnixNano()) / 1000000000 + composeEntry.JobFinished = float64(finished.UnixNano()) / 1000000000 - case common.IBFailed: - composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 - composeEntry.JobStarted = float64(compose.ImageBuilds[0].JobStarted.UnixNano()) / 1000000000 - composeEntry.JobFinished = float64(compose.ImageBuilds[0].JobFinished.UnixNano()) / 1000000000 + case common.CFailed: + composeEntry.QueueStatus = common.IBFailed + composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000 + composeEntry.JobStarted = float64(started.UnixNano()) / 1000000000 + composeEntry.JobFinished = float64(finished.UnixNano()) / 1000000000 default: panic("invalid compose state") } diff --git a/internal/worker/client.go b/internal/worker/client.go index 385ea9141..e325280d1 100644 --- a/internal/worker/client.go +++ b/internal/worker/client.go @@ -25,10 +25,9 @@ type Client struct { } type Job struct { - ComposeID uuid.UUID - ImageBuildID int - Manifest *osbuild.Manifest - Targets []*target.Target + Id uuid.UUID + Manifest *osbuild.Manifest + Targets []*target.Target } func NewClient(address string, conf *tls.Config) *Client { @@ -86,8 +85,7 @@ func (c *Client) AddJob() (*Job, error) { } return &Job{ - jr.ComposeID, - jr.ImageBuildID, + jr.Id, jr.Manifest, jr.Targets, }, nil @@ -99,7 +97,7 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *comm if err != nil { panic(err) } - urlPath := fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d", job.ComposeID.String(), job.ImageBuildID) + urlPath := fmt.Sprintf("/job-queue/v1/jobs/%s", job.Id) url := c.createURL(urlPath) req, err := http.NewRequest("PATCH", url, &b) if err != nil { @@ -120,9 +118,9 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *comm return nil } -func (c *Client) UploadImage(job *Job, reader io.Reader) error { +func (c *Client) UploadImage(composeId uuid.UUID, imageBuildId int, reader io.Reader) error { // content type doesn't really matter - url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", job.ComposeID.String(), job.ImageBuildID)) + url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", composeId, imageBuildId)) _, err := c.client.Post(url, "application/octet-stream", reader) return err diff --git a/internal/worker/json.go b/internal/worker/json.go index 561910221..7fdc8f8f9 100644 --- a/internal/worker/json.go +++ b/internal/worker/json.go @@ -8,6 +8,23 @@ import ( "github.com/osbuild/osbuild-composer/internal/target" ) +// +// JSON-serializable types for the jobqueue +// + +type OSBuildJob struct { + Manifest *osbuild.Manifest `json:"manifest"` + Targets []*target.Target `json:"targets,omitempty"` +} + +type OSBuildJobResult struct { + OSBuildOutput *common.ComposeResult `json:"osbuild_output,omitempty"` +} + +// +// JSON-serializable types for the HTTP API +// + type errorResponse struct { Message string `json:"message"` } @@ -16,10 +33,9 @@ type addJobRequest struct { } type addJobResponse struct { - ComposeID uuid.UUID `json:"compose_id"` - ImageBuildID int `json:"image_build_id"` - Manifest *osbuild.Manifest `json:"manifest"` - Targets []*target.Target `json:"targets"` + Id uuid.UUID `json:"id"` + Manifest *osbuild.Manifest `json:"manifest"` + Targets []*target.Target `json:"targets,omitempty"` } type updateJobRequest struct { diff --git a/internal/worker/server.go b/internal/worker/server.go index 8630c5516..b13345c8e 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -3,27 +3,37 @@ package worker import ( "encoding/json" "fmt" + "io" + "io/ioutil" "log" "net" "net/http" "strconv" + "time" "github.com/google/uuid" "github.com/julienschmidt/httprouter" - "github.com/osbuild/osbuild-composer/internal/store" + "github.com/osbuild/osbuild-composer/internal/common" + "github.com/osbuild/osbuild-composer/internal/jobqueue" + "github.com/osbuild/osbuild-composer/internal/osbuild" + "github.com/osbuild/osbuild-composer/internal/target" ) type Server struct { - logger *log.Logger - store *store.Store - router *httprouter.Router + logger *log.Logger + jobs jobqueue.JobQueue + router *httprouter.Router + imageWriter WriteImageFunc } -func NewServer(logger *log.Logger, store *store.Store) *Server { +type WriteImageFunc func(composeID uuid.UUID, imageBuildID int, reader io.Reader) error + +func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, imageWriter WriteImageFunc) *Server { s := &Server{ - logger: logger, - store: store, + logger: logger, + jobs: jobs, + imageWriter: imageWriter, } s.router = httprouter.New() @@ -33,7 +43,7 @@ func NewServer(logger *log.Logger, store *store.Store) *Server { s.router.NotFound = http.HandlerFunc(notFoundHandler) s.router.POST("/job-queue/v1/jobs", s.addJobHandler) - s.router.PATCH("/job-queue/v1/jobs/:job_id/builds/:build_id", s.updateJobHandler) + s.router.PATCH("/job-queue/v1/jobs/:job_id", s.updateJobHandler) s.router.POST("/job-queue/v1/jobs/:job_id/builds/:build_id/image", s.addJobImageHandler) return s @@ -59,6 +69,38 @@ func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) { s.router.ServeHTTP(writer, request) } +func (s *Server) Enqueue(manifest *osbuild.Manifest, targets []*target.Target) (uuid.UUID, error) { + job := OSBuildJob{ + Manifest: manifest, + Targets: targets, + } + + return s.jobs.Enqueue("osbuild", job, nil) +} + +func (s *Server) JobStatus(id uuid.UUID) (state common.ComposeState, queued, started, finished time.Time, err error) { + var result OSBuildJobResult + var status jobqueue.JobStatus + + status, queued, started, finished, err = s.jobs.JobStatus(id, &result) + if err != nil { + return + } + + state = composeStateFromJobStatus(status, result.OSBuildOutput) + return +} + +func (s *Server) JobResult(id uuid.UUID) (common.ComposeState, *common.ComposeResult, error) { + var result OSBuildJobResult + status, _, _, _, err := s.jobs.JobStatus(id, &result) + if err != nil { + return common.CWaiting, nil, err + } + + return composeStateFromJobStatus(status, result.OSBuildOutput), result.OSBuildOutput, nil +} + // jsonErrorf() is similar to http.Error(), but returns the message in a json // object with a "message" field. func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) { @@ -92,15 +134,19 @@ func (s *Server) addJobHandler(writer http.ResponseWriter, request *http.Request return } - nextJob := s.store.PopJob() + var job OSBuildJob + id, err := s.jobs.Dequeue(request.Context(), []string{"osbuild"}, &job) + if err != nil { + jsonErrorf(writer, http.StatusInternalServerError, "%v", err) + return + } writer.WriteHeader(http.StatusCreated) // FIXME: handle or comment this possible error _ = json.NewEncoder(writer).Encode(addJobResponse{ - ComposeID: nextJob.ComposeID, - ImageBuildID: nextJob.ImageBuildID, - Manifest: nextJob.Manifest, - Targets: nextJob.Targets, + Id: id, + Manifest: job.Manifest, + Targets: job.Targets, }) } @@ -117,13 +163,6 @@ func (s *Server) updateJobHandler(writer http.ResponseWriter, request *http.Requ return } - imageBuildId, err := strconv.Atoi(params.ByName("build_id")) - - if err != nil { - jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err) - return - } - var body updateJobRequest err = json.NewDecoder(request.Body).Decode(&body) if err != nil { @@ -131,13 +170,21 @@ func (s *Server) updateJobHandler(writer http.ResponseWriter, request *http.Requ return } - err = s.store.UpdateImageBuildInCompose(id, imageBuildId, body.Status, body.Result) + // The jobqueue doesn't support setting the status before a job is + // finished. This branch should never be hit, because the worker + // doesn't attempt this. Change the API to remove this awkwardness. + if body.Status != common.IBFinished && body.Status != common.IBFailed { + jsonErrorf(writer, http.StatusBadRequest, "setting status of a job to waiting or running is not supported") + return + } + + err = s.jobs.FinishJob(id, OSBuildJobResult{OSBuildOutput: body.Result}) if err != nil { - switch err.(type) { - case *store.NotFoundError, *store.NotPendingError: - jsonErrorf(writer, http.StatusNotFound, "%v", err) - case *store.NotRunningError, *store.InvalidRequestError: - jsonErrorf(writer, http.StatusBadRequest, "%v", err) + switch err { + case jobqueue.ErrNotExist: + jsonErrorf(writer, http.StatusNotFound, "job does not exist: %s", id) + case jobqueue.ErrNotRunning: + jsonErrorf(writer, http.StatusBadRequest, "job is not running: %s", id) default: jsonErrorf(writer, http.StatusInternalServerError, "%v", err) } @@ -155,23 +202,33 @@ func (s *Server) addJobImageHandler(writer http.ResponseWriter, request *http.Re } imageBuildId, err := strconv.Atoi(params.ByName("build_id")) - if err != nil { jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err) return } - err = s.store.AddImageToImageUpload(id, imageBuildId, request.Body) - + if s.imageWriter == nil { + _, err = io.Copy(ioutil.Discard, request.Body) + } else { + err = s.imageWriter(id, imageBuildId, request.Body) + } if err != nil { - switch err.(type) { - case *store.NotFoundError: - jsonErrorf(writer, http.StatusNotFound, "%v", err) - case *store.NoLocalTargetError: - jsonErrorf(writer, http.StatusBadRequest, "%v", err) - default: - jsonErrorf(writer, http.StatusInternalServerError, "%v", err) - } - return + jsonErrorf(writer, http.StatusInternalServerError, "%v", err) } } + +func composeStateFromJobStatus(status jobqueue.JobStatus, output *common.ComposeResult) common.ComposeState { + switch status { + case jobqueue.JobPending: + return common.CWaiting + case jobqueue.JobRunning: + return common.CRunning + case jobqueue.JobFinished: + if output.Success { + return common.CFinished + } else { + return common.CFailed + } + } + return common.CWaiting +} diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 5a28d54ac..494d0b9d7 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -5,10 +5,10 @@ import ( "testing" "github.com/google/uuid" + "github.com/stretchr/testify/require" - "github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/distro/fedoratest" - "github.com/osbuild/osbuild-composer/internal/store" + "github.com/osbuild/osbuild-composer/internal/jobqueue/testjobqueue" "github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/worker" ) @@ -27,15 +27,15 @@ func TestErrors(t *testing.T) { // Wrong method {"GET", "/job-queue/v1/jobs", ``, http.StatusMethodNotAllowed}, // Update job with invalid ID - {"PATCH", "/job-queue/v1/jobs/foo/builds/0", `{"status":"RUNNING"}`, http.StatusBadRequest}, + {"PATCH", "/job-queue/v1/jobs/foo", `{"status":"FINISHED"}`, http.StatusBadRequest}, // Update job that does not exist, with invalid body - {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/builds/0", ``, http.StatusBadRequest}, + {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", ``, http.StatusBadRequest}, // Update job that does not exist - {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/builds/0", `{"status":"RUNNING"}`, http.StatusNotFound}, + {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"FINISHED"}`, http.StatusNotFound}, } for _, c := range cases { - server := worker.NewServer(nil, store.New(nil)) + server := worker.NewServer(nil, testjobqueue.New(), nil) test.TestRoute(t, server, false, c.Method, c.Path, c.Body, c.ExpectedStatus, "{}", "message") } } @@ -50,21 +50,18 @@ func TestCreate(t *testing.T) { if err != nil { t.Fatalf("error getting image type from arch") } - store := store.New(nil) - server := worker.NewServer(nil, store) + server := worker.NewServer(nil, testjobqueue.New(), nil) manifest, err := imageType.Manifest(nil, nil, nil, nil, imageType.Size(0)) if err != nil { t.Fatalf("error creating osbuild manifest") } - id, err := store.PushCompose(manifest, imageType.Name(), &blueprint.Blueprint{}, 0, nil) - if err != nil { - t.Fatalf("error pushing compose: %v", err) - } + id, err := server.Enqueue(manifest, nil) + require.NoError(t, err) test.TestRoute(t, server, false, "POST", "/job-queue/v1/jobs", `{}`, http.StatusCreated, - `{"compose_id":"`+id.String()+`","image_build_id":0,"manifest":{"sources":{},"pipeline":{}},"targets":[]}`, "created") + `{"id":"`+id.String()+`","manifest":{"sources":{},"pipeline":{}}}`, "created") } func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { @@ -77,8 +74,7 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { if err != nil { t.Fatalf("error getting image type from arch") } - store := store.New(nil) - server := worker.NewServer(nil, store) + server := worker.NewServer(nil, testjobqueue.New(), nil) id := uuid.Nil if from != "VOID" { @@ -87,20 +83,18 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { t.Fatalf("error creating osbuild manifest") } - id, err = store.PushCompose(manifest, imageType.Name(), &blueprint.Blueprint{}, 0, nil) - if err != nil { - t.Fatalf("error pushing compose: %v", err) - } + id, err = server.Enqueue(manifest, nil) + require.NoError(t, err) if from != "WAITING" { test.SendHTTP(server, false, "POST", "/job-queue/v1/jobs", `{}`) if from != "RUNNING" { - test.SendHTTP(server, false, "PATCH", "/job-queue/v1/jobs/"+id.String()+"/builds/0", `{"status":"`+from+`"}`) + test.SendHTTP(server, false, "PATCH", "/job-queue/v1/jobs/"+id.String(), `{"status":"`+from+`"}`) } } } - test.TestRoute(t, server, false, "PATCH", "/job-queue/v1/jobs/"+id.String()+"/builds/0", `{"status":"`+to+`"}`, expectedStatus, "{}", "message") + test.TestRoute(t, server, false, "PATCH", "/job-queue/v1/jobs/"+id.String(), `{"status":"`+to+`"}`, expectedStatus, "{}", "message") } func TestUpdate(t *testing.T) { @@ -109,16 +103,16 @@ func TestUpdate(t *testing.T) { To string ExpectedStatus int }{ - {"VOID", "WAITING", http.StatusNotFound}, - {"VOID", "RUNNING", http.StatusNotFound}, + {"VOID", "WAITING", http.StatusBadRequest}, + {"VOID", "RUNNING", http.StatusBadRequest}, {"VOID", "FINISHED", http.StatusNotFound}, {"VOID", "FAILED", http.StatusNotFound}, - {"WAITING", "WAITING", http.StatusNotFound}, - {"WAITING", "RUNNING", http.StatusNotFound}, - {"WAITING", "FINISHED", http.StatusNotFound}, - {"WAITING", "FAILED", http.StatusNotFound}, + {"WAITING", "WAITING", http.StatusBadRequest}, + {"WAITING", "RUNNING", http.StatusBadRequest}, + {"WAITING", "FINISHED", http.StatusBadRequest}, + {"WAITING", "FAILED", http.StatusBadRequest}, {"RUNNING", "WAITING", http.StatusBadRequest}, - {"RUNNING", "RUNNING", http.StatusOK}, + {"RUNNING", "RUNNING", http.StatusBadRequest}, {"RUNNING", "FINISHED", http.StatusOK}, {"RUNNING", "FAILED", http.StatusOK}, {"FINISHED", "WAITING", http.StatusBadRequest}, @@ -132,6 +126,7 @@ func TestUpdate(t *testing.T) { } for _, c := range cases { + t.Log(c) testUpdateTransition(t, c.From, c.To, c.ExpectedStatus) } }