From d25ae71fef869db8c384c34f73b140ffe0be6f5e Mon Sep 17 00:00:00 2001 From: sanne Date: Mon, 18 Oct 2021 17:18:47 +0200 Subject: [PATCH] worker: Configurable timeout for RequestJob This is backwards compatible, as long as the timeout is 0 (never timeout), which is the default. In case of the dbjobqueue the underlying timeout is due to context.Canceled, context.DeadlineExceeded, or net.Error with Timeout() true. For the fsjobqueue only the first two are considered. --- cmd/osbuild-composer/composer.go | 8 +++- cmd/osbuild-composer/config.go | 40 +++++++++-------- cmd/osbuild-composer/config_test.go | 9 ++-- cmd/osbuild-worker/main.go | 4 ++ docs/news/unreleased/request-job-timeout.md | 15 +++++++ go.mod | 1 + internal/cloudapi/v2/v2_test.go | 5 +-- internal/jobqueue/dbjobqueue/dbjobqueue.go | 6 ++- internal/jobqueue/fsjobqueue/fsjobqueue.go | 5 ++- internal/jobqueue/jobqueue.go | 7 +-- .../jobqueue/jobqueuetest/jobqueuetest.go | 15 ++++++- internal/mocks/rpmmd/fixtures.go | 2 +- internal/worker/api/api.gen.go | 38 ++++++++-------- internal/worker/api/openapi.yml | 6 +++ internal/worker/client.go | 6 +++ internal/worker/server.go | 31 +++++++++---- internal/worker/server_test.go | 45 ++++++++++++++----- templates/composer.yml | 1 + vendor/modules.txt | 1 + 19 files changed, 171 insertions(+), 74 deletions(-) create mode 100644 docs/news/unreleased/request-job-timeout.md diff --git a/cmd/osbuild-composer/composer.go b/cmd/osbuild-composer/composer.go index f414c4631..ca2714f7c 100644 --- a/cmd/osbuild-composer/composer.go +++ b/cmd/osbuild-composer/composer.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "path" + "time" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -87,7 +88,12 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string) (*Compos } } - c.workers = worker.NewServer(c.logger, jobs, artifactsDir, config.Worker.BasePath) + requestJobTimeout, err := time.ParseDuration(config.Worker.RequestJobTimeout) + if err != nil { + return nil, fmt.Errorf("Unable to parse request job timeout: %v", err) + } + + c.workers = worker.NewServer(c.logger, jobs, artifactsDir, requestJobTimeout, config.Worker.BasePath) return &c, nil } diff --git a/cmd/osbuild-composer/config.go b/cmd/osbuild-composer/config.go index c8e779b9c..30bfb21a0 100644 --- a/cmd/osbuild-composer/config.go +++ b/cmd/osbuild-composer/config.go @@ -33,21 +33,22 @@ type AWSConfig struct { } type WorkerAPIConfig struct { - AllowedDomains []string `toml:"allowed_domains"` - CA string `toml:"ca"` - BasePath string `toml:"base_path"` - PGHost string `toml:"pg_host" env:"PGHOST"` - PGPort string `toml:"pg_port" env:"PGPORT"` - PGDatabase string `toml:"pg_database" env:"PGDATABASE"` - PGUser string `toml:"pg_user" env:"PGUSER"` - PGPassword string `toml:"pg_password" env:"PGPASSWORD"` - PGSSLMode string `toml:"pg_ssl_mode" env:"PGSSLMODE"` - EnableTLS bool `toml:"enable_tls"` - EnableMTLS bool `toml:"enable_mtls"` - EnableJWT bool `toml:"enable_jwt"` - JWTKeysURL string `toml:"jwt_keys_url"` - JWTKeysCA string `toml:"jwt_ca_file"` - JWTACLFile string `toml:"jwt_acl_file"` + AllowedDomains []string `toml:"allowed_domains"` + CA string `toml:"ca"` + RequestJobTimeout string `toml:"request_job_timeout"` + BasePath string `toml:"base_path"` + PGHost string `toml:"pg_host" env:"PGHOST"` + PGPort string `toml:"pg_port" env:"PGPORT"` + PGDatabase string `toml:"pg_database" env:"PGDATABASE"` + PGUser string `toml:"pg_user" env:"PGUSER"` + PGPassword string `toml:"pg_password" env:"PGPASSWORD"` + PGSSLMode string `toml:"pg_ssl_mode" env:"PGSSLMODE"` + EnableTLS bool `toml:"enable_tls"` + EnableMTLS bool `toml:"enable_mtls"` + EnableJWT bool `toml:"enable_jwt"` + JWTKeysURL string `toml:"jwt_keys_url"` + JWTKeysCA string `toml:"jwt_ca_file"` + JWTACLFile string `toml:"jwt_acl_file"` } type WeldrAPIConfig struct { @@ -86,10 +87,11 @@ func GetDefaultConfig() *ComposerConfigFile { }, }, Worker: WorkerAPIConfig{ - BasePath: "/api/worker/v1", - EnableTLS: true, - EnableMTLS: true, - EnableJWT: false, + RequestJobTimeout: "0", + BasePath: "/api/worker/v1", + EnableTLS: true, + EnableMTLS: true, + EnableJWT: false, }, WeldrAPI: WeldrAPIConfig{ map[string]WeldrDistroConfig{ diff --git a/cmd/osbuild-composer/config_test.go b/cmd/osbuild-composer/config_test.go index 67343242e..159707d83 100644 --- a/cmd/osbuild-composer/config_test.go +++ b/cmd/osbuild-composer/config_test.go @@ -39,10 +39,11 @@ func TestDefaultConfig(t *testing.T) { }, defaultConfig.Koji) require.Equal(t, WorkerAPIConfig{ - BasePath: "/api/worker/v1", - EnableTLS: true, - EnableMTLS: true, - EnableJWT: false, + RequestJobTimeout: "0", + BasePath: "/api/worker/v1", + EnableTLS: true, + EnableMTLS: true, + EnableJWT: false, }, defaultConfig.Worker) expectedWeldrAPIConfig := WeldrAPIConfig{ diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index 0ba2d99e8..f31999a38 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -89,6 +89,10 @@ func WatchJob(ctx context.Context, job worker.Job) { func RequestAndRunJob(client *worker.Client, acceptedJobTypes []string, jobImpls map[string]JobImplementation) error { logrus.Info("Waiting for a new job...") job, err := client.RequestJob(acceptedJobTypes, common.CurrentArch()) + if err == worker.ErrClientRequestJobTimeout { + logrus.Debugf("Requesting job timed out: %v", err) + return nil + } if err != nil { logrus.Errorf("Requesting job failed: %v", err) return err diff --git a/docs/news/unreleased/request-job-timeout.md b/docs/news/unreleased/request-job-timeout.md new file mode 100644 index 000000000..b2e520f3b --- /dev/null +++ b/docs/news/unreleased/request-job-timeout.md @@ -0,0 +1,15 @@ +# Timeout when requesting jobs + +When workers request a new job they make a blocking call to the `/api/worker/v1/jobs` +endpoint. There are cases however where a polling approach is more useful, for instance when idle +connections get terminated after a certain period of time. + +The new `request_job_timeout` option under the worker config section allows for a timeout on the +`/api/worker/v1/jobs` endpoint. It's a string with `"0"` as default, any string which is parseable +by `time.Duration.ParseDuration()` is allowed however, for instance `"10s"`. + +Because this is an expected timeout, "204 No Content" will be returned by the worker server in case +of such a timeout. The worker client will simply poll again straight away. + +To maintain backwards compatilibity the default behaviour is still a blocking connection without +timeout. diff --git a/go.mod b/go.mod index 0e51a4993..489f73cf5 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/google/go-cmp v0.5.6 github.com/google/uuid v1.3.0 github.com/gophercloud/gophercloud v0.22.0 + github.com/jackc/pgconn v1.10.0 github.com/jackc/pgtype v1.8.1 github.com/jackc/pgx/v4 v4.13.0 github.com/julienschmidt/httprouter v1.3.0 diff --git a/internal/cloudapi/v2/v2_test.go b/internal/cloudapi/v2/v2_test.go index 1cde229e6..6ec615094 100644 --- a/internal/cloudapi/v2/v2_test.go +++ b/internal/cloudapi/v2/v2_test.go @@ -8,7 +8,6 @@ import ( "net/http" "os" "testing" - "time" "github.com/stretchr/testify/require" @@ -36,9 +35,7 @@ func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context. depsolveContext, cancel := context.WithCancel(context.Background()) go func() { for { - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond)) - defer cancel() - _, token, _, _, _, err := rpmFixture.Workers.RequestJob(ctx, test_distro.TestDistroName, []string{"depsolve"}) + _, token, _, _, _, err := rpmFixture.Workers.RequestJob(context.Background(), test_distro.TestDistroName, []string{"depsolve"}) if err != nil { continue } diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index af6a0a4ad..099a14373 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -14,6 +14,7 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgconn" "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" @@ -149,7 +150,7 @@ func (q *dbJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { // Return early if the context is already canceled. if err := ctx.Err(); err != nil { - return uuid.Nil, uuid.Nil, nil, "", nil, err + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout } conn, err := q.pool.Acquire(ctx) @@ -183,6 +184,9 @@ func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, } _, err = conn.Conn().WaitForNotification(ctx) if err != nil { + if pgconn.Timeout(err) { + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout + } return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error waiting for notification on jobs channel: %v", err) } } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index c20db8543..54ac5974e 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -178,7 +178,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, // Return early if the context is already canceled. if err := ctx.Err(); err != nil { - return uuid.Nil, uuid.Nil, nil, "", nil, err + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout } // Filter q.pending by the `jobTypes`. Ignore those job types that this @@ -212,6 +212,9 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, } if err != nil { + if errors.As(err, &context.Canceled) || errors.As(err, &context.DeadlineExceeded) { + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout + } return uuid.Nil, uuid.Nil, nil, "", nil, err } diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index a5a5f47a6..50fe92d8c 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -74,7 +74,8 @@ type JobQueue interface { } var ( - ErrNotExist = errors.New("job does not exist") - ErrNotRunning = errors.New("job is not running") - ErrCanceled = errors.New("job ws canceled") + ErrNotExist = errors.New("job does not exist") + ErrNotRunning = errors.New("job is not running") + ErrCanceled = errors.New("job ws canceled") + ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled") ) diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index dd1178792..58f7fab00 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -37,6 +37,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { t.Run("dependencies", wrap(testDependencies)) t.Run("multiple-workers", wrap(testMultipleWorkers)) t.Run("heartbeats", wrap(testHeartbeats)) + t.Run("timeout", wrap(testDequeueTimeout)) } func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID { @@ -153,7 +154,7 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { ctx, cancel := context.WithCancel(context.Background()) cancel() id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}) - require.Equal(t, err, context.Canceled) + require.Equal(t, err, jobqueue.ErrDequeueTimeout) require.Equal(t, uuid.Nil, id) require.Equal(t, uuid.Nil, tok) require.Empty(t, deps) @@ -161,6 +162,18 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { require.Nil(t, args) } +func testDequeueTimeout(t *testing.T, q jobqueue.JobQueue) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20) + defer cancel() + _, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}) + require.Equal(t, jobqueue.ErrDequeueTimeout, err) + + ctx2, cancel2 := context.WithCancel(context.Background()) + cancel2() + _, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}) + require.Equal(t, jobqueue.ErrDequeueTimeout, err) +} + func testDependencies(t *testing.T, q jobqueue.JobQueue) { t.Run("done-before-pushing-dependant", func(t *testing.T) { one := pushTestJob(t, q, "test", nil, nil) diff --git a/internal/mocks/rpmmd/fixtures.go b/internal/mocks/rpmmd/fixtures.go index 55ded9d0a..b76403862 100644 --- a/internal/mocks/rpmmd/fixtures.go +++ b/internal/mocks/rpmmd/fixtures.go @@ -57,7 +57,7 @@ func createBaseWorkersFixture(tmpdir string) *worker.Server { if err != nil { panic(err) } - return worker.NewServer(nil, q, "", "/api/worker/v1") + return worker.NewServer(nil, q, "", time.Duration(0), "/api/worker/v1") } func createBaseDepsolveFixture() []rpmmd.PackageSpec { diff --git a/internal/worker/api/api.gen.go b/internal/worker/api/api.gen.go index cf6d89e1d..449f2005e 100644 --- a/internal/worker/api/api.gen.go +++ b/internal/worker/api/api.gen.go @@ -256,25 +256,25 @@ func RegisterHandlers(router EchoRouter, si ServerInterface) { // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/9xXTW/jNhD9KwTbo2I5TfcioIfNtlhkiyJF0kUXSINgTI0tJhKpDCknhqH/XvDDX5Ji", - "Z4H4sDlZCcmZN2/eDIdLLnRVa4XKGp4tuREFVuA//yDS5D6gLC+nPLtZ8p8JpzzjP6WbQ2k8kV5O7lHY", - "K5wioRLI22TJa9I1kpXoDQqdo/u1ixp5xo0lqWa8TXiFxsDMr+VoBMnaSq14xs9BPDwB5cz5AysnspR2", - "wZ6kLdiTpgckw/5rxuMz8Rubn50lDB8bKA0jBKMVT/quHB5w1u9kPoglHu0v+bXHRhLmPLsJway3dwxv", - "QrpdY9CeH97etgn/jPaLnlyhqbUy+KYcgxJY4nZsE61LBNWPYLV1GGPXV9Z1VXigAxS+wOyDVPlhXj17", - "fmsSPPTRJfwKHxs0gUP/1UcHJIpBGO4ffoe0WJkXt/CMAxEsegDD+SQ4OATu7RMMNPO/zyczfRJ93xut", - "Rlfw9FcUXevQWTkFYe9KLSBU00Cg+UJBJcXdyuiakgPWdwlK+F4n4R+H8u5XtywNhTAs1GsLtjHH4Np4", - "y4exx33D8L7WOVjcJ1VC05T2IO0dp/HUkAK3XG5I+S4qnDOpprrfkv8ppGHSMFDs498XbKpp3YmtZhRi", - "ZKByVoDKS2T3emJGrhVLWzqYl9fnjSxz9snBMEjshP3rDfCEz5FMcHMam7WCWvKMn43GozFPeA228Jyl", - "6G4nky5l3rq/Z2j7WD+jQ8KkMtb1OqanzBbI/FFmahRyKjFnkwXzXWfdwi/ycDjcgM4rQYUWyXhR7Tq5", - "+H3HLnfE8cwj5QlXULmgvf1N9iw1mMS71sHGZ6hqz87pWf/Wam/d2ZBJH/wv43G4T5VF5eOGui5lqJL0", - "Pt5fG/P7Uh9ibH3Gf/327Sh2PxzFbptwg6IhaRc+LecIhMSzm1tHmGmqCmgRVRBSvp04dzx12vT1qM2A", - "fGLBGgZOxCPmpb8WCZuUWjwY1igry7DF18UcZAmTEkc9RW0uhigGNPZc54s346Z/LQaaOuI5PYrD2Gm8", - "w10ePxGCxZz/aArrxuE1s9HV1arXudRv9JQurX5Atd2Veo1lJYEj1XRnvBwI5fJP/kPW+05RU6OUVLNA", - "f69LD3Rhn5i9jXig89ZgwyS5m8X1HXukWu6NDYOlPD6Gv3csmxAlg13tdEs3XY2eJl066fharhs7pIJS", - "Q/5FTz7GE/w1OvQ/3yPD5O3k/DqtamHRnhhLCNUu6V2TL4ny3QnHJdpNkyttBNmsR9SXm/1l3PIanqI5", - "P5wyqZjD7mbsCvxg/+EYg1+3yL8qfK5RWMzj2KSFaMjpq9+C3di7F7PjaPOMGpzSr6WbfVnYFV8NxJ4K", - "KQpGaBtShhmkuRSrTUOz+vVq5WgdsvPOfI/tMdIbZ2uar3pYQyXPeAq1TMNjL52f+vfy1oKI77mTrR23", - "7f8BAAD//yi1+mtgFAAA", + "H4sIAAAAAAAC/9xXW2/bNhT+KwS3R8VymvZFwB6abijSYcuQrFiBLAiOqWObiUQqh5Qdw9B/L3jxTVLi", + "FLAfmicrIXku3/n4ncMlF7qstEJlDc+W3IgpluA//yDS5D6gKC7HPLtZ8l8Jxzzjv6SbQ2k8kV6O7lHY", + "KxwjoRLIm2TJK9IVkpXoDQqdo/u1iwp5xo0lqSa8SXiJxsDEr+VoBMnKSq14xs9BPMyBcub8gZUjWUi7", + "YHNpp2yu6QHJsP/r4fBM/MZmZ2cJw8caCsMIwWjFk64rFw8463cy740lHu0u+bXHWhLmPLsJyay3twxv", + "Urpdx6A9Pry5bRL+Ge0XPbpCU2ll8KAYgxJY4HZuI60LBNXNYLW1P8a2r6ztauoD7YHwGWQfpMr34+rR", + "81uT4KEbXcKv8LFGEzD0X93ogMS0Nwz3D79DWizNs1t4xoEIFp0Aw/kkONgX3OELDDTxv08nE30Sfd8b", + "rQZXMP8rkq5x0Vk5BmHvCi0g3KaeRPOFglKKu5XRNSR7rO8ClPAXnYR/7Ku7X92y1JdCP1GvLdjaHANr", + "4y3vjz3u6w/va5WDxZeoSmjqwu6FveU0nupj4JbLDSg/BIVzJtVYdyX536k0TBoGin3854KNNa2V2GpG", + "IUcGKmdTUHmB7F6PzMBJsbSFC/Py+ryWRc4+uTAMEjth/3kDPOEzJBPcnEaxVlBJnvGzwXAw5AmvwE49", + "Zim67mTSpcwb9/cEbTfWz+giYVIZ67SO6TGzU2T+KDMVCjmWmLPRgnnVWUv4RR4Ohw7ovBKUaJGMJ9Wu", + "k4vfd+xyBxzPfKQ84QpKl7S3v6mepRqT2Gtd2PgEZeXROT3rdq3m1p0NlfTJvxsOQz9VFpXPG6qqkOGW", + "pPexf23Mv1T6kGPjK/7+27ej2P1wFLtNwg2KmqRd+LKcIxASz25uHWCmLkugRWRBKPl24dzx1HHT30dt", + "eugTL6xh4Eg8YJ76a5KwUaHFg2G1srIIW/y9mIEsYFTgoMOoTWOIZEBjz3W+OBg23bYYYGqR5/QoDqPS", + "eIe7OH4iBIu5u9Hvhu8P5rxXtHY9/619WeawVZeEWVowmIBU/GfjfDs/z+IN069W6uuy3jA8XVr9gGpb", + "JztStyLlkVSmNfD2pHL5J/8pFWhHZqhWSqpJgL/TN3r6gi/Mi62hpxdUYMNsu1vFddc/krp0BplecRke", + "w98bpk3IksEud9pXN10NwyZdOur4u1zVto8FhYb8ix59jCf4a3jof36Ehsnh6Pw6rmph0Z4YSwjlLuht", + "k8+R8s0RxxXazbcrbgTarIfm58X+Mm55DU7RnB+XmVTMxe6m/hL8U+PDMUbR9iX/qvCpQmExj4OcFqIm", + "x6+uBLtB/MWYHUabh13vu+FaummchV3xHUNsPpViyghtTcowgzSTYrWp7/VwvVo5mkK2Xr5vUR4jvHHa", + "p9lKw2oqeMZTqGQanp/p7NS/4LcWRHxhnmztuG2+BwAA//9PNd8O8hQAAA==", } // GetSwagger returns the Swagger specification corresponding to the generated code diff --git a/internal/worker/api/openapi.yml b/internal/worker/api/openapi.yml index b84e29458..520c04ee2 100644 --- a/internal/worker/api/openapi.yml +++ b/internal/worker/api/openapi.yml @@ -62,6 +62,12 @@ paths: application/json: schema: $ref: '#/components/schemas/RequestJobResponse' + '204': + description: No job was available, try again + content: + application/json: + schema: + $ref: '#/components/schemas/ObjectReference' '4XX': content: application/json: diff --git a/internal/worker/client.go b/internal/worker/client.go index b757fa65a..b5f8f48fd 100644 --- a/internal/worker/client.go +++ b/internal/worker/client.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "net" @@ -45,6 +46,8 @@ type Job interface { UploadArtifact(name string, reader io.Reader) error } +var ErrClientRequestJobTimeout = errors.New("Dequeue timed out, retry") + type job struct { client *Client id uuid.UUID @@ -185,6 +188,9 @@ func (c *Client) RequestJob(types []string, arch string) (Job, error) { } defer response.Body.Close() + if response.StatusCode == http.StatusNoContent { + return nil, ErrClientRequestJobTimeout + } if response.StatusCode != http.StatusCreated { return nil, errorFromResponse(response, "error requesting job") } diff --git a/internal/worker/server.go b/internal/worker/server.go index 92c0ad61e..5aa8220dd 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -26,9 +26,10 @@ import ( ) type Server struct { - jobs jobqueue.JobQueue - logger *log.Logger - artifactsDir string + jobs jobqueue.JobQueue + logger *log.Logger + artifactsDir string + requestJobTimeout time.Duration } type JobStatus struct { @@ -41,11 +42,12 @@ type JobStatus struct { var ErrInvalidToken = errors.New("token does not exist") var ErrJobNotRunning = errors.New("job isn't running") -func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, basePath string) *Server { +func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, requestJobTimeout time.Duration, basePath string) *Server { s := &Server{ - jobs: jobs, - logger: logger, - artifactsDir: artifactsDir, + jobs: jobs, + logger: logger, + artifactsDir: artifactsDir, + requestJobTimeout: requestJobTimeout, } api.BasePath = basePath @@ -217,7 +219,13 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) jts = append(jts, t) } - jobId, token, depIDs, jobType, args, err := s.jobs.Dequeue(ctx, jts) + dequeueCtx := ctx + var cancel context.CancelFunc + if s.requestJobTimeout != 0 { + dequeueCtx, cancel = context.WithTimeout(ctx, s.requestJobTimeout) + defer cancel() + } + jobId, token, depIDs, jobType, args, err := s.jobs.Dequeue(dequeueCtx, jts) if err != nil { return uuid.Nil, uuid.Nil, "", nil, nil, err } @@ -337,6 +345,13 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { jobId, token, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types) if err != nil { + if err == jobqueue.ErrDequeueTimeout { + return ctx.JSON(http.StatusNoContent, api.ObjectReference{ + Href: fmt.Sprintf("%s/jobs", api.BasePath), + Id: uuid.Nil.String(), + Kind: "RequestJob", + }) + } return api.HTTPErrorWithInternal(api.ErrorRequestingJob, err) } diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index bfb3e1bba..d8c746576 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -10,23 +10,25 @@ import ( "os" "strings" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distro/test_distro" + "github.com/osbuild/osbuild-composer/internal/jobqueue" "github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue" "github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/worker" ) -func newTestServer(t *testing.T, tempdir string, basePath string) *worker.Server { +func newTestServer(t *testing.T, tempdir string, jobRequestTimeout time.Duration, basePath string) *worker.Server { q, err := fsjobqueue.New(tempdir) if err != nil { t.Fatalf("error creating fsjobqueue: %v", err) } - return worker.NewServer(nil, q, "", basePath) + return worker.NewServer(nil, q, "", jobRequestTimeout, basePath) } // Ensure that the status request returns OK. @@ -35,7 +37,7 @@ func TestStatus(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(tempdir) - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() test.TestRoute(t, handler, false, "GET", "/api/worker/v1/status", ``, http.StatusOK, `{"status":"OK", "href": "/api/worker/v1/status", "kind":"Status"}`, "message", "id") } @@ -66,7 +68,7 @@ func TestErrors(t *testing.T) { defer os.RemoveAll(tempdir) for _, c := range cases { - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() test.TestRoute(t, handler, false, c.Method, c.Path, c.Body, c.ExpectedStatus, `{"kind":"Error"}`, "message", "href", "operation_id", "reason", "id", "code") } @@ -98,7 +100,7 @@ func TestErrorsAlteredBasePath(t *testing.T) { defer os.RemoveAll(tempdir) for _, c := range cases { - server := newTestServer(t, tempdir, "/api/image-builder-worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/image-builder-worker/v1") handler := server.Handler() test.TestRoute(t, handler, false, c.Method, c.Path, c.Body, c.ExpectedStatus, `{"kind":"Error"}`, "message", "href", "operation_id", "reason", "id", "code") } @@ -122,7 +124,7 @@ func TestCreate(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() _, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -151,7 +153,7 @@ func TestCancel(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -192,7 +194,7 @@ func TestUpdate(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -224,7 +226,7 @@ func TestArgs(t *testing.T) { tempdir, err := ioutil.TempDir("", "worker-tests-") require.NoError(t, err) defer os.RemoveAll(tempdir) - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") job := worker.OSBuildJob{ Manifest: manifest, @@ -264,7 +266,7 @@ func TestUpload(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -298,7 +300,7 @@ func TestUploadAlteredBasePath(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/image-builder-worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/image-builder-worker/v1") handler := server.Handler() jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -321,7 +323,7 @@ func TestOAuth(t *testing.T) { q, err := fsjobqueue.New(tempdir) require.NoError(t, err) - workerServer := worker.NewServer(nil, q, tempdir, "/api/image-builder-worker/v1") + workerServer := worker.NewServer(nil, q, tempdir, time.Duration(0), "/api/image-builder-worker/v1") handler := workerServer.Handler() workSrv := httptest.NewServer(handler) @@ -386,3 +388,22 @@ func TestOAuth(t *testing.T) { c, err := job.Canceled() require.False(t, c) } + +func TestTimeout(t *testing.T) { + tempdir, err := ioutil.TempDir("", "worker-tests-") + require.NoError(t, err) + defer os.RemoveAll(tempdir) + + distroStruct := test_distro.New() + arch, err := distroStruct.GetArch(test_distro.TestArchName) + if err != nil { + t.Fatalf("error getting arch from distro: %v", err) + } + server := newTestServer(t, tempdir, time.Millisecond*10, "/api/image-builder-worker/v1") + + _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + require.Equal(t, jobqueue.ErrDequeueTimeout, err) + + test.TestRoute(t, server.Handler(), false, "POST", "/api/image-builder-worker/v1/jobs", `{"arch":"arch","types":["types"]}`, http.StatusNoContent, + `{"href":"/api/image-builder-worker/v1/jobs","id":"00000000-0000-0000-0000-000000000000","kind":"RequestJob"}`) +} diff --git a/templates/composer.yml b/templates/composer.yml index e6a7a0db3..c9f402e69 100644 --- a/templates/composer.yml +++ b/templates/composer.yml @@ -175,6 +175,7 @@ objects: jwt_keys_url = "${SSO_BASE_URL}/protocol/openid-connect/certs" jwt_acl_file = "${COMPOSER_CONFIG_DIR}/acl.yml" [worker] + request_job_timeout = "20s" base_path = "/api/image-builder-worker/v1" enable_tls = false enable_mtls = false diff --git a/vendor/modules.txt b/vendor/modules.txt index f7e4bd166..88bb97e84 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -213,6 +213,7 @@ github.com/gorilla/css/scanner # github.com/jackc/chunkreader/v2 v2.0.1 github.com/jackc/chunkreader/v2 # github.com/jackc/pgconn v1.10.0 +## explicit github.com/jackc/pgconn github.com/jackc/pgconn/internal/ctxwatch github.com/jackc/pgconn/stmtcache