store: move queue out of the store

The store is responsible for two things: user state and the compose queue. This
is problematic, because the rcm API has slightly different semantics from weldr
and only used the queue part of the store. Also, the store is simply too
complex.

This commit splits the queue part out, using the new jobqueue package in both
the weldr and the rcm package. The queue is saved to a new directory `queue/`.

The weldr package now also has access to a worker server to enqueue and list
jobs. Its store continues to track composes, but the `QueueStatus` for each
compose (and image build) is deprecated. The field in `ImageBuild` is kept for
backwards compatibility for composes which finished before this change, but a
lot of code dealing with it in package compose is dropped.

store.PushCompose() is degraded to storing a new compose. It should probably be
renamed in the future. store.PopJob() is removed.

Job ids are now independent of compose ids. Because of that, the local
target gains ComposeId and ImageBuildId fields, because a worker cannot
infer those from a job anymore. This also necessitates a change in the
worker API: the job routes are changed to expect that instead of a
(compose id, image build id) pair. The route that accepts built images
keeps that pair, because it reports the image back to weldr.

worker.Server() now interacts with a job queue instead of the store. It gains
public functions that allow enqueuing an osbuild job and getting its status,
because only it knows about the specific argument and result types in the job
queue (OSBuildJob and OSBuildJobResult). One oddity remains: it needs to report
an uploaded image to weldr. Do this with a function that's passed in for now,
so that the dependency to the store can be dropped completely.

The rcm API drops its dependencies to package blueprint and store, because it
too interacts only with the worker server now.

Fixes #342
This commit is contained in:
Lars Karlitski 2020-05-03 17:44:22 +02:00 committed by Tom Gundersen
parent 64011e3cba
commit b5769add2c
18 changed files with 415 additions and 443 deletions

View file

@ -15,6 +15,7 @@ import (
"github.com/osbuild/osbuild-composer/internal/distro/rhel81" "github.com/osbuild/osbuild-composer/internal/distro/rhel81"
"github.com/osbuild/osbuild-composer/internal/distro/rhel82" "github.com/osbuild/osbuild-composer/internal/distro/rhel82"
"github.com/osbuild/osbuild-composer/internal/distro/rhel83" "github.com/osbuild/osbuild-composer/internal/distro/rhel83"
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
"github.com/osbuild/osbuild-composer/internal/rcm" "github.com/osbuild/osbuild-composer/internal/rcm"
"github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/common"
@ -118,11 +119,28 @@ func main() {
store := store.New(&stateDir) store := store.New(&stateDir)
workerAPI := worker.NewServer(logger, store) queueDir := path.Join(stateDir, "jobs")
weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store) err = os.Mkdir(queueDir, 0700)
if err != nil && !os.IsExist(err) {
log.Fatalf("cannot create queue directory: %v", err)
}
jobs, err := fsjobqueue.New(queueDir)
if err != nil {
log.Fatalf("cannot create jobqueue: %v", err)
}
outputDir := path.Join(stateDir, "outputs")
err = os.Mkdir(outputDir, 0755)
if err != nil && !os.IsExist(err) {
log.Fatalf("cannot create output directory: %v", err)
}
workers := worker.NewServer(logger, jobs, store.AddImageToImageUpload)
weldrAPI := weldr.New(rpm, arch, distribution, repoMap[common.CurrentArch()], logger, store, workers)
go func() { go func() {
err := workerAPI.Serve(jobListener) err := workers.Serve(jobListener)
common.PanicOnError(err) common.PanicOnError(err)
}() }()
@ -133,7 +151,7 @@ func main() {
log.Fatal("The RCM API socket unit is misconfigured. It should contain only one socket.") log.Fatal("The RCM API socket unit is misconfigured. It should contain only one socket.")
} }
rcmListener := rcmApiListeners[0] rcmListener := rcmApiListeners[0]
rcmAPI := rcm.New(logger, store, rpm, distros) rcmAPI := rcm.New(logger, workers, rpm, distros)
go func() { go func() {
err := rcmAPI.Serve(rcmListener) err := rcmAPI.Serve(rcmListener)
// If the RCM API fails, take down the whole process, not just a single gorutine // If the RCM API fails, take down the whole process, not just a single gorutine
@ -158,7 +176,7 @@ func main() {
listener := tls.NewListener(listener, tlsConfig) listener := tls.NewListener(listener, tlsConfig)
go func() { go func() {
err := workerAPI.Serve(listener) err := workers.Serve(listener)
common.PanicOnError(err) common.PanicOnError(err)
}() }()
} }

View file

@ -12,6 +12,7 @@ import (
"os" "os"
"path" "path"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/upload/awsupload" "github.com/osbuild/osbuild-composer/internal/upload/awsupload"
@ -62,7 +63,7 @@ func (e *TargetsError) Error() string {
return errString return errString
} }
func RunJob(job *worker.Job, uploadFunc func(*worker.Job, io.Reader) error) (*common.ComposeResult, error) { func RunJob(job *worker.Job, uploadFunc func(uuid.UUID, int, io.Reader) error) (*common.ComposeResult, error) {
tmpStore, err := ioutil.TempDir("/var/tmp", "osbuild-store") tmpStore, err := ioutil.TempDir("/var/tmp", "osbuild-store")
if err != nil { if err != nil {
return nil, fmt.Errorf("error setting up osbuild store: %v", err) return nil, fmt.Errorf("error setting up osbuild store: %v", err)
@ -86,7 +87,7 @@ func RunJob(job *worker.Job, uploadFunc func(*worker.Job, io.Reader) error) (*co
continue continue
} }
err = uploadFunc(job, f) err = uploadFunc(options.ComposeId, options.ImageBuildId, f)
if err != nil { if err != nil {
r = append(r, err) r = append(r, err)
continue continue
@ -101,7 +102,7 @@ func RunJob(job *worker.Job, uploadFunc func(*worker.Job, io.Reader) error) (*co
} }
if options.Key == "" { if options.Key == "" {
options.Key = job.ComposeID.String() options.Key = job.Id.String()
} }
_, err = a.Upload(path.Join(tmpStore, "refs", result.OutputID, options.Filename), options.Bucket, options.Key) _, err = a.Upload(path.Join(tmpStore, "refs", result.OutputID, options.Filename), options.Bucket, options.Key)
@ -191,7 +192,7 @@ func main() {
log.Fatal(err) log.Fatal(err)
} }
fmt.Printf("Running job %s\n", job.ComposeID.String()) fmt.Printf("Running job %s\n", job.Id)
var status common.ImageBuildState var status common.ImageBuildState
result, err := RunJob(job, client.UploadImage) result, err := RunJob(job, client.UploadImage)

View file

@ -47,7 +47,7 @@ func executeTests(m *testing.M) int {
} }
repos := []rpmmd.RepoConfig{{Id: "test-system-repo", BaseURL: "http://example.com/test/os/test_arch"}} repos := []rpmmd.RepoConfig{{Id: "test-system-repo", BaseURL: "http://example.com/test/os/test_arch"}}
logger := log.New(os.Stdout, "", 0) logger := log.New(os.Stdout, "", 0)
api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store) api := weldr.New(rpm, arch, distro, repos, logger, fixture.Store, fixture.Workers)
server := http.Server{Handler: api} server := http.Server{Handler: api}
defer server.Close() defer server.Close()

View file

@ -5,6 +5,7 @@ package compose
import ( import (
"time" "time"
"github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/osbuild" "github.com/osbuild/osbuild-composer/internal/osbuild"
@ -21,15 +22,20 @@ func (ste *StateTransitionError) Error() string {
// ImageBuild represents a single image build inside a compose // ImageBuild represents a single image build inside a compose
type ImageBuild struct { type ImageBuild struct {
Id int `json:"id"` Id int `json:"id"`
QueueStatus common.ImageBuildState `json:"queue_status"` ImageType common.ImageType `json:"image_type"`
ImageType common.ImageType `json:"image_type"` Manifest *osbuild.Manifest `json:"manifest"`
Manifest *osbuild.Manifest `json:"manifest"` Targets []*target.Target `json:"targets"`
Targets []*target.Target `json:"targets"` JobCreated time.Time `json:"job_created"`
JobCreated time.Time `json:"job_created"` JobStarted time.Time `json:"job_started"`
JobStarted time.Time `json:"job_started"` JobFinished time.Time `json:"job_finished"`
JobFinished time.Time `json:"job_finished"` Size uint64 `json:"size"`
Size uint64 `json:"size"` JobId uuid.UUID `json:"jobid,omitempty"`
// Kept for backwards compatibility. Image builds which were done
// before the move to the job queue use this to store whether they
// finished successfully.
QueueStatus common.ImageBuildState `json:"queue_status,omitempty"`
} }
// DeepCopy creates a copy of the ImageBuild structure // DeepCopy creates a copy of the ImageBuild structure
@ -55,6 +61,7 @@ func (ib *ImageBuild) DeepCopy() ImageBuild {
JobStarted: ib.JobStarted, JobStarted: ib.JobStarted,
JobFinished: ib.JobFinished, JobFinished: ib.JobFinished,
Size: ib.Size, Size: ib.Size,
JobId: ib.JobId,
} }
} }
@ -94,55 +101,6 @@ func (c *Compose) DeepCopy() Compose {
} }
} }
func anyImageBuild(fn func(common.ImageBuildState) bool, list []common.ImageBuildState) bool {
acc := false
for _, i := range list {
if fn(i) {
acc = true
}
}
return acc
}
func allImageBuilds(fn func(common.ImageBuildState) bool, list []common.ImageBuildState) bool {
acc := true
for _, i := range list {
if !fn(i) {
acc = false
}
}
return acc
}
// GetState returns a state of the whole compose which is derived from the states of
// individual image builds inside the compose
func (c *Compose) GetState() common.ComposeState {
var imageBuildsStates []common.ImageBuildState
for _, ib := range c.ImageBuilds {
imageBuildsStates = append(imageBuildsStates, ib.QueueStatus)
}
// In case all states are the same
if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBWaiting }, imageBuildsStates) {
return common.CWaiting
}
if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBFinished }, imageBuildsStates) {
return common.CFinished
}
if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBFailed }, imageBuildsStates) {
return common.CFailed
}
// In case the states are mixed
// TODO: can this condition be removed because it is already covered by the default?
if anyImageBuild(func(ib common.ImageBuildState) bool { return ib == common.IBRunning }, imageBuildsStates) {
return common.CRunning
}
if allImageBuilds(func(ib common.ImageBuildState) bool { return ib == common.IBFailed || ib == common.IBFinished }, imageBuildsStates) {
return common.CFailed
}
// Default value
return common.CRunning
}
// UpdateState changes a state of a single image build inside the Compose // UpdateState changes a state of a single image build inside the Compose
func (c *Compose) UpdateState(imageBuildId int, newState common.ImageBuildState) error { func (c *Compose) UpdateState(imageBuildId int, newState common.ImageBuildState) error {
switch newState { switch newState {

View file

@ -1,125 +0,0 @@
package compose
import (
"github.com/osbuild/osbuild-composer/internal/common"
"testing"
)
func TestGetState(t *testing.T) {
cases := []struct {
compose Compose
expecedStatus common.ComposeState
}{
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBWaiting},
},
},
expecedStatus: common.CWaiting,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBRunning},
},
},
expecedStatus: common.CRunning,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBFailed},
},
},
expecedStatus: common.CFailed,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBFinished},
},
},
expecedStatus: common.CFinished,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBWaiting},
{QueueStatus: common.IBWaiting},
},
},
expecedStatus: common.CWaiting,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBWaiting},
{QueueStatus: common.IBRunning},
},
},
expecedStatus: common.CRunning,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBRunning},
{QueueStatus: common.IBRunning},
},
},
expecedStatus: common.CRunning,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBRunning},
{QueueStatus: common.IBFailed},
},
},
expecedStatus: common.CRunning,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBWaiting},
{QueueStatus: common.IBFailed},
},
},
expecedStatus: common.CRunning,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBFailed},
{QueueStatus: common.IBFailed},
},
},
expecedStatus: common.CFailed,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBFinished},
{QueueStatus: common.IBFinished},
},
},
expecedStatus: common.CFinished,
},
{
compose: Compose{
ImageBuilds: []ImageBuild{
{QueueStatus: common.IBFinished},
{QueueStatus: common.IBFailed},
},
},
expecedStatus: common.CFailed,
},
}
for n, c := range cases {
got := c.compose.GetState()
wanted := c.expecedStatus
if got != wanted {
t.Error("Compose", n, "should be in", wanted.ToString(), "state, but it is:", got.ToString())
}
}
}

View file

@ -7,6 +7,8 @@ import (
"github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/compose" "github.com/osbuild/osbuild-composer/internal/compose"
"github.com/osbuild/osbuild-composer/internal/jobqueue/testjobqueue"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/blueprint"
@ -150,6 +152,10 @@ func createBaseStoreFixture() *store.Store {
return s return s
} }
func createBaseWorkersFixture() *worker.Server {
return worker.NewServer(nil, testjobqueue.New(), nil)
}
func createBaseDepsolveFixture() []rpmmd.PackageSpec { func createBaseDepsolveFixture() []rpmmd.PackageSpec {
return []rpmmd.PackageSpec{ return []rpmmd.PackageSpec{
{ {
@ -207,6 +213,7 @@ func BaseFixture() Fixture {
nil, nil,
}, },
createBaseStoreFixture(), createBaseStoreFixture(),
createBaseWorkersFixture(),
} }
} }
@ -223,6 +230,7 @@ func NoComposesFixture() Fixture {
nil, nil,
}, },
createStoreWithoutComposesFixture(), createStoreWithoutComposesFixture(),
createBaseWorkersFixture(),
} }
} }
@ -242,6 +250,7 @@ func NonExistingPackage() Fixture {
}, },
}, },
createBaseStoreFixture(), createBaseStoreFixture(),
createBaseWorkersFixture(),
} }
} }
@ -261,6 +270,7 @@ func BadDepsolve() Fixture {
}, },
}, },
createBaseStoreFixture(), createBaseStoreFixture(),
createBaseWorkersFixture(),
} }
} }
@ -283,5 +293,6 @@ func BadFetch() Fixture {
}, },
}, },
createBaseStoreFixture(), createBaseStoreFixture(),
createBaseWorkersFixture(),
} }
} }

View file

@ -3,6 +3,7 @@ package rpmmd_mock
import ( import (
"github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/store" "github.com/osbuild/osbuild-composer/internal/store"
"github.com/osbuild/osbuild-composer/internal/worker"
) )
type fetchPackageList struct { type fetchPackageList struct {
@ -20,6 +21,7 @@ type Fixture struct {
fetchPackageList fetchPackageList
depsolve depsolve
*store.Store *store.Store
Workers *worker.Server
} }
type rpmmdMock struct { type rpmmdMock struct {

View file

@ -10,19 +10,18 @@ import (
"net/http" "net/http"
"github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/julienschmidt/httprouter" "github.com/julienschmidt/httprouter"
"github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/store"
) )
// API encapsulates RCM-specific API that is exposed over a separate TCP socket // API encapsulates RCM-specific API that is exposed over a separate TCP socket
type API struct { type API struct {
logger *log.Logger logger *log.Logger
store *store.Store workers *worker.Server
router *httprouter.Router router *httprouter.Router
// rpmMetadata is an interface to dnf-json and we include it here so that we can // rpmMetadata is an interface to dnf-json and we include it here so that we can
// mock it in the unit tests // mock it in the unit tests
rpmMetadata rpmmd.RPMMD rpmMetadata rpmmd.RPMMD
@ -30,10 +29,10 @@ type API struct {
} }
// New creates new RCM API // New creates new RCM API
func New(logger *log.Logger, store *store.Store, rpmMetadata rpmmd.RPMMD, distros *distro.Registry) *API { func New(logger *log.Logger, workers *worker.Server, rpmMetadata rpmmd.RPMMD, distros *distro.Registry) *API {
api := &API{ api := &API{
logger: logger, logger: logger,
store: store, workers: workers,
router: httprouter.New(), router: httprouter.New(),
rpmMetadata: rpmMetadata, rpmMetadata: rpmMetadata,
distros: distros, distros: distros,
@ -215,8 +214,7 @@ func (api *API) submit(writer http.ResponseWriter, request *http.Request, _ http
return return
} }
// Push the requested compose to the store composeID, err := api.workers.Enqueue(manifest, nil)
composeID, err := api.store.PushCompose(manifest, imageType.Name(), &blueprint.Blueprint{}, 0, nil)
if err != nil { if err != nil {
if api.logger != nil { if api.logger != nil {
api.logger.Println("RCM API failed to push compose:", err) api.logger.Println("RCM API failed to push compose:", err)
@ -254,22 +252,20 @@ func (api *API) status(writer http.ResponseWriter, request *http.Request, params
} }
// Check that the compose exists // Check that the compose exists
compose, exists := api.store.GetCompose(id) status, _, err := api.workers.JobResult(id)
if !exists { if err != nil {
writer.WriteHeader(http.StatusBadRequest) writer.WriteHeader(http.StatusBadRequest)
errorReason.Error = "Compose UUID does not exist" errorReason.Error = err.Error()
// TODO: handle error // TODO: handle error
_ = json.NewEncoder(writer).Encode(errorReason) _ = json.NewEncoder(writer).Encode(errorReason)
return return
} }
// JSON structure with success response // JSON structure with success response
var reply struct { type reply struct {
Status string `json:"status"` Status string `json:"status"`
} }
// TODO: return per-job status like Koji does (requires changes in the store)
reply.Status = compose.GetState().ToString()
// TODO: handle error // TODO: handle error
_ = json.NewEncoder(writer).Encode(reply) _ = json.NewEncoder(writer).Encode(reply{Status: status.ToString()})
} }

View file

@ -6,15 +6,18 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os"
"regexp" "regexp"
"testing" "testing"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/osbuild/osbuild-composer/internal/jobqueue/testjobqueue"
distro_mock "github.com/osbuild/osbuild-composer/internal/mocks/distro" distro_mock "github.com/osbuild/osbuild-composer/internal/mocks/distro"
rpmmd_mock "github.com/osbuild/osbuild-composer/internal/mocks/rpmmd" rpmmd_mock "github.com/osbuild/osbuild-composer/internal/mocks/rpmmd"
"github.com/osbuild/osbuild-composer/internal/rcm" "github.com/osbuild/osbuild-composer/internal/rcm"
"github.com/osbuild/osbuild-composer/internal/store" "github.com/osbuild/osbuild-composer/internal/worker"
) )
type API interface { type API interface {
@ -30,6 +33,21 @@ func internalRequest(api API, method, path, body, contentType string) *http.Resp
return resp.Result() return resp.Result()
} }
func newTestWorkerServer(t *testing.T) (*worker.Server, string) {
dir, err := ioutil.TempDir("", "rcm-test-")
require.NoError(t, err)
w := worker.NewServer(nil, testjobqueue.New(), nil)
require.NotNil(t, w)
return w, dir
}
func cleanupTempDir(t *testing.T, dir string) {
err := os.RemoveAll(dir)
require.NoError(t, err)
}
func TestBasicRcmAPI(t *testing.T) { func TestBasicRcmAPI(t *testing.T) {
// Test the HTTP API responses // Test the HTTP API responses
// This test mainly focuses on HTTP status codes and JSON structures, not necessarily on their content // This test mainly focuses on HTTP status codes and JSON structures, not necessarily on their content
@ -47,14 +65,18 @@ func TestBasicRcmAPI(t *testing.T) {
{"POST", "/v1/compose", `{"status":"RUNNING"}`, "text/plain", http.StatusBadRequest, ``}, {"POST", "/v1/compose", `{"status":"RUNNING"}`, "text/plain", http.StatusBadRequest, ``},
{"POST", "/v1/compose", `{"image_builds":[]}`, "application/json", http.StatusBadRequest, ""}, {"POST", "/v1/compose", `{"image_builds":[]}`, "application/json", http.StatusBadRequest, ""},
{"POST", "/v1/compose/111-222-333", `{"status":"RUNNING"}`, "application/json", http.StatusMethodNotAllowed, ``}, {"POST", "/v1/compose/111-222-333", `{"status":"RUNNING"}`, "application/json", http.StatusMethodNotAllowed, ``},
{"GET", "/v1/compose/7802c476-9cd1-41b7-ba81-43c1906bce73", `{"status":"RUNNING"}`, "application/json", http.StatusBadRequest, `{"error_reason":"Compose UUID does not exist"}`}, {"GET", "/v1/compose/7802c476-9cd1-41b7-ba81-43c1906bce73", `{"status":"RUNNING"}`, "application/json", http.StatusBadRequest, ``},
} }
registry, err := distro_mock.NewDefaultRegistry() registry, err := distro_mock.NewDefaultRegistry()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
api := rcm.New(nil, store.New(nil), rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry)
workers, dir := newTestWorkerServer(t)
defer cleanupTempDir(t, dir)
api := rcm.New(nil, workers, rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry)
for _, c := range cases { for _, c := range cases {
resp := internalRequest(api, c.Method, c.Path, c.Body, c.ContentType) resp := internalRequest(api, c.Method, c.Path, c.Body, c.ContentType)
@ -79,7 +101,11 @@ func TestSubmit(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
api := rcm.New(nil, store.New(nil), rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry)
workers, dir := newTestWorkerServer(t)
defer cleanupTempDir(t, dir)
api := rcm.New(nil, workers, rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry)
var submit_reply struct { var submit_reply struct {
UUID uuid.UUID `json:"compose_id"` UUID uuid.UUID `json:"compose_id"`
@ -207,7 +233,11 @@ func TestStatus(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
api := rcm.New(nil, store.New(nil), rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry)
workers, dir := newTestWorkerServer(t)
defer cleanupTempDir(t, dir)
api := rcm.New(nil, workers, rpmmd_mock.NewRPMMDMock(rpmmd_mock.BaseFixture()), registry)
var submit_reply struct { var submit_reply struct {
UUID uuid.UUID `json:"compose_id"` UUID uuid.UUID `json:"compose_id"`

View file

@ -84,14 +84,6 @@ func (e *NotPendingError) Error() string {
return e.message return e.message
} }
type NotRunningError struct {
message string
}
func (e *NotRunningError) Error() string {
return e.message
}
type InvalidRequestError struct { type InvalidRequestError struct {
message string message string
} }
@ -136,27 +128,23 @@ func New(stateDir *string) *Store {
if s.Composes == nil { if s.Composes == nil {
s.Composes = make(map[uuid.UUID]compose.Compose) s.Composes = make(map[uuid.UUID]compose.Compose)
} else { } else {
// Backwards compatibility: fail all builds that are queued or
// running. Jobs status is now handled outside of the store
// (and the compose). The fields are kept so that previously
// succeeded builds still show up correctly.
for composeID, compose := range s.Composes { for composeID, compose := range s.Composes {
if len(compose.ImageBuilds) == 0 { if len(compose.ImageBuilds) == 0 {
panic("the was a compose with zero image builds, that is forbidden") panic("the was a compose with zero image builds, that is forbidden")
} }
for imgID, imgBuild := range compose.ImageBuilds { for imgID, imgBuild := range compose.ImageBuilds {
switch imgBuild.QueueStatus { switch imgBuild.QueueStatus {
case common.IBRunning: case common.IBRunning, common.IBWaiting:
// We do not support resuming an in-flight build
compose.ImageBuilds[imgID].QueueStatus = common.IBFailed compose.ImageBuilds[imgID].QueueStatus = common.IBFailed
s.Composes[composeID] = compose s.Composes[composeID] = compose
case common.IBWaiting:
// Push waiting composes back into the pending jobs queue
s.pendingJobs <- Job{
ComposeID: composeID,
ImageBuildID: imgBuild.Id,
Manifest: imgBuild.Manifest,
Targets: imgBuild.Targets,
}
} }
} }
} }
} }
if s.Sources == nil { if s.Sources == nil {
s.Sources = make(map[string]SourceConfig) s.Sources = make(map[string]SourceConfig)
@ -501,13 +489,15 @@ func (s *Store) getImageBuildDirectory(composeID uuid.UUID, imageBuildID int) st
return fmt.Sprintf("%s/%d", s.getComposeDirectory(composeID), imageBuildID) return fmt.Sprintf("%s/%d", s.getComposeDirectory(composeID), imageBuildID)
} }
func (s *Store) PushCompose(manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target) (uuid.UUID, error) { func (s *Store) PushCompose(composeID uuid.UUID, manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, jobId uuid.UUID) error {
if _, exists := s.GetCompose(composeID); exists {
panic("a compose with this id already exists")
}
if targets == nil { if targets == nil {
targets = []*target.Target{} targets = []*target.Target{}
} }
composeID := uuid.New()
// Compatibility layer for image types in Weldr API v0 // Compatibility layer for image types in Weldr API v0
imageTypeCommon, exists := common.ImageTypeFromCompatString(imageType.Name()) imageTypeCommon, exists := common.ImageTypeFromCompatString(imageType.Name())
if !exists { if !exists {
@ -519,7 +509,7 @@ func (s *Store) PushCompose(manifest *osbuild.Manifest, imageType distro.ImageTy
err := os.MkdirAll(outputDir, 0755) err := os.MkdirAll(outputDir, 0755)
if err != nil { if err != nil {
return uuid.Nil, fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) return fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err)
} }
} }
@ -529,37 +519,28 @@ func (s *Store) PushCompose(manifest *osbuild.Manifest, imageType distro.ImageTy
Blueprint: bp, Blueprint: bp,
ImageBuilds: []compose.ImageBuild{ ImageBuilds: []compose.ImageBuild{
{ {
QueueStatus: common.IBWaiting, Manifest: manifest,
Manifest: manifest, ImageType: imageTypeCommon,
ImageType: imageTypeCommon, Targets: targets,
Targets: targets, JobCreated: time.Now(),
JobCreated: time.Now(), Size: size,
Size: size, JobId: jobId,
}, },
}, },
} }
return nil return nil
}) })
s.pendingJobs <- Job{ return nil
ComposeID: composeID,
ImageBuildID: 0,
Manifest: manifest,
Targets: targets,
}
return composeID, nil
} }
// PushTestCompose is used for testing // PushTestCompose is used for testing
// Set testSuccess to create a fake successful compose, otherwise it will create a failed compose // Set testSuccess to create a fake successful compose, otherwise it will create a failed compose
// It does not actually run a compose job // It does not actually run a compose job
func (s *Store) PushTestCompose(manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, testSuccess bool) (uuid.UUID, error) { func (s *Store) PushTestCompose(composeID uuid.UUID, manifest *osbuild.Manifest, imageType distro.ImageType, bp *blueprint.Blueprint, size uint64, targets []*target.Target, testSuccess bool) error {
if targets == nil { if targets == nil {
targets = []*target.Target{} targets = []*target.Target{}
} }
composeID := uuid.New()
// Compatibility layer for image types in Weldr API v0 // Compatibility layer for image types in Weldr API v0
imageTypeCommon, exists := common.ImageTypeFromCompatString(imageType.Name()) imageTypeCommon, exists := common.ImageTypeFromCompatString(imageType.Name())
if !exists { if !exists {
@ -571,7 +552,7 @@ func (s *Store) PushTestCompose(manifest *osbuild.Manifest, imageType distro.Ima
err := os.MkdirAll(outputDir, 0755) err := os.MkdirAll(outputDir, 0755)
if err != nil { if err != nil {
return uuid.Nil, fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err) return fmt.Errorf("cannot create output directory for job %v: %#v", composeID, err)
} }
} }
@ -607,33 +588,20 @@ func (s *Store) PushTestCompose(manifest *osbuild.Manifest, imageType distro.Ima
// Instead of starting the job, immediately set a final status // Instead of starting the job, immediately set a final status
err := s.UpdateImageBuildInCompose(composeID, 0, status, &result) err := s.UpdateImageBuildInCompose(composeID, 0, status, &result)
if err != nil { if err != nil {
return uuid.Nil, err return err
} }
return composeID, nil return nil
} }
// DeleteCompose deletes the compose from the state file and also removes all files on disk that are // DeleteCompose deletes the compose from the state file and also removes all files on disk that are
// associated with this compose // associated with this compose
func (s *Store) DeleteCompose(id uuid.UUID) error { func (s *Store) DeleteCompose(id uuid.UUID) error {
return s.change(func() error { return s.change(func() error {
compose, exists := s.Composes[id] if _, exists := s.Composes[id]; !exists {
if !exists {
return &NotFoundError{} return &NotFoundError{}
} }
// If any of the image builds have build artifacts, remove them
invalidRequest := true
for _, imageBuild := range compose.ImageBuilds {
if imageBuild.QueueStatus == common.IBFinished || imageBuild.QueueStatus == common.IBFailed {
invalidRequest = false
}
}
if invalidRequest {
return &InvalidRequestError{fmt.Sprintf("Compose %s is not in FINISHED or FAILED.", id)}
}
delete(s.Composes, id) delete(s.Composes, id)
var err error var err error
@ -648,31 +616,6 @@ func (s *Store) DeleteCompose(id uuid.UUID) error {
}) })
} }
// PopJob returns a job from the job queue and changes the status of the corresponding image build to running
func (s *Store) PopJob() Job {
job := <-s.pendingJobs
// FIXME: handle or comment this possible error
_ = s.change(func() error {
// Get the compose from the map
compose, exists := s.Composes[job.ComposeID]
// Check that it exists
if !exists {
panic("Invalid job in queue.")
}
// Change queue status to running for the image build as well as for the targets
compose.ImageBuilds[job.ImageBuildID].QueueStatus = common.IBRunning
compose.ImageBuilds[job.ImageBuildID].JobStarted = time.Now()
for m := range compose.ImageBuilds[job.ImageBuildID].Targets {
compose.ImageBuilds[job.ImageBuildID].Targets[m].Status = common.IBRunning
}
// Replace the compose struct with the new one
// TODO: I'm not sure this is needed, but I don't know what is the golang semantics in this case
s.Composes[job.ComposeID] = compose
return nil
})
return job
}
// UpdateImageBuildInCompose sets the status and optionally also the final image. // UpdateImageBuildInCompose sets the status and optionally also the final image.
func (s *Store) UpdateImageBuildInCompose(composeID uuid.UUID, imageBuildID int, status common.ImageBuildState, result *common.ComposeResult) error { func (s *Store) UpdateImageBuildInCompose(composeID uuid.UUID, imageBuildID int, status common.ImageBuildState, result *common.ComposeResult) error {
return s.change(func() error { return s.change(func() error {

View file

@ -1,7 +1,11 @@
package target package target
import "github.com/google/uuid"
type LocalTargetOptions struct { type LocalTargetOptions struct {
Filename string `json:"filename"` ComposeId uuid.UUID `json:"compose_id"`
ImageBuildId int `json:"image_build_id"`
Filename string `json:"filename"`
} }
func (LocalTargetOptions) isTargetOptions() {} func (LocalTargetOptions) isTargetOptions() {}

View file

@ -14,6 +14,7 @@ import (
"sort" "sort"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/google/uuid" "github.com/google/uuid"
@ -21,14 +22,17 @@ import (
"github.com/osbuild/osbuild-composer/internal/blueprint" "github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/compose"
"github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distro"
"github.com/osbuild/osbuild-composer/internal/rpmmd" "github.com/osbuild/osbuild-composer/internal/rpmmd"
"github.com/osbuild/osbuild-composer/internal/store" "github.com/osbuild/osbuild-composer/internal/store"
"github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/worker"
) )
type API struct { type API struct {
store *store.Store store *store.Store
workers *worker.Server
rpmmd rpmmd.RPMMD rpmmd rpmmd.RPMMD
arch distro.Arch arch distro.Arch
@ -39,14 +43,15 @@ type API struct {
router *httprouter.Router router *httprouter.Router
} }
func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store) *API { func New(rpmmd rpmmd.RPMMD, arch distro.Arch, distro distro.Distro, repos []rpmmd.RepoConfig, logger *log.Logger, store *store.Store, workers *worker.Server) *API {
api := &API{ api := &API{
store: store, store: store,
rpmmd: rpmmd, workers: workers,
arch: arch, rpmmd: rpmmd,
distro: distro, arch: arch,
repos: repos, distro: distro,
logger: logger, repos: repos,
logger: logger,
} }
api.router = httprouter.New() api.router = httprouter.New()
@ -135,6 +140,40 @@ func (api *API) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
api.router.ServeHTTP(writer, request) api.router.ServeHTTP(writer, request)
} }
// Returns the state of the image in `compose` and the times the job was
// queued, started, and finished. Assumes that there's only one image in the
// compose. Returns CWaiting on error.
func (api *API) getComposeState(compose compose.Compose) (state common.ComposeState, queued, started, finished time.Time) {
if len(compose.ImageBuilds) == 0 {
return
}
jobId := compose.ImageBuilds[0].JobId
// backwards compatibility: composes that were around before splitting
// the job queue from the store still contain their valid status and
// times. Return those here as a fallback.
if jobId == uuid.Nil {
switch compose.ImageBuilds[0].QueueStatus {
case common.IBWaiting:
state = common.CWaiting
case common.IBRunning:
state = common.CRunning
case common.IBFinished:
state = common.CFinished
case common.IBFailed:
state = common.CFailed
}
queued = compose.ImageBuilds[0].JobCreated
started = compose.ImageBuilds[0].JobStarted
finished = compose.ImageBuilds[0].JobFinished
return
}
state, queued, started, finished, _ = api.workers.JobStatus(jobId)
return
}
func verifyRequestVersion(writer http.ResponseWriter, params httprouter.Params, minVersion uint) bool { func verifyRequestVersion(writer http.ResponseWriter, params httprouter.Params, minVersion uint) bool {
versionString := params.ByName("version") versionString := params.ByName("version")
@ -1433,6 +1472,8 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request
return return
} }
composeID := uuid.New()
var targets []*target.Target var targets []*target.Target
if isRequestVersionAtLeast(params, 1) && cr.Upload != nil { if isRequestVersionAtLeast(params, 1) && cr.Upload != nil {
t := uploadRequestToTarget(*cr.Upload, imageType) t := uploadRequestToTarget(*cr.Upload, imageType)
@ -1441,7 +1482,9 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request
targets = append(targets, target.NewLocalTarget( targets = append(targets, target.NewLocalTarget(
&target.LocalTargetOptions{ &target.LocalTargetOptions{
Filename: imageType.Filename(), ComposeId: composeID,
ImageBuildId: 0,
Filename: imageType.Filename(),
}, },
)) ))
@ -1465,8 +1508,6 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request
return return
} }
var composeID uuid.UUID
// Check for test parameter // Check for test parameter
q, err := url.ParseQuery(request.URL.RawQuery) q, err := url.ParseQuery(request.URL.RawQuery)
if err != nil { if err != nil {
@ -1492,12 +1533,17 @@ func (api *API) composeHandler(writer http.ResponseWriter, request *http.Request
testMode := q.Get("test") testMode := q.Get("test")
if testMode == "1" { if testMode == "1" {
// Create a failed compose // Create a failed compose
composeID, err = api.store.PushTestCompose(manifest, imageType, bp, size, targets, false) err = api.store.PushTestCompose(composeID, manifest, imageType, bp, size, targets, false)
} else if testMode == "2" { } else if testMode == "2" {
// Create a successful compose // Create a successful compose
composeID, err = api.store.PushTestCompose(manifest, imageType, bp, size, targets, true) err = api.store.PushTestCompose(composeID, manifest, imageType, bp, size, targets, true)
} else { } else {
composeID, err = api.store.PushCompose(manifest, imageType, bp, size, targets) var jobId uuid.UUID
jobId, err = api.workers.Enqueue(manifest, targets)
if err == nil {
err = api.store.PushCompose(composeID, manifest, imageType, bp, size, targets, jobId)
}
} }
// TODO: we should probably do some kind of blueprint validation in future // TODO: we should probably do some kind of blueprint validation in future
@ -1549,29 +1595,34 @@ func (api *API) composeDeleteHandler(writer http.ResponseWriter, request *http.R
continue continue
} }
err = api.store.DeleteCompose(id) compose, exists := api.store.GetCompose(id)
if !exists {
if err != nil { errors = append(errors, composeDeleteError{
switch err.(type) { "UnknownUUID",
case *store.NotFoundError: fmt.Sprintf("compose %s doesn't exist", id),
errors = append(errors, composeDeleteError{ })
"UnknownUUID", continue
fmt.Sprintf("compose %s doesn't exist", id),
})
case *store.InvalidRequestError:
errors = append(errors, composeDeleteError{
"BuildInWrongState",
err.Error(),
})
default:
errors = append(errors, composeDeleteError{
"ComposeError",
fmt.Sprintf("%s: %s", id, err.Error()),
})
}
} else {
results = append(results, composeDeleteStatus{id, true})
} }
state, _, _, _ := api.getComposeState(compose)
if state != common.CFinished && state != common.CFailed {
errors = append(errors, composeDeleteError{
"BuildInWrongState",
fmt.Sprintf("Compose %s is not in FINISHED or FAILED.", id),
})
continue
}
err = api.store.DeleteCompose(id)
if err != nil {
errors = append(errors, composeDeleteError{
"ComposeError",
fmt.Sprintf("%s: %s", id, err.Error()),
})
continue
}
results = append(results, composeDeleteStatus{id, true})
} }
reply := struct { reply := struct {
@ -1614,13 +1665,16 @@ func (api *API) composeQueueHandler(writer http.ResponseWriter, request *http.Re
Run []*ComposeEntry `json:"run"` Run []*ComposeEntry `json:"run"`
}{[]*ComposeEntry{}, []*ComposeEntry{}} }{[]*ComposeEntry{}, []*ComposeEntry{}}
includeUploads := isRequestVersionAtLeast(params, 1)
composes := api.store.GetAllComposes() composes := api.store.GetAllComposes()
for id, compose := range composes { for id, compose := range composes {
switch compose.GetState() { state, queued, started, finished := api.getComposeState(compose)
switch state {
case common.CWaiting: case common.CWaiting:
reply.New = append(reply.New, composeToComposeEntry(id, compose, isRequestVersionAtLeast(params, 1))) reply.New = append(reply.New, composeToComposeEntry(id, compose, common.CWaiting, queued, started, finished, includeUploads))
case common.CRunning: case common.CRunning:
reply.Run = append(reply.Run, composeToComposeEntry(id, compose, isRequestVersionAtLeast(params, 1))) reply.Run = append(reply.Run, composeToComposeEntry(id, compose, common.CRunning, queued, started, finished, includeUploads))
} }
} }
@ -1682,9 +1736,10 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R
if !exists { if !exists {
continue continue
} }
state, _, _, _ := api.getComposeState(compose)
if filterBlueprint != "" && compose.Blueprint.Name != filterBlueprint { if filterBlueprint != "" && compose.Blueprint.Name != filterBlueprint {
continue continue
} else if filterStatus != "" && compose.ImageBuilds[0].QueueStatus.ToString() != filterStatus { } else if filterStatus != "" && state.ToString() != filterStatus {
continue continue
} else if filterImageTypeExists && compose.ImageBuilds[0].ImageType != filterImageType { } else if filterImageTypeExists && compose.ImageBuilds[0].ImageType != filterImageType {
continue continue
@ -1696,7 +1751,8 @@ func (api *API) composeStatusHandler(writer http.ResponseWriter, request *http.R
includeUploads := isRequestVersionAtLeast(params, 1) includeUploads := isRequestVersionAtLeast(params, 1)
for _, id := range filteredUUIDs { for _, id := range filteredUUIDs {
if compose, exists := composes[id]; exists { if compose, exists := composes[id]; exists {
reply.UUIDs = append(reply.UUIDs, composeToComposeEntry(id, compose, includeUploads)) state, queued, started, finished := api.getComposeState(compose)
reply.UUIDs = append(reply.UUIDs, composeToComposeEntry(id, compose, state, queued, started, finished, includeUploads))
} }
} }
sortComposeEntries(reply.UUIDs) sortComposeEntries(reply.UUIDs)
@ -1755,8 +1811,9 @@ func (api *API) composeInfoHandler(writer http.ResponseWriter, request *http.Req
} }
// Weldr API assumes only one image build per compose, that's why only the // Weldr API assumes only one image build per compose, that's why only the
// 1st build is considered // 1st build is considered
state, _, _, _ := api.getComposeState(compose)
reply.ComposeType, _ = compose.ImageBuilds[0].ImageType.ToCompatString() reply.ComposeType, _ = compose.ImageBuilds[0].ImageType.ToCompatString()
reply.QueueStatus = compose.GetState().ToString() reply.QueueStatus = state.ToString()
reply.ImageSize = compose.ImageBuilds[0].Size reply.ImageSize = compose.ImageBuilds[0].Size
if isRequestVersionAtLeast(params, 1) { if isRequestVersionAtLeast(params, 1) {
@ -1793,10 +1850,11 @@ func (api *API) composeImageHandler(writer http.ResponseWriter, request *http.Re
return return
} }
if compose.GetState() != common.CFinished { state, _, _, _ := api.getComposeState(compose)
if state != common.CFinished {
errors := responseError{ errors := responseError{
ID: "BuildInWrongState", ID: "BuildInWrongState",
Msg: fmt.Sprintf("Build %s is in wrong state: %s", uuidString, compose.GetState().ToString()), Msg: fmt.Sprintf("Build %s is in wrong state: %s", uuidString, state.ToString()),
} }
statusResponseError(writer, http.StatusBadRequest, errors) statusResponseError(writer, http.StatusBadRequest, errors)
return return
@ -1863,7 +1921,8 @@ func (api *API) composeLogsHandler(writer http.ResponseWriter, request *http.Req
return return
} }
if compose.GetState() != common.CFinished && compose.GetState() != common.CFailed { state, _, _, _ := api.getComposeState(compose)
if state != common.CFinished && state != common.CFailed {
errors := responseError{ errors := responseError{
ID: "BuildInWrongState", ID: "BuildInWrongState",
Msg: fmt.Sprintf("Build %s not in FINISHED or FAILED state.", uuidString), Msg: fmt.Sprintf("Build %s not in FINISHED or FAILED state.", uuidString),
@ -1950,7 +2009,8 @@ func (api *API) composeLogHandler(writer http.ResponseWriter, request *http.Requ
return return
} }
if compose.GetState() == common.CWaiting { state, _, _, _ := api.getComposeState(compose)
if state == common.CWaiting {
errors := responseError{ errors := responseError{
ID: "BuildInWrongState", ID: "BuildInWrongState",
Msg: fmt.Sprintf("Build %s has not started yet. No logs to view.", uuidString), Msg: fmt.Sprintf("Build %s has not started yet. No logs to view.", uuidString),
@ -1959,7 +2019,7 @@ func (api *API) composeLogHandler(writer http.ResponseWriter, request *http.Requ
return return
} }
if compose.GetState() == common.CRunning { if state == common.CRunning {
fmt.Fprintf(writer, "Running...\n") fmt.Fprintf(writer, "Running...\n")
return return
} }
@ -2004,9 +2064,11 @@ func (api *API) composeFinishedHandler(writer http.ResponseWriter, request *http
includeUploads := isRequestVersionAtLeast(params, 1) includeUploads := isRequestVersionAtLeast(params, 1)
for id, compose := range api.store.GetAllComposes() { for id, compose := range api.store.GetAllComposes() {
if compose.ImageBuilds[0].QueueStatus == common.IBFinished { state, queued, started, finished := api.getComposeState(compose)
reply.Finished = append(reply.Finished, composeToComposeEntry(id, compose, includeUploads)) if state != common.CFinished {
continue
} }
reply.Finished = append(reply.Finished, composeToComposeEntry(id, compose, common.CFinished, queued, started, finished, includeUploads))
} }
sortComposeEntries(reply.Finished) sortComposeEntries(reply.Finished)
@ -2025,9 +2087,11 @@ func (api *API) composeFailedHandler(writer http.ResponseWriter, request *http.R
includeUploads := isRequestVersionAtLeast(params, 1) includeUploads := isRequestVersionAtLeast(params, 1)
for id, compose := range api.store.GetAllComposes() { for id, compose := range api.store.GetAllComposes() {
if compose.ImageBuilds[0].QueueStatus == common.IBFailed { state, queued, started, finished := api.getComposeState(compose)
reply.Failed = append(reply.Failed, composeToComposeEntry(id, compose, includeUploads)) if state != common.CFailed {
continue
} }
reply.Failed = append(reply.Failed, composeToComposeEntry(id, compose, common.CFailed, queued, started, finished, includeUploads))
} }
sortComposeEntries(reply.Failed) sortComposeEntries(reply.Failed)

View file

@ -38,7 +38,7 @@ func createWeldrAPI(fixtureGenerator rpmmd_mock.FixtureGenerator) (*API, *store.
panic(err) panic(err)
} }
return New(rpm, arch, d, repos, nil, fixture.Store), fixture.Store return New(rpm, arch, d, repos, nil, fixture.Store, fixture.Workers), fixture.Store
} }
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {

View file

@ -2,6 +2,7 @@ package weldr
import ( import (
"sort" "sort"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/common"
@ -21,37 +22,40 @@ type ComposeEntry struct {
Uploads []uploadResponse `json:"uploads,omitempty"` Uploads []uploadResponse `json:"uploads,omitempty"`
} }
func composeToComposeEntry(id uuid.UUID, compose compose.Compose, includeUploads bool) *ComposeEntry { func composeToComposeEntry(id uuid.UUID, compose compose.Compose, state common.ComposeState, queued, started, finished time.Time, includeUploads bool) *ComposeEntry {
var composeEntry ComposeEntry var composeEntry ComposeEntry
composeEntry.ID = id composeEntry.ID = id
composeEntry.Blueprint = compose.Blueprint.Name composeEntry.Blueprint = compose.Blueprint.Name
composeEntry.Version = compose.Blueprint.Version composeEntry.Version = compose.Blueprint.Version
composeEntry.ComposeType = compose.ImageBuilds[0].ImageType composeEntry.ComposeType = compose.ImageBuilds[0].ImageType
composeEntry.QueueStatus = compose.ImageBuilds[0].QueueStatus
if includeUploads { if includeUploads {
composeEntry.Uploads = targetsToUploadResponses(compose.ImageBuilds[0].Targets) composeEntry.Uploads = targetsToUploadResponses(compose.ImageBuilds[0].Targets)
} }
switch compose.ImageBuilds[0].QueueStatus { switch state {
case common.IBWaiting: case common.CWaiting:
composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 composeEntry.QueueStatus = common.IBWaiting
composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000
case common.IBRunning: case common.CRunning:
composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 composeEntry.QueueStatus = common.IBRunning
composeEntry.JobStarted = float64(compose.ImageBuilds[0].JobStarted.UnixNano()) / 1000000000 composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000
composeEntry.JobStarted = float64(started.UnixNano()) / 1000000000
case common.IBFinished: case common.CFinished:
composeEntry.QueueStatus = common.IBFinished
composeEntry.ImageSize = compose.ImageBuilds[0].Size composeEntry.ImageSize = compose.ImageBuilds[0].Size
composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000
composeEntry.JobStarted = float64(compose.ImageBuilds[0].JobStarted.UnixNano()) / 1000000000 composeEntry.JobStarted = float64(started.UnixNano()) / 1000000000
composeEntry.JobFinished = float64(compose.ImageBuilds[0].JobFinished.UnixNano()) / 1000000000 composeEntry.JobFinished = float64(finished.UnixNano()) / 1000000000
case common.IBFailed: case common.CFailed:
composeEntry.JobCreated = float64(compose.ImageBuilds[0].JobCreated.UnixNano()) / 1000000000 composeEntry.QueueStatus = common.IBFailed
composeEntry.JobStarted = float64(compose.ImageBuilds[0].JobStarted.UnixNano()) / 1000000000 composeEntry.JobCreated = float64(queued.UnixNano()) / 1000000000
composeEntry.JobFinished = float64(compose.ImageBuilds[0].JobFinished.UnixNano()) / 1000000000 composeEntry.JobStarted = float64(started.UnixNano()) / 1000000000
composeEntry.JobFinished = float64(finished.UnixNano()) / 1000000000
default: default:
panic("invalid compose state") panic("invalid compose state")
} }

View file

@ -25,10 +25,9 @@ type Client struct {
} }
type Job struct { type Job struct {
ComposeID uuid.UUID Id uuid.UUID
ImageBuildID int Manifest *osbuild.Manifest
Manifest *osbuild.Manifest Targets []*target.Target
Targets []*target.Target
} }
func NewClient(address string, conf *tls.Config) *Client { func NewClient(address string, conf *tls.Config) *Client {
@ -86,8 +85,7 @@ func (c *Client) AddJob() (*Job, error) {
} }
return &Job{ return &Job{
jr.ComposeID, jr.Id,
jr.ImageBuildID,
jr.Manifest, jr.Manifest,
jr.Targets, jr.Targets,
}, nil }, nil
@ -99,7 +97,7 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *comm
if err != nil { if err != nil {
panic(err) panic(err)
} }
urlPath := fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d", job.ComposeID.String(), job.ImageBuildID) urlPath := fmt.Sprintf("/job-queue/v1/jobs/%s", job.Id)
url := c.createURL(urlPath) url := c.createURL(urlPath)
req, err := http.NewRequest("PATCH", url, &b) req, err := http.NewRequest("PATCH", url, &b)
if err != nil { if err != nil {
@ -120,9 +118,9 @@ func (c *Client) UpdateJob(job *Job, status common.ImageBuildState, result *comm
return nil return nil
} }
func (c *Client) UploadImage(job *Job, reader io.Reader) error { func (c *Client) UploadImage(composeId uuid.UUID, imageBuildId int, reader io.Reader) error {
// content type doesn't really matter // content type doesn't really matter
url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", job.ComposeID.String(), job.ImageBuildID)) url := c.createURL(fmt.Sprintf("/job-queue/v1/jobs/%s/builds/%d/image", composeId, imageBuildId))
_, err := c.client.Post(url, "application/octet-stream", reader) _, err := c.client.Post(url, "application/octet-stream", reader)
return err return err

View file

@ -8,6 +8,23 @@ import (
"github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/target"
) )
//
// JSON-serializable types for the jobqueue
//
type OSBuildJob struct {
Manifest *osbuild.Manifest `json:"manifest"`
Targets []*target.Target `json:"targets,omitempty"`
}
type OSBuildJobResult struct {
OSBuildOutput *common.ComposeResult `json:"osbuild_output,omitempty"`
}
//
// JSON-serializable types for the HTTP API
//
type errorResponse struct { type errorResponse struct {
Message string `json:"message"` Message string `json:"message"`
} }
@ -16,10 +33,9 @@ type addJobRequest struct {
} }
type addJobResponse struct { type addJobResponse struct {
ComposeID uuid.UUID `json:"compose_id"` Id uuid.UUID `json:"id"`
ImageBuildID int `json:"image_build_id"` Manifest *osbuild.Manifest `json:"manifest"`
Manifest *osbuild.Manifest `json:"manifest"` Targets []*target.Target `json:"targets,omitempty"`
Targets []*target.Target `json:"targets"`
} }
type updateJobRequest struct { type updateJobRequest struct {

View file

@ -3,27 +3,37 @@ package worker
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil"
"log" "log"
"net" "net"
"net/http" "net/http"
"strconv" "strconv"
"time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/julienschmidt/httprouter" "github.com/julienschmidt/httprouter"
"github.com/osbuild/osbuild-composer/internal/store" "github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
"github.com/osbuild/osbuild-composer/internal/osbuild"
"github.com/osbuild/osbuild-composer/internal/target"
) )
type Server struct { type Server struct {
logger *log.Logger logger *log.Logger
store *store.Store jobs jobqueue.JobQueue
router *httprouter.Router router *httprouter.Router
imageWriter WriteImageFunc
} }
func NewServer(logger *log.Logger, store *store.Store) *Server { type WriteImageFunc func(composeID uuid.UUID, imageBuildID int, reader io.Reader) error
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, imageWriter WriteImageFunc) *Server {
s := &Server{ s := &Server{
logger: logger, logger: logger,
store: store, jobs: jobs,
imageWriter: imageWriter,
} }
s.router = httprouter.New() s.router = httprouter.New()
@ -33,7 +43,7 @@ func NewServer(logger *log.Logger, store *store.Store) *Server {
s.router.NotFound = http.HandlerFunc(notFoundHandler) s.router.NotFound = http.HandlerFunc(notFoundHandler)
s.router.POST("/job-queue/v1/jobs", s.addJobHandler) s.router.POST("/job-queue/v1/jobs", s.addJobHandler)
s.router.PATCH("/job-queue/v1/jobs/:job_id/builds/:build_id", s.updateJobHandler) s.router.PATCH("/job-queue/v1/jobs/:job_id", s.updateJobHandler)
s.router.POST("/job-queue/v1/jobs/:job_id/builds/:build_id/image", s.addJobImageHandler) s.router.POST("/job-queue/v1/jobs/:job_id/builds/:build_id/image", s.addJobImageHandler)
return s return s
@ -59,6 +69,38 @@ func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
s.router.ServeHTTP(writer, request) s.router.ServeHTTP(writer, request)
} }
func (s *Server) Enqueue(manifest *osbuild.Manifest, targets []*target.Target) (uuid.UUID, error) {
job := OSBuildJob{
Manifest: manifest,
Targets: targets,
}
return s.jobs.Enqueue("osbuild", job, nil)
}
func (s *Server) JobStatus(id uuid.UUID) (state common.ComposeState, queued, started, finished time.Time, err error) {
var result OSBuildJobResult
var status jobqueue.JobStatus
status, queued, started, finished, err = s.jobs.JobStatus(id, &result)
if err != nil {
return
}
state = composeStateFromJobStatus(status, result.OSBuildOutput)
return
}
func (s *Server) JobResult(id uuid.UUID) (common.ComposeState, *common.ComposeResult, error) {
var result OSBuildJobResult
status, _, _, _, err := s.jobs.JobStatus(id, &result)
if err != nil {
return common.CWaiting, nil, err
}
return composeStateFromJobStatus(status, result.OSBuildOutput), result.OSBuildOutput, nil
}
// jsonErrorf() is similar to http.Error(), but returns the message in a json // jsonErrorf() is similar to http.Error(), but returns the message in a json
// object with a "message" field. // object with a "message" field.
func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) { func jsonErrorf(writer http.ResponseWriter, code int, message string, args ...interface{}) {
@ -92,15 +134,19 @@ func (s *Server) addJobHandler(writer http.ResponseWriter, request *http.Request
return return
} }
nextJob := s.store.PopJob() var job OSBuildJob
id, err := s.jobs.Dequeue(request.Context(), []string{"osbuild"}, &job)
if err != nil {
jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
return
}
writer.WriteHeader(http.StatusCreated) writer.WriteHeader(http.StatusCreated)
// FIXME: handle or comment this possible error // FIXME: handle or comment this possible error
_ = json.NewEncoder(writer).Encode(addJobResponse{ _ = json.NewEncoder(writer).Encode(addJobResponse{
ComposeID: nextJob.ComposeID, Id: id,
ImageBuildID: nextJob.ImageBuildID, Manifest: job.Manifest,
Manifest: nextJob.Manifest, Targets: job.Targets,
Targets: nextJob.Targets,
}) })
} }
@ -117,13 +163,6 @@ func (s *Server) updateJobHandler(writer http.ResponseWriter, request *http.Requ
return return
} }
imageBuildId, err := strconv.Atoi(params.ByName("build_id"))
if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err)
return
}
var body updateJobRequest var body updateJobRequest
err = json.NewDecoder(request.Body).Decode(&body) err = json.NewDecoder(request.Body).Decode(&body)
if err != nil { if err != nil {
@ -131,13 +170,21 @@ func (s *Server) updateJobHandler(writer http.ResponseWriter, request *http.Requ
return return
} }
err = s.store.UpdateImageBuildInCompose(id, imageBuildId, body.Status, body.Result) // The jobqueue doesn't support setting the status before a job is
// finished. This branch should never be hit, because the worker
// doesn't attempt this. Change the API to remove this awkwardness.
if body.Status != common.IBFinished && body.Status != common.IBFailed {
jsonErrorf(writer, http.StatusBadRequest, "setting status of a job to waiting or running is not supported")
return
}
err = s.jobs.FinishJob(id, OSBuildJobResult{OSBuildOutput: body.Result})
if err != nil { if err != nil {
switch err.(type) { switch err {
case *store.NotFoundError, *store.NotPendingError: case jobqueue.ErrNotExist:
jsonErrorf(writer, http.StatusNotFound, "%v", err) jsonErrorf(writer, http.StatusNotFound, "job does not exist: %s", id)
case *store.NotRunningError, *store.InvalidRequestError: case jobqueue.ErrNotRunning:
jsonErrorf(writer, http.StatusBadRequest, "%v", err) jsonErrorf(writer, http.StatusBadRequest, "job is not running: %s", id)
default: default:
jsonErrorf(writer, http.StatusInternalServerError, "%v", err) jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
} }
@ -155,23 +202,33 @@ func (s *Server) addJobImageHandler(writer http.ResponseWriter, request *http.Re
} }
imageBuildId, err := strconv.Atoi(params.ByName("build_id")) imageBuildId, err := strconv.Atoi(params.ByName("build_id"))
if err != nil { if err != nil {
jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err) jsonErrorf(writer, http.StatusBadRequest, "cannot parse image build id: %v", err)
return return
} }
err = s.store.AddImageToImageUpload(id, imageBuildId, request.Body) if s.imageWriter == nil {
_, err = io.Copy(ioutil.Discard, request.Body)
} else {
err = s.imageWriter(id, imageBuildId, request.Body)
}
if err != nil { if err != nil {
switch err.(type) { jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
case *store.NotFoundError:
jsonErrorf(writer, http.StatusNotFound, "%v", err)
case *store.NoLocalTargetError:
jsonErrorf(writer, http.StatusBadRequest, "%v", err)
default:
jsonErrorf(writer, http.StatusInternalServerError, "%v", err)
}
return
} }
} }
func composeStateFromJobStatus(status jobqueue.JobStatus, output *common.ComposeResult) common.ComposeState {
switch status {
case jobqueue.JobPending:
return common.CWaiting
case jobqueue.JobRunning:
return common.CRunning
case jobqueue.JobFinished:
if output.Success {
return common.CFinished
} else {
return common.CFailed
}
}
return common.CWaiting
}

View file

@ -5,10 +5,10 @@ import (
"testing" "testing"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/require"
"github.com/osbuild/osbuild-composer/internal/blueprint"
"github.com/osbuild/osbuild-composer/internal/distro/fedoratest" "github.com/osbuild/osbuild-composer/internal/distro/fedoratest"
"github.com/osbuild/osbuild-composer/internal/store" "github.com/osbuild/osbuild-composer/internal/jobqueue/testjobqueue"
"github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/test"
"github.com/osbuild/osbuild-composer/internal/worker" "github.com/osbuild/osbuild-composer/internal/worker"
) )
@ -27,15 +27,15 @@ func TestErrors(t *testing.T) {
// Wrong method // Wrong method
{"GET", "/job-queue/v1/jobs", ``, http.StatusMethodNotAllowed}, {"GET", "/job-queue/v1/jobs", ``, http.StatusMethodNotAllowed},
// Update job with invalid ID // Update job with invalid ID
{"PATCH", "/job-queue/v1/jobs/foo/builds/0", `{"status":"RUNNING"}`, http.StatusBadRequest}, {"PATCH", "/job-queue/v1/jobs/foo", `{"status":"FINISHED"}`, http.StatusBadRequest},
// Update job that does not exist, with invalid body // Update job that does not exist, with invalid body
{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/builds/0", ``, http.StatusBadRequest}, {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", ``, http.StatusBadRequest},
// Update job that does not exist // Update job that does not exist
{"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/builds/0", `{"status":"RUNNING"}`, http.StatusNotFound}, {"PATCH", "/job-queue/v1/jobs/aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", `{"status":"FINISHED"}`, http.StatusNotFound},
} }
for _, c := range cases { for _, c := range cases {
server := worker.NewServer(nil, store.New(nil)) server := worker.NewServer(nil, testjobqueue.New(), nil)
test.TestRoute(t, server, false, c.Method, c.Path, c.Body, c.ExpectedStatus, "{}", "message") test.TestRoute(t, server, false, c.Method, c.Path, c.Body, c.ExpectedStatus, "{}", "message")
} }
} }
@ -50,21 +50,18 @@ func TestCreate(t *testing.T) {
if err != nil { if err != nil {
t.Fatalf("error getting image type from arch") t.Fatalf("error getting image type from arch")
} }
store := store.New(nil) server := worker.NewServer(nil, testjobqueue.New(), nil)
server := worker.NewServer(nil, store)
manifest, err := imageType.Manifest(nil, nil, nil, nil, imageType.Size(0)) manifest, err := imageType.Manifest(nil, nil, nil, nil, imageType.Size(0))
if err != nil { if err != nil {
t.Fatalf("error creating osbuild manifest") t.Fatalf("error creating osbuild manifest")
} }
id, err := store.PushCompose(manifest, imageType.Name(), &blueprint.Blueprint{}, 0, nil) id, err := server.Enqueue(manifest, nil)
if err != nil { require.NoError(t, err)
t.Fatalf("error pushing compose: %v", err)
}
test.TestRoute(t, server, false, "POST", "/job-queue/v1/jobs", `{}`, http.StatusCreated, test.TestRoute(t, server, false, "POST", "/job-queue/v1/jobs", `{}`, http.StatusCreated,
`{"compose_id":"`+id.String()+`","image_build_id":0,"manifest":{"sources":{},"pipeline":{}},"targets":[]}`, "created") `{"id":"`+id.String()+`","manifest":{"sources":{},"pipeline":{}}}`, "created")
} }
func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) { func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) {
@ -77,8 +74,7 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) {
if err != nil { if err != nil {
t.Fatalf("error getting image type from arch") t.Fatalf("error getting image type from arch")
} }
store := store.New(nil) server := worker.NewServer(nil, testjobqueue.New(), nil)
server := worker.NewServer(nil, store)
id := uuid.Nil id := uuid.Nil
if from != "VOID" { if from != "VOID" {
@ -87,20 +83,18 @@ func testUpdateTransition(t *testing.T, from, to string, expectedStatus int) {
t.Fatalf("error creating osbuild manifest") t.Fatalf("error creating osbuild manifest")
} }
id, err = store.PushCompose(manifest, imageType.Name(), &blueprint.Blueprint{}, 0, nil) id, err = server.Enqueue(manifest, nil)
if err != nil { require.NoError(t, err)
t.Fatalf("error pushing compose: %v", err)
}
if from != "WAITING" { if from != "WAITING" {
test.SendHTTP(server, false, "POST", "/job-queue/v1/jobs", `{}`) test.SendHTTP(server, false, "POST", "/job-queue/v1/jobs", `{}`)
if from != "RUNNING" { if from != "RUNNING" {
test.SendHTTP(server, false, "PATCH", "/job-queue/v1/jobs/"+id.String()+"/builds/0", `{"status":"`+from+`"}`) test.SendHTTP(server, false, "PATCH", "/job-queue/v1/jobs/"+id.String(), `{"status":"`+from+`"}`)
} }
} }
} }
test.TestRoute(t, server, false, "PATCH", "/job-queue/v1/jobs/"+id.String()+"/builds/0", `{"status":"`+to+`"}`, expectedStatus, "{}", "message") test.TestRoute(t, server, false, "PATCH", "/job-queue/v1/jobs/"+id.String(), `{"status":"`+to+`"}`, expectedStatus, "{}", "message")
} }
func TestUpdate(t *testing.T) { func TestUpdate(t *testing.T) {
@ -109,16 +103,16 @@ func TestUpdate(t *testing.T) {
To string To string
ExpectedStatus int ExpectedStatus int
}{ }{
{"VOID", "WAITING", http.StatusNotFound}, {"VOID", "WAITING", http.StatusBadRequest},
{"VOID", "RUNNING", http.StatusNotFound}, {"VOID", "RUNNING", http.StatusBadRequest},
{"VOID", "FINISHED", http.StatusNotFound}, {"VOID", "FINISHED", http.StatusNotFound},
{"VOID", "FAILED", http.StatusNotFound}, {"VOID", "FAILED", http.StatusNotFound},
{"WAITING", "WAITING", http.StatusNotFound}, {"WAITING", "WAITING", http.StatusBadRequest},
{"WAITING", "RUNNING", http.StatusNotFound}, {"WAITING", "RUNNING", http.StatusBadRequest},
{"WAITING", "FINISHED", http.StatusNotFound}, {"WAITING", "FINISHED", http.StatusBadRequest},
{"WAITING", "FAILED", http.StatusNotFound}, {"WAITING", "FAILED", http.StatusBadRequest},
{"RUNNING", "WAITING", http.StatusBadRequest}, {"RUNNING", "WAITING", http.StatusBadRequest},
{"RUNNING", "RUNNING", http.StatusOK}, {"RUNNING", "RUNNING", http.StatusBadRequest},
{"RUNNING", "FINISHED", http.StatusOK}, {"RUNNING", "FINISHED", http.StatusOK},
{"RUNNING", "FAILED", http.StatusOK}, {"RUNNING", "FAILED", http.StatusOK},
{"FINISHED", "WAITING", http.StatusBadRequest}, {"FINISHED", "WAITING", http.StatusBadRequest},
@ -132,6 +126,7 @@ func TestUpdate(t *testing.T) {
} }
for _, c := range cases { for _, c := range cases {
t.Log(c)
testUpdateTransition(t, c.From, c.To, c.ExpectedStatus) testUpdateTransition(t, c.From, c.To, c.ExpectedStatus)
} }
} }