cloudapi/v2: Use worker to depsolve
Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
parent
0f90aa9c78
commit
e904397fdb
6 changed files with 96 additions and 19 deletions
|
|
@ -33,6 +33,12 @@ func (impl *DepsolveJobImpl) Run(job worker.Job) error {
|
|||
var result worker.DepsolveJobResult
|
||||
result.PackageSpecs, err = impl.depsolve(args.PackageSets, args.Repos, args.ModulePlatformID, args.Arch, args.Releasever)
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case *rpmmd.DNFError:
|
||||
result.ErrorType = worker.DepsolveErrorType
|
||||
case error:
|
||||
result.ErrorType = worker.OtherErrorType
|
||||
}
|
||||
result.Error = err.Error()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -50,6 +50,8 @@ const (
|
|||
ErrorUnknownManifestVersion ServiceErrorCode = 1010
|
||||
ErrorUnableToConvertOSTreeCommitStageMetadata ServiceErrorCode = 1011
|
||||
ErrorMalformedOSBuildJobResult ServiceErrorCode = 1012
|
||||
ErrorGettingDepsolveJobStatus ServiceErrorCode = 1013
|
||||
ErrorDepsolveJobCanceled ServiceErrorCode = 1014
|
||||
|
||||
// Errors contained within this file
|
||||
ErrorUnspecified ServiceErrorCode = 10000
|
||||
|
|
@ -107,6 +109,8 @@ func getServiceErrors() serviceErrors {
|
|||
serviceError{ErrorUnknownManifestVersion, http.StatusInternalServerError, "Unknown manifest version"},
|
||||
serviceError{ErrorUnableToConvertOSTreeCommitStageMetadata, http.StatusInternalServerError, "Unable to convert ostree commit stage metadata"},
|
||||
serviceError{ErrorMalformedOSBuildJobResult, http.StatusInternalServerError, "OSBuildJobResult does not have expected fields set"},
|
||||
serviceError{ErrorGettingDepsolveJobStatus, http.StatusInternalServerError, "Unable to get depsolve job status"},
|
||||
serviceError{ErrorDepsolveJobCanceled, http.StatusInternalServerError, "Depsolve job was cancelled"},
|
||||
|
||||
serviceError{ErrorUnspecified, http.StatusInternalServerError, "Unspecified internal error "},
|
||||
serviceError{ErrorNotHTTPError, http.StatusInternalServerError, "Error is not an instance of HTTPError"},
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import (
|
|||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/labstack/echo/v4"
|
||||
|
|
@ -199,18 +200,41 @@ func (h *apiHandlers) PostCompose(ctx echo.Context) error {
|
|||
}
|
||||
|
||||
packageSets := imageType.PackageSets(bp)
|
||||
pkgSpecSets := make(map[string][]rpmmd.PackageSpec)
|
||||
for name, packages := range packageSets {
|
||||
pkgs, _, err := h.server.rpmMetadata.Depsolve(packages, repositories, distribution.ModulePlatformID(), arch.Name(), distribution.Releasever())
|
||||
var dnfError *rpmmd.DNFError
|
||||
if err != nil && errors.As(err, &dnfError) {
|
||||
return HTTPError(ErrorDNFError)
|
||||
} else if err != nil {
|
||||
return HTTPError(ErrorFailedToDepsolve)
|
||||
}
|
||||
pkgSpecSets[name] = pkgs
|
||||
depsolveJobID, err := h.server.workers.EnqueueDepsolve(&worker.DepsolveJob{
|
||||
PackageSets: packageSets,
|
||||
Repos: repositories,
|
||||
ModulePlatformID: distribution.ModulePlatformID(),
|
||||
Arch: arch.Name(),
|
||||
Releasever: distribution.Releasever(),
|
||||
})
|
||||
if err != nil {
|
||||
return HTTPErrorWithInternal(ErrorEnqueueingJob, err)
|
||||
}
|
||||
|
||||
var depsolveResults worker.DepsolveJobResult
|
||||
for {
|
||||
status, _, err := h.server.workers.JobStatus(depsolveJobID, &depsolveResults)
|
||||
if err != nil {
|
||||
return HTTPErrorWithInternal(ErrorGettingDepsolveJobStatus, err)
|
||||
}
|
||||
if status.Canceled {
|
||||
return HTTPErrorWithInternal(ErrorDepsolveJobCanceled, err)
|
||||
}
|
||||
if !status.Finished.IsZero() {
|
||||
break
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
|
||||
if depsolveResults.Error != "" {
|
||||
if depsolveResults.ErrorType == worker.DepsolveErrorType {
|
||||
return HTTPError(ErrorDNFError)
|
||||
}
|
||||
return HTTPErrorWithInternal(ErrorFailedToDepsolve, errors.New(depsolveResults.Error))
|
||||
}
|
||||
|
||||
pkgSpecSets := depsolveResults.PackageSpecs
|
||||
|
||||
imageOptions := distro.ImageOptions{Size: imageType.Size(0)}
|
||||
if request.Customizations != nil && request.Customizations.Subscription != nil {
|
||||
imageOptions.Subscription = &distro.SubscriptionImageOptions{
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import (
|
|||
"net/http"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
|
|
@ -19,7 +20,7 @@ import (
|
|||
"github.com/osbuild/osbuild-composer/internal/worker"
|
||||
)
|
||||
|
||||
func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server) {
|
||||
func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context.CancelFunc) {
|
||||
rpmFixture := rpmmd_mock.BaseFixture(dir)
|
||||
rpm := rpmmd_mock.NewRPMMDMock(rpmFixture)
|
||||
require.NotNil(t, rpm)
|
||||
|
|
@ -31,14 +32,38 @@ func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server) {
|
|||
v2Server := v2.NewServer(rpmFixture.Workers, rpm, distros, "image-builder.service")
|
||||
require.NotNil(t, v2Server)
|
||||
|
||||
return v2Server, rpmFixture.Workers
|
||||
// start a routine which just completes depsolve jobs
|
||||
depsolveContext, cancel := context.WithCancel(context.Background())
|
||||
go func() {
|
||||
for {
|
||||
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond))
|
||||
defer cancel()
|
||||
_, token, _, _, _, err := rpmFixture.Workers.RequestJob(ctx, test_distro.TestDistroName, []string{"depsolve"})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
rawMsg, err := json.Marshal(&worker.DepsolveJobResult{PackageSpecs: nil, Error: "", ErrorType: worker.ErrorType("")})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, rpmFixture.Workers.FinishJob(token, rawMsg))
|
||||
|
||||
select {
|
||||
case <-depsolveContext.Done():
|
||||
return
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return v2Server, rpmFixture.Workers, cancel
|
||||
}
|
||||
|
||||
func TestUnknownRoute(t *testing.T) {
|
||||
dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
srv, _ := newV2Server(t, dir)
|
||||
srv, _, cancel := newV2Server(t, dir)
|
||||
defer cancel()
|
||||
|
||||
test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "GET", "/api/image-builder-composer/v2/badroute", ``, http.StatusNotFound, `
|
||||
{
|
||||
|
|
@ -54,7 +79,8 @@ func TestGetError(t *testing.T) {
|
|||
dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
srv, _ := newV2Server(t, dir)
|
||||
srv, _, cancel := newV2Server(t, dir)
|
||||
defer cancel()
|
||||
|
||||
test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "GET", "/api/image-builder-composer/v2/errors/4", ``, http.StatusOK, `
|
||||
{
|
||||
|
|
@ -79,7 +105,8 @@ func TestGetErrorList(t *testing.T) {
|
|||
dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
srv, _ := newV2Server(t, dir)
|
||||
srv, _, cancel := newV2Server(t, dir)
|
||||
defer cancel()
|
||||
|
||||
test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "GET", "/api/image-builder-composer/v2/errors?page=3&size=1", ``, http.StatusOK, `
|
||||
{
|
||||
|
|
@ -100,7 +127,8 @@ func TestCompose(t *testing.T) {
|
|||
dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
srv, _ := newV2Server(t, dir)
|
||||
srv, _, cancel := newV2Server(t, dir)
|
||||
defer cancel()
|
||||
|
||||
// unsupported distribution
|
||||
test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "POST", "/api/image-builder-composer/v2/compose", fmt.Sprintf(`
|
||||
|
|
@ -224,7 +252,8 @@ func TestComposeStatusSuccess(t *testing.T) {
|
|||
dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
srv, wrksrv := newV2Server(t, dir)
|
||||
srv, wrksrv, cancel := newV2Server(t, dir)
|
||||
defer cancel()
|
||||
|
||||
test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "POST", "/api/image-builder-composer/v2/compose", fmt.Sprintf(`
|
||||
{
|
||||
|
|
@ -289,7 +318,8 @@ func TestComposeStatusFailure(t *testing.T) {
|
|||
dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
srv, wrksrv := newV2Server(t, dir)
|
||||
srv, wrksrv, cancel := newV2Server(t, dir)
|
||||
defer cancel()
|
||||
|
||||
test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "POST", "/api/image-builder-composer/v2/compose", fmt.Sprintf(`
|
||||
{
|
||||
|
|
@ -338,7 +368,8 @@ func TestComposeCustomizations(t *testing.T) {
|
|||
dir, err := ioutil.TempDir("", "osbuild-composer-test-api-v2-")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(dir)
|
||||
srv, _ := newV2Server(t, dir)
|
||||
srv, _, cancel := newV2Server(t, dir)
|
||||
defer cancel()
|
||||
|
||||
test.TestRoute(t, srv.Handler("/api/image-builder-composer/v2"), false, "POST", "/api/image-builder-composer/v2/compose", fmt.Sprintf(`
|
||||
{
|
||||
|
|
|
|||
|
|
@ -81,9 +81,17 @@ type DepsolveJob struct {
|
|||
Releasever string `json:"releasever"`
|
||||
}
|
||||
|
||||
type ErrorType string
|
||||
|
||||
const (
|
||||
DepsolveErrorType ErrorType = "depsolve"
|
||||
OtherErrorType ErrorType = "other"
|
||||
)
|
||||
|
||||
type DepsolveJobResult struct {
|
||||
PackageSpecs map[string][]rpmmd.PackageSpec `json:"package_specs"`
|
||||
Error string `json:"error"`
|
||||
ErrorType ErrorType `json:"error_type"`
|
||||
}
|
||||
|
||||
//
|
||||
|
|
|
|||
|
|
@ -104,6 +104,10 @@ func (s *Server) EnqueueKojiFinalize(job *KojiFinalizeJob, initID uuid.UUID, bui
|
|||
return s.jobs.Enqueue("koji-finalize", job, append([]uuid.UUID{initID}, buildIDs...))
|
||||
}
|
||||
|
||||
func (s *Server) EnqueueDepsolve(job *DepsolveJob) (uuid.UUID, error) {
|
||||
return s.jobs.Enqueue("depsolve", job, nil)
|
||||
}
|
||||
|
||||
func (s *Server) JobStatus(id uuid.UUID, result interface{}) (*JobStatus, []uuid.UUID, error) {
|
||||
rawResult, queued, started, finished, canceled, deps, err := s.jobs.JobStatus(id)
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue