diff --git a/cmd/osbuild-composer/composer.go b/cmd/osbuild-composer/composer.go index 1af14a998..59b06df53 100644 --- a/cmd/osbuild-composer/composer.go +++ b/cmd/osbuild-composer/composer.go @@ -54,10 +54,13 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string) (*Compos cacheDir: cacheDir, } + workerConfig := worker.Config{ + BasePath: config.Worker.BasePath, + } + var err error - artifactsDir := "" if config.Worker.EnableArtifacts { - artifactsDir, err = c.ensureStateDirectory("artifacts", 0755) + workerConfig.ArtifactsDir, err = c.ensureStateDirectory("artifacts", 0755) if err != nil { return nil, err } @@ -98,12 +101,12 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string) (*Compos } } - requestJobTimeout, err := time.ParseDuration(config.Worker.RequestJobTimeout) + workerConfig.RequestJobTimeout, err = time.ParseDuration(config.Worker.RequestJobTimeout) if err != nil { return nil, fmt.Errorf("Unable to parse request job timeout: %v", err) } - c.workers = worker.NewServer(c.logger, jobs, artifactsDir, requestJobTimeout, config.Worker.BasePath) + c.workers = worker.NewServer(c.logger, jobs, workerConfig) return &c, nil } @@ -121,8 +124,10 @@ func (c *Composer) InitWeldr(repoPaths []string, weldrListener net.Listener, func (c *Composer) InitAPI(cert, key string, enableTLS bool, enableMTLS bool, enableJWT bool, l net.Listener) error { config := v2.ServerConfig{ - AWSBucket: c.config.Koji.AWS.Bucket, + AWSBucket: c.config.Koji.AWS.Bucket, + TenantProviderFields: c.config.CloudAPI.JWT.TenantProviderFields, } + c.api = cloudapi.NewServer(c.workers, c.distros, config) c.koji = kojiapi.NewServer(c.logger, c.workers, c.rpm, c.distros) diff --git a/internal/cloudapi/v2/v2_test.go b/internal/cloudapi/v2/v2_test.go index ac84c5a59..f496bd316 100644 --- a/internal/cloudapi/v2/v2_test.go +++ b/internal/cloudapi/v2/v2_test.go @@ -8,7 +8,6 @@ import ( "net/http" "os" "testing" - "time" "github.com/stretchr/testify/require" @@ -26,7 +25,7 @@ import ( func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context.CancelFunc) { q, err := fsjobqueue.New(dir) require.NoError(t, err) - workerServer := worker.NewServer(nil, q, "", time.Duration(0), "/api/worker/v1") + workerServer := worker.NewServer(nil, q, worker.Config{BasePath: "/api/worker/v1"}) distros, err := distro_mock.NewDefaultRegistry() require.NoError(t, err) diff --git a/internal/mocks/rpmmd/fixtures.go b/internal/mocks/rpmmd/fixtures.go index 2669b1886..abe7706ec 100644 --- a/internal/mocks/rpmmd/fixtures.go +++ b/internal/mocks/rpmmd/fixtures.go @@ -57,7 +57,7 @@ func createBaseWorkersFixture(tmpdir string) *worker.Server { if err != nil { panic(err) } - return worker.NewServer(nil, q, "", time.Duration(0), "/api/worker/v1") + return worker.NewServer(nil, q, worker.Config{BasePath: "/api/worker/v1"}) } func createBaseDepsolveFixture() []rpmmd.PackageSpec { diff --git a/internal/worker/server.go b/internal/worker/server.go index 88a2831d4..370b2f7e3 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -28,10 +28,9 @@ import ( ) type Server struct { - jobs jobqueue.JobQueue - logger *log.Logger - artifactsDir string - requestJobTimeout time.Duration + jobs jobqueue.JobQueue + logger *log.Logger + config Config } type JobStatus struct { @@ -45,15 +44,20 @@ var ErrInvalidToken = errors.New("token does not exist") var ErrJobNotRunning = errors.New("job isn't running") var ErrInvalidJobType = errors.New("job has invalid type") -func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, requestJobTimeout time.Duration, basePath string) *Server { +type Config struct { + ArtifactsDir string + RequestJobTimeout time.Duration + BasePath string +} + +func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, config Config) *Server { s := &Server{ - jobs: jobs, - logger: logger, - artifactsDir: artifactsDir, - requestJobTimeout: requestJobTimeout, + jobs: jobs, + logger: logger, + config: config, } - api.BasePath = basePath + api.BasePath = config.BasePath go s.WatchHeartbeats() return s @@ -344,7 +348,7 @@ func (s *Server) Cancel(id uuid.UUID) error { // Provides access to artifacts of a job. Returns an io.Reader for the artifact // and the artifact's size. func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) { - if s.artifactsDir == "" { + if s.config.ArtifactsDir == "" { return nil, 0, errors.New("Artifacts not enabled") } @@ -357,7 +361,7 @@ func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error return nil, 0, fmt.Errorf("Cannot access artifacts before job is finished: %s", id) } - p := path.Join(s.artifactsDir, id.String(), name) + p := path.Join(s.config.ArtifactsDir, id.String(), name) f, err := os.Open(p) if err != nil { return nil, 0, fmt.Errorf("Error accessing artifact %s for job %s: %v", name, id, err) @@ -373,7 +377,7 @@ func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error // Deletes all artifacts for job `id`. func (s *Server) DeleteArtifacts(id uuid.UUID) error { - if s.artifactsDir == "" { + if s.config.ArtifactsDir == "" { return errors.New("Artifacts not enabled") } @@ -386,7 +390,7 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return fmt.Errorf("Cannot delete artifacts before job is finished: %s", id) } - return os.RemoveAll(path.Join(s.artifactsDir, id.String())) + return os.RemoveAll(path.Join(s.config.ArtifactsDir, id.String())) } func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { @@ -415,8 +419,8 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, dequeueCtx := ctx var cancel context.CancelFunc - if s.requestJobTimeout != 0 { - dequeueCtx, cancel = context.WithTimeout(ctx, s.requestJobTimeout) + if s.config.RequestJobTimeout != 0 { + dequeueCtx, cancel = context.WithTimeout(ctx, s.config.RequestJobTimeout) defer cancel() } @@ -444,8 +448,8 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, dynamicArgs = append(dynamicArgs, result) } - if s.artifactsDir != "" { - err = os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700) + if s.config.ArtifactsDir != "" { + err = os.MkdirAll(path.Join(s.config.ArtifactsDir, "tmp", token.String()), 0700) if err != nil { return } @@ -493,8 +497,8 @@ func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error { // Move artifacts from the temporary location to the final job // location. Log any errors, but do not treat them as fatal. The job is // already finished. - if s.artifactsDir != "" { - err := os.Rename(path.Join(s.artifactsDir, "tmp", token.String()), path.Join(s.artifactsDir, jobId.String())) + if s.config.ArtifactsDir != "" { + err := os.Rename(path.Join(s.config.ArtifactsDir, "tmp", token.String()), path.Join(s.config.ArtifactsDir, jobId.String())) if err != nil { logrus.Errorf("Error moving artifacts for job %s: %v", jobId, err) } @@ -672,7 +676,7 @@ func (h *apiHandlers) UploadJobArtifact(ctx echo.Context, tokenstr string, name request := ctx.Request() - if h.server.artifactsDir == "" { + if h.server.config.ArtifactsDir == "" { _, err := io.Copy(ioutil.Discard, request.Body) if err != nil { return api.HTTPErrorWithInternal(api.ErrorDiscardingArtifact, err) @@ -680,7 +684,7 @@ func (h *apiHandlers) UploadJobArtifact(ctx echo.Context, tokenstr string, name return ctx.NoContent(http.StatusOK) } - f, err := os.Create(path.Join(h.server.artifactsDir, "tmp", token.String(), name)) + f, err := os.Create(path.Join(h.server.config.ArtifactsDir, "tmp", token.String(), name)) if err != nil { return api.HTTPErrorWithInternal(api.ErrorDiscardingArtifact, err) } diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index 8db49b776..1de2391a2 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -30,7 +30,12 @@ func newTestServer(t *testing.T, tempdir string, jobRequestTimeout time.Duration if err != nil { t.Fatalf("error creating fsjobqueue: %v", err) } - return worker.NewServer(nil, q, "", jobRequestTimeout, basePath) + + config := worker.Config{ + RequestJobTimeout: jobRequestTimeout, + BasePath: basePath, + } + return worker.NewServer(nil, q, config) } // Ensure that the status request returns OK. @@ -326,7 +331,11 @@ func TestOAuth(t *testing.T) { q, err := fsjobqueue.New(tempdir) require.NoError(t, err) - workerServer := worker.NewServer(nil, q, tempdir, time.Duration(0), "/api/image-builder-worker/v1") + config := worker.Config{ + ArtifactsDir: tempdir, + BasePath: "/api/image-builder-worker/v1", + } + workerServer := worker.NewServer(nil, q, config) handler := workerServer.Handler() workSrv := httptest.NewServer(handler)