worker: NewServer: move config parameters to a new Config struct

We will have more parameters soon so let's make this prettier sooner rather
than later.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2022-03-03 14:37:21 +01:00 committed by Ondřej Budai
parent 9feb7b59d6
commit c1dc58eba4
5 changed files with 49 additions and 32 deletions

View file

@ -54,10 +54,13 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string) (*Compos
cacheDir: cacheDir,
}
workerConfig := worker.Config{
BasePath: config.Worker.BasePath,
}
var err error
artifactsDir := ""
if config.Worker.EnableArtifacts {
artifactsDir, err = c.ensureStateDirectory("artifacts", 0755)
workerConfig.ArtifactsDir, err = c.ensureStateDirectory("artifacts", 0755)
if err != nil {
return nil, err
}
@ -98,12 +101,12 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string) (*Compos
}
}
requestJobTimeout, err := time.ParseDuration(config.Worker.RequestJobTimeout)
workerConfig.RequestJobTimeout, err = time.ParseDuration(config.Worker.RequestJobTimeout)
if err != nil {
return nil, fmt.Errorf("Unable to parse request job timeout: %v", err)
}
c.workers = worker.NewServer(c.logger, jobs, artifactsDir, requestJobTimeout, config.Worker.BasePath)
c.workers = worker.NewServer(c.logger, jobs, workerConfig)
return &c, nil
}
@ -121,8 +124,10 @@ func (c *Composer) InitWeldr(repoPaths []string, weldrListener net.Listener,
func (c *Composer) InitAPI(cert, key string, enableTLS bool, enableMTLS bool, enableJWT bool, l net.Listener) error {
config := v2.ServerConfig{
AWSBucket: c.config.Koji.AWS.Bucket,
AWSBucket: c.config.Koji.AWS.Bucket,
TenantProviderFields: c.config.CloudAPI.JWT.TenantProviderFields,
}
c.api = cloudapi.NewServer(c.workers, c.distros, config)
c.koji = kojiapi.NewServer(c.logger, c.workers, c.rpm, c.distros)

View file

@ -8,7 +8,6 @@ import (
"net/http"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
@ -26,7 +25,7 @@ import (
func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context.CancelFunc) {
q, err := fsjobqueue.New(dir)
require.NoError(t, err)
workerServer := worker.NewServer(nil, q, "", time.Duration(0), "/api/worker/v1")
workerServer := worker.NewServer(nil, q, worker.Config{BasePath: "/api/worker/v1"})
distros, err := distro_mock.NewDefaultRegistry()
require.NoError(t, err)

View file

@ -57,7 +57,7 @@ func createBaseWorkersFixture(tmpdir string) *worker.Server {
if err != nil {
panic(err)
}
return worker.NewServer(nil, q, "", time.Duration(0), "/api/worker/v1")
return worker.NewServer(nil, q, worker.Config{BasePath: "/api/worker/v1"})
}
func createBaseDepsolveFixture() []rpmmd.PackageSpec {

View file

@ -28,10 +28,9 @@ import (
)
type Server struct {
jobs jobqueue.JobQueue
logger *log.Logger
artifactsDir string
requestJobTimeout time.Duration
jobs jobqueue.JobQueue
logger *log.Logger
config Config
}
type JobStatus struct {
@ -45,15 +44,20 @@ var ErrInvalidToken = errors.New("token does not exist")
var ErrJobNotRunning = errors.New("job isn't running")
var ErrInvalidJobType = errors.New("job has invalid type")
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, requestJobTimeout time.Duration, basePath string) *Server {
type Config struct {
ArtifactsDir string
RequestJobTimeout time.Duration
BasePath string
}
func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, config Config) *Server {
s := &Server{
jobs: jobs,
logger: logger,
artifactsDir: artifactsDir,
requestJobTimeout: requestJobTimeout,
jobs: jobs,
logger: logger,
config: config,
}
api.BasePath = basePath
api.BasePath = config.BasePath
go s.WatchHeartbeats()
return s
@ -344,7 +348,7 @@ func (s *Server) Cancel(id uuid.UUID) error {
// Provides access to artifacts of a job. Returns an io.Reader for the artifact
// and the artifact's size.
func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error) {
if s.artifactsDir == "" {
if s.config.ArtifactsDir == "" {
return nil, 0, errors.New("Artifacts not enabled")
}
@ -357,7 +361,7 @@ func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error
return nil, 0, fmt.Errorf("Cannot access artifacts before job is finished: %s", id)
}
p := path.Join(s.artifactsDir, id.String(), name)
p := path.Join(s.config.ArtifactsDir, id.String(), name)
f, err := os.Open(p)
if err != nil {
return nil, 0, fmt.Errorf("Error accessing artifact %s for job %s: %v", name, id, err)
@ -373,7 +377,7 @@ func (s *Server) JobArtifact(id uuid.UUID, name string) (io.Reader, int64, error
// Deletes all artifacts for job `id`.
func (s *Server) DeleteArtifacts(id uuid.UUID) error {
if s.artifactsDir == "" {
if s.config.ArtifactsDir == "" {
return errors.New("Artifacts not enabled")
}
@ -386,7 +390,7 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error {
return fmt.Errorf("Cannot delete artifacts before job is finished: %s", id)
}
return os.RemoveAll(path.Join(s.artifactsDir, id.String()))
return os.RemoveAll(path.Join(s.config.ArtifactsDir, id.String()))
}
func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) {
@ -415,8 +419,8 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string,
dequeueCtx := ctx
var cancel context.CancelFunc
if s.requestJobTimeout != 0 {
dequeueCtx, cancel = context.WithTimeout(ctx, s.requestJobTimeout)
if s.config.RequestJobTimeout != 0 {
dequeueCtx, cancel = context.WithTimeout(ctx, s.config.RequestJobTimeout)
defer cancel()
}
@ -444,8 +448,8 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string,
dynamicArgs = append(dynamicArgs, result)
}
if s.artifactsDir != "" {
err = os.MkdirAll(path.Join(s.artifactsDir, "tmp", token.String()), 0700)
if s.config.ArtifactsDir != "" {
err = os.MkdirAll(path.Join(s.config.ArtifactsDir, "tmp", token.String()), 0700)
if err != nil {
return
}
@ -493,8 +497,8 @@ func (s *Server) FinishJob(token uuid.UUID, result json.RawMessage) error {
// Move artifacts from the temporary location to the final job
// location. Log any errors, but do not treat them as fatal. The job is
// already finished.
if s.artifactsDir != "" {
err := os.Rename(path.Join(s.artifactsDir, "tmp", token.String()), path.Join(s.artifactsDir, jobId.String()))
if s.config.ArtifactsDir != "" {
err := os.Rename(path.Join(s.config.ArtifactsDir, "tmp", token.String()), path.Join(s.config.ArtifactsDir, jobId.String()))
if err != nil {
logrus.Errorf("Error moving artifacts for job %s: %v", jobId, err)
}
@ -672,7 +676,7 @@ func (h *apiHandlers) UploadJobArtifact(ctx echo.Context, tokenstr string, name
request := ctx.Request()
if h.server.artifactsDir == "" {
if h.server.config.ArtifactsDir == "" {
_, err := io.Copy(ioutil.Discard, request.Body)
if err != nil {
return api.HTTPErrorWithInternal(api.ErrorDiscardingArtifact, err)
@ -680,7 +684,7 @@ func (h *apiHandlers) UploadJobArtifact(ctx echo.Context, tokenstr string, name
return ctx.NoContent(http.StatusOK)
}
f, err := os.Create(path.Join(h.server.artifactsDir, "tmp", token.String(), name))
f, err := os.Create(path.Join(h.server.config.ArtifactsDir, "tmp", token.String(), name))
if err != nil {
return api.HTTPErrorWithInternal(api.ErrorDiscardingArtifact, err)
}

View file

@ -30,7 +30,12 @@ func newTestServer(t *testing.T, tempdir string, jobRequestTimeout time.Duration
if err != nil {
t.Fatalf("error creating fsjobqueue: %v", err)
}
return worker.NewServer(nil, q, "", jobRequestTimeout, basePath)
config := worker.Config{
RequestJobTimeout: jobRequestTimeout,
BasePath: basePath,
}
return worker.NewServer(nil, q, config)
}
// Ensure that the status request returns OK.
@ -326,7 +331,11 @@ func TestOAuth(t *testing.T) {
q, err := fsjobqueue.New(tempdir)
require.NoError(t, err)
workerServer := worker.NewServer(nil, q, tempdir, time.Duration(0), "/api/image-builder-worker/v1")
config := worker.Config{
ArtifactsDir: tempdir,
BasePath: "/api/image-builder-worker/v1",
}
workerServer := worker.NewServer(nil, q, config)
handler := workerServer.Handler()
workSrv := httptest.NewServer(handler)