diff --git a/cmd/osbuild-composer/composer.go b/cmd/osbuild-composer/composer.go index f414c4631..ca2714f7c 100644 --- a/cmd/osbuild-composer/composer.go +++ b/cmd/osbuild-composer/composer.go @@ -11,6 +11,7 @@ import ( "net/http" "os" "path" + "time" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -87,7 +88,12 @@ func NewComposer(config *ComposerConfigFile, stateDir, cacheDir string) (*Compos } } - c.workers = worker.NewServer(c.logger, jobs, artifactsDir, config.Worker.BasePath) + requestJobTimeout, err := time.ParseDuration(config.Worker.RequestJobTimeout) + if err != nil { + return nil, fmt.Errorf("Unable to parse request job timeout: %v", err) + } + + c.workers = worker.NewServer(c.logger, jobs, artifactsDir, requestJobTimeout, config.Worker.BasePath) return &c, nil } diff --git a/cmd/osbuild-composer/config.go b/cmd/osbuild-composer/config.go index c8e779b9c..30bfb21a0 100644 --- a/cmd/osbuild-composer/config.go +++ b/cmd/osbuild-composer/config.go @@ -33,21 +33,22 @@ type AWSConfig struct { } type WorkerAPIConfig struct { - AllowedDomains []string `toml:"allowed_domains"` - CA string `toml:"ca"` - BasePath string `toml:"base_path"` - PGHost string `toml:"pg_host" env:"PGHOST"` - PGPort string `toml:"pg_port" env:"PGPORT"` - PGDatabase string `toml:"pg_database" env:"PGDATABASE"` - PGUser string `toml:"pg_user" env:"PGUSER"` - PGPassword string `toml:"pg_password" env:"PGPASSWORD"` - PGSSLMode string `toml:"pg_ssl_mode" env:"PGSSLMODE"` - EnableTLS bool `toml:"enable_tls"` - EnableMTLS bool `toml:"enable_mtls"` - EnableJWT bool `toml:"enable_jwt"` - JWTKeysURL string `toml:"jwt_keys_url"` - JWTKeysCA string `toml:"jwt_ca_file"` - JWTACLFile string `toml:"jwt_acl_file"` + AllowedDomains []string `toml:"allowed_domains"` + CA string `toml:"ca"` + RequestJobTimeout string `toml:"request_job_timeout"` + BasePath string `toml:"base_path"` + PGHost string `toml:"pg_host" env:"PGHOST"` + PGPort string `toml:"pg_port" env:"PGPORT"` + PGDatabase string `toml:"pg_database" env:"PGDATABASE"` + PGUser string `toml:"pg_user" env:"PGUSER"` + PGPassword string `toml:"pg_password" env:"PGPASSWORD"` + PGSSLMode string `toml:"pg_ssl_mode" env:"PGSSLMODE"` + EnableTLS bool `toml:"enable_tls"` + EnableMTLS bool `toml:"enable_mtls"` + EnableJWT bool `toml:"enable_jwt"` + JWTKeysURL string `toml:"jwt_keys_url"` + JWTKeysCA string `toml:"jwt_ca_file"` + JWTACLFile string `toml:"jwt_acl_file"` } type WeldrAPIConfig struct { @@ -86,10 +87,11 @@ func GetDefaultConfig() *ComposerConfigFile { }, }, Worker: WorkerAPIConfig{ - BasePath: "/api/worker/v1", - EnableTLS: true, - EnableMTLS: true, - EnableJWT: false, + RequestJobTimeout: "0", + BasePath: "/api/worker/v1", + EnableTLS: true, + EnableMTLS: true, + EnableJWT: false, }, WeldrAPI: WeldrAPIConfig{ map[string]WeldrDistroConfig{ diff --git a/cmd/osbuild-composer/config_test.go b/cmd/osbuild-composer/config_test.go index 67343242e..159707d83 100644 --- a/cmd/osbuild-composer/config_test.go +++ b/cmd/osbuild-composer/config_test.go @@ -39,10 +39,11 @@ func TestDefaultConfig(t *testing.T) { }, defaultConfig.Koji) require.Equal(t, WorkerAPIConfig{ - BasePath: "/api/worker/v1", - EnableTLS: true, - EnableMTLS: true, - EnableJWT: false, + RequestJobTimeout: "0", + BasePath: "/api/worker/v1", + EnableTLS: true, + EnableMTLS: true, + EnableJWT: false, }, defaultConfig.Worker) expectedWeldrAPIConfig := WeldrAPIConfig{ diff --git a/cmd/osbuild-worker/main.go b/cmd/osbuild-worker/main.go index 0ba2d99e8..f31999a38 100644 --- a/cmd/osbuild-worker/main.go +++ b/cmd/osbuild-worker/main.go @@ -89,6 +89,10 @@ func WatchJob(ctx context.Context, job worker.Job) { func RequestAndRunJob(client *worker.Client, acceptedJobTypes []string, jobImpls map[string]JobImplementation) error { logrus.Info("Waiting for a new job...") job, err := client.RequestJob(acceptedJobTypes, common.CurrentArch()) + if err == worker.ErrClientRequestJobTimeout { + logrus.Debugf("Requesting job timed out: %v", err) + return nil + } if err != nil { logrus.Errorf("Requesting job failed: %v", err) return err diff --git a/docs/news/unreleased/request-job-timeout.md b/docs/news/unreleased/request-job-timeout.md new file mode 100644 index 000000000..b2e520f3b --- /dev/null +++ b/docs/news/unreleased/request-job-timeout.md @@ -0,0 +1,15 @@ +# Timeout when requesting jobs + +When workers request a new job they make a blocking call to the `/api/worker/v1/jobs` +endpoint. There are cases however where a polling approach is more useful, for instance when idle +connections get terminated after a certain period of time. + +The new `request_job_timeout` option under the worker config section allows for a timeout on the +`/api/worker/v1/jobs` endpoint. It's a string with `"0"` as default, any string which is parseable +by `time.Duration.ParseDuration()` is allowed however, for instance `"10s"`. + +Because this is an expected timeout, "204 No Content" will be returned by the worker server in case +of such a timeout. The worker client will simply poll again straight away. + +To maintain backwards compatilibity the default behaviour is still a blocking connection without +timeout. diff --git a/go.mod b/go.mod index 0e51a4993..489f73cf5 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/google/go-cmp v0.5.6 github.com/google/uuid v1.3.0 github.com/gophercloud/gophercloud v0.22.0 + github.com/jackc/pgconn v1.10.0 github.com/jackc/pgtype v1.8.1 github.com/jackc/pgx/v4 v4.13.0 github.com/julienschmidt/httprouter v1.3.0 diff --git a/internal/cloudapi/v2/v2_test.go b/internal/cloudapi/v2/v2_test.go index 1cde229e6..6ec615094 100644 --- a/internal/cloudapi/v2/v2_test.go +++ b/internal/cloudapi/v2/v2_test.go @@ -8,7 +8,6 @@ import ( "net/http" "os" "testing" - "time" "github.com/stretchr/testify/require" @@ -36,9 +35,7 @@ func newV2Server(t *testing.T, dir string) (*v2.Server, *worker.Server, context. depsolveContext, cancel := context.WithCancel(context.Background()) go func() { for { - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(50*time.Millisecond)) - defer cancel() - _, token, _, _, _, err := rpmFixture.Workers.RequestJob(ctx, test_distro.TestDistroName, []string{"depsolve"}) + _, token, _, _, _, err := rpmFixture.Workers.RequestJob(context.Background(), test_distro.TestDistroName, []string{"depsolve"}) if err != nil { continue } diff --git a/internal/jobqueue/dbjobqueue/dbjobqueue.go b/internal/jobqueue/dbjobqueue/dbjobqueue.go index af6a0a4ad..099a14373 100644 --- a/internal/jobqueue/dbjobqueue/dbjobqueue.go +++ b/internal/jobqueue/dbjobqueue/dbjobqueue.go @@ -14,6 +14,7 @@ import ( "time" "github.com/google/uuid" + "github.com/jackc/pgconn" "github.com/jackc/pgtype" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/pgxpool" @@ -149,7 +150,7 @@ func (q *dbJobQueue) Enqueue(jobType string, args interface{}, dependencies []uu func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, uuid.UUID, []uuid.UUID, string, json.RawMessage, error) { // Return early if the context is already canceled. if err := ctx.Err(); err != nil { - return uuid.Nil, uuid.Nil, nil, "", nil, err + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout } conn, err := q.pool.Acquire(ctx) @@ -183,6 +184,9 @@ func (q *dbJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, } _, err = conn.Conn().WaitForNotification(ctx) if err != nil { + if pgconn.Timeout(err) { + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout + } return uuid.Nil, uuid.Nil, nil, "", nil, fmt.Errorf("error waiting for notification on jobs channel: %v", err) } } diff --git a/internal/jobqueue/fsjobqueue/fsjobqueue.go b/internal/jobqueue/fsjobqueue/fsjobqueue.go index c20db8543..54ac5974e 100644 --- a/internal/jobqueue/fsjobqueue/fsjobqueue.go +++ b/internal/jobqueue/fsjobqueue/fsjobqueue.go @@ -178,7 +178,7 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, // Return early if the context is already canceled. if err := ctx.Err(); err != nil { - return uuid.Nil, uuid.Nil, nil, "", nil, err + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout } // Filter q.pending by the `jobTypes`. Ignore those job types that this @@ -212,6 +212,9 @@ func (q *fsJobQueue) Dequeue(ctx context.Context, jobTypes []string) (uuid.UUID, } if err != nil { + if errors.As(err, &context.Canceled) || errors.As(err, &context.DeadlineExceeded) { + return uuid.Nil, uuid.Nil, nil, "", nil, jobqueue.ErrDequeueTimeout + } return uuid.Nil, uuid.Nil, nil, "", nil, err } diff --git a/internal/jobqueue/jobqueue.go b/internal/jobqueue/jobqueue.go index a5a5f47a6..50fe92d8c 100644 --- a/internal/jobqueue/jobqueue.go +++ b/internal/jobqueue/jobqueue.go @@ -74,7 +74,8 @@ type JobQueue interface { } var ( - ErrNotExist = errors.New("job does not exist") - ErrNotRunning = errors.New("job is not running") - ErrCanceled = errors.New("job ws canceled") + ErrNotExist = errors.New("job does not exist") + ErrNotRunning = errors.New("job is not running") + ErrCanceled = errors.New("job ws canceled") + ErrDequeueTimeout = errors.New("dequeue context timed out or was canceled") ) diff --git a/internal/jobqueue/jobqueuetest/jobqueuetest.go b/internal/jobqueue/jobqueuetest/jobqueuetest.go index dd1178792..58f7fab00 100644 --- a/internal/jobqueue/jobqueuetest/jobqueuetest.go +++ b/internal/jobqueue/jobqueuetest/jobqueuetest.go @@ -37,6 +37,7 @@ func TestJobQueue(t *testing.T, makeJobQueue MakeJobQueue) { t.Run("dependencies", wrap(testDependencies)) t.Run("multiple-workers", wrap(testMultipleWorkers)) t.Run("heartbeats", wrap(testHeartbeats)) + t.Run("timeout", wrap(testDequeueTimeout)) } func pushTestJob(t *testing.T, q jobqueue.JobQueue, jobType string, args interface{}, dependencies []uuid.UUID) uuid.UUID { @@ -153,7 +154,7 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { ctx, cancel := context.WithCancel(context.Background()) cancel() id, tok, deps, typ, args, err := q.Dequeue(ctx, []string{"zebra"}) - require.Equal(t, err, context.Canceled) + require.Equal(t, err, jobqueue.ErrDequeueTimeout) require.Equal(t, uuid.Nil, id) require.Equal(t, uuid.Nil, tok) require.Empty(t, deps) @@ -161,6 +162,18 @@ func testJobTypes(t *testing.T, q jobqueue.JobQueue) { require.Nil(t, args) } +func testDequeueTimeout(t *testing.T, q jobqueue.JobQueue) { + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*20) + defer cancel() + _, _, _, _, _, err := q.Dequeue(ctx, []string{"octopus"}) + require.Equal(t, jobqueue.ErrDequeueTimeout, err) + + ctx2, cancel2 := context.WithCancel(context.Background()) + cancel2() + _, _, _, _, _, err = q.Dequeue(ctx2, []string{"octopus"}) + require.Equal(t, jobqueue.ErrDequeueTimeout, err) +} + func testDependencies(t *testing.T, q jobqueue.JobQueue) { t.Run("done-before-pushing-dependant", func(t *testing.T) { one := pushTestJob(t, q, "test", nil, nil) diff --git a/internal/mocks/rpmmd/fixtures.go b/internal/mocks/rpmmd/fixtures.go index 55ded9d0a..b76403862 100644 --- a/internal/mocks/rpmmd/fixtures.go +++ b/internal/mocks/rpmmd/fixtures.go @@ -57,7 +57,7 @@ func createBaseWorkersFixture(tmpdir string) *worker.Server { if err != nil { panic(err) } - return worker.NewServer(nil, q, "", "/api/worker/v1") + return worker.NewServer(nil, q, "", time.Duration(0), "/api/worker/v1") } func createBaseDepsolveFixture() []rpmmd.PackageSpec { diff --git a/internal/worker/api/api.gen.go b/internal/worker/api/api.gen.go index cf6d89e1d..449f2005e 100644 --- a/internal/worker/api/api.gen.go +++ b/internal/worker/api/api.gen.go @@ -256,25 +256,25 @@ func RegisterHandlers(router EchoRouter, si ServerInterface) { // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/9xXTW/jNhD9KwTbo2I5TfcioIfNtlhkiyJF0kUXSINgTI0tJhKpDCknhqH/XvDDX5Ji", - "Z4H4sDlZCcmZN2/eDIdLLnRVa4XKGp4tuREFVuA//yDS5D6gLC+nPLtZ8p8JpzzjP6WbQ2k8kV5O7lHY", - "K5wioRLI22TJa9I1kpXoDQqdo/u1ixp5xo0lqWa8TXiFxsDMr+VoBMnaSq14xs9BPDwB5cz5AysnspR2", - "wZ6kLdiTpgckw/5rxuMz8Rubn50lDB8bKA0jBKMVT/quHB5w1u9kPoglHu0v+bXHRhLmPLsJway3dwxv", - "QrpdY9CeH97etgn/jPaLnlyhqbUy+KYcgxJY4nZsE61LBNWPYLV1GGPXV9Z1VXigAxS+wOyDVPlhXj17", - "fmsSPPTRJfwKHxs0gUP/1UcHJIpBGO4ffoe0WJkXt/CMAxEsegDD+SQ4OATu7RMMNPO/zyczfRJ93xut", - "Rlfw9FcUXevQWTkFYe9KLSBU00Cg+UJBJcXdyuiakgPWdwlK+F4n4R+H8u5XtywNhTAs1GsLtjHH4Np4", - "y4exx33D8L7WOVjcJ1VC05T2IO0dp/HUkAK3XG5I+S4qnDOpprrfkv8ppGHSMFDs498XbKpp3YmtZhRi", - "ZKByVoDKS2T3emJGrhVLWzqYl9fnjSxz9snBMEjshP3rDfCEz5FMcHMam7WCWvKMn43GozFPeA228Jyl", - "6G4nky5l3rq/Z2j7WD+jQ8KkMtb1OqanzBbI/FFmahRyKjFnkwXzXWfdwi/ycDjcgM4rQYUWyXhR7Tq5", - "+H3HLnfE8cwj5QlXULmgvf1N9iw1mMS71sHGZ6hqz87pWf/Wam/d2ZBJH/wv43G4T5VF5eOGui5lqJL0", - "Pt5fG/P7Uh9ibH3Gf/327Sh2PxzFbptwg6IhaRc+LecIhMSzm1tHmGmqCmgRVRBSvp04dzx12vT1qM2A", - "fGLBGgZOxCPmpb8WCZuUWjwY1igry7DF18UcZAmTEkc9RW0uhigGNPZc54s346Z/LQaaOuI5PYrD2Gm8", - "w10ePxGCxZz/aArrxuE1s9HV1arXudRv9JQurX5Atd2Veo1lJYEj1XRnvBwI5fJP/kPW+05RU6OUVLNA", - "f69LD3Rhn5i9jXig89ZgwyS5m8X1HXukWu6NDYOlPD6Gv3csmxAlg13tdEs3XY2eJl066fharhs7pIJS", - "Q/5FTz7GE/w1OvQ/3yPD5O3k/DqtamHRnhhLCNUu6V2TL4ny3QnHJdpNkyttBNmsR9SXm/1l3PIanqI5", - "P5wyqZjD7mbsCvxg/+EYg1+3yL8qfK5RWMzj2KSFaMjpq9+C3di7F7PjaPOMGpzSr6WbfVnYFV8NxJ4K", - "KQpGaBtShhmkuRSrTUOz+vVq5WgdsvPOfI/tMdIbZ2uar3pYQyXPeAq1TMNjL52f+vfy1oKI77mTrR23", - "7f8BAAD//yi1+mtgFAAA", + "H4sIAAAAAAAC/9xXW2/bNhT+KwS3R8VymvZFwB6abijSYcuQrFiBLAiOqWObiUQqh5Qdw9B/L3jxTVLi", + "FLAfmicrIXku3/n4ncMlF7qstEJlDc+W3IgpluA//yDS5D6gKC7HPLtZ8l8Jxzzjv6SbQ2k8kV6O7lHY", + "KxwjoRLIm2TJK9IVkpXoDQqdo/u1iwp5xo0lqSa8SXiJxsDEr+VoBMnKSq14xs9BPMyBcub8gZUjWUi7", + "YHNpp2yu6QHJsP/r4fBM/MZmZ2cJw8caCsMIwWjFk64rFw8463cy740lHu0u+bXHWhLmPLsJyay3twxv", + "Urpdx6A9Pry5bRL+Ge0XPbpCU2ll8KAYgxJY4HZuI60LBNXNYLW1P8a2r6ztauoD7YHwGWQfpMr34+rR", + "81uT4KEbXcKv8LFGEzD0X93ogMS0Nwz3D79DWizNs1t4xoEIFp0Aw/kkONgX3OELDDTxv08nE30Sfd8b", + "rQZXMP8rkq5x0Vk5BmHvCi0g3KaeRPOFglKKu5XRNSR7rO8ClPAXnYR/7Ku7X92y1JdCP1GvLdjaHANr", + "4y3vjz3u6w/va5WDxZeoSmjqwu6FveU0nupj4JbLDSg/BIVzJtVYdyX536k0TBoGin3854KNNa2V2GpG", + "IUcGKmdTUHmB7F6PzMBJsbSFC/Py+ryWRc4+uTAMEjth/3kDPOEzJBPcnEaxVlBJnvGzwXAw5AmvwE49", + "Zim67mTSpcwb9/cEbTfWz+giYVIZ67SO6TGzU2T+KDMVCjmWmLPRgnnVWUv4RR4Ohw7ovBKUaJGMJ9Wu", + "k4vfd+xyBxzPfKQ84QpKl7S3v6mepRqT2Gtd2PgEZeXROT3rdq3m1p0NlfTJvxsOQz9VFpXPG6qqkOGW", + "pPexf23Mv1T6kGPjK/7+27ej2P1wFLtNwg2KmqRd+LKcIxASz25uHWCmLkugRWRBKPl24dzx1HHT30dt", + "eugTL6xh4Eg8YJ76a5KwUaHFg2G1srIIW/y9mIEsYFTgoMOoTWOIZEBjz3W+OBg23bYYYGqR5/QoDqPS", + "eIe7OH4iBIu5u9Hvhu8P5rxXtHY9/619WeawVZeEWVowmIBU/GfjfDs/z+IN069W6uuy3jA8XVr9gGpb", + "JztStyLlkVSmNfD2pHL5J/8pFWhHZqhWSqpJgL/TN3r6gi/Mi62hpxdUYMNsu1vFddc/krp0BplecRke", + "w98bpk3IksEud9pXN10NwyZdOur4u1zVto8FhYb8ix59jCf4a3jof36Ehsnh6Pw6rmph0Z4YSwjlLuht", + "k8+R8s0RxxXazbcrbgTarIfm58X+Mm55DU7RnB+XmVTMxe6m/hL8U+PDMUbR9iX/qvCpQmExj4OcFqIm", + "x6+uBLtB/MWYHUabh13vu+FaummchV3xHUNsPpViyghtTcowgzSTYrWp7/VwvVo5mkK2Xr5vUR4jvHHa", + "p9lKw2oqeMZTqGQanp/p7NS/4LcWRHxhnmztuG2+BwAA//9PNd8O8hQAAA==", } // GetSwagger returns the Swagger specification corresponding to the generated code diff --git a/internal/worker/api/openapi.yml b/internal/worker/api/openapi.yml index b84e29458..520c04ee2 100644 --- a/internal/worker/api/openapi.yml +++ b/internal/worker/api/openapi.yml @@ -62,6 +62,12 @@ paths: application/json: schema: $ref: '#/components/schemas/RequestJobResponse' + '204': + description: No job was available, try again + content: + application/json: + schema: + $ref: '#/components/schemas/ObjectReference' '4XX': content: application/json: diff --git a/internal/worker/client.go b/internal/worker/client.go index b757fa65a..b5f8f48fd 100644 --- a/internal/worker/client.go +++ b/internal/worker/client.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "encoding/json" + "errors" "fmt" "io" "net" @@ -45,6 +46,8 @@ type Job interface { UploadArtifact(name string, reader io.Reader) error } +var ErrClientRequestJobTimeout = errors.New("Dequeue timed out, retry") + type job struct { client *Client id uuid.UUID @@ -185,6 +188,9 @@ func (c *Client) RequestJob(types []string, arch string) (Job, error) { } defer response.Body.Close() + if response.StatusCode == http.StatusNoContent { + return nil, ErrClientRequestJobTimeout + } if response.StatusCode != http.StatusCreated { return nil, errorFromResponse(response, "error requesting job") } diff --git a/internal/worker/server.go b/internal/worker/server.go index 92c0ad61e..5aa8220dd 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -26,9 +26,10 @@ import ( ) type Server struct { - jobs jobqueue.JobQueue - logger *log.Logger - artifactsDir string + jobs jobqueue.JobQueue + logger *log.Logger + artifactsDir string + requestJobTimeout time.Duration } type JobStatus struct { @@ -41,11 +42,12 @@ type JobStatus struct { var ErrInvalidToken = errors.New("token does not exist") var ErrJobNotRunning = errors.New("job isn't running") -func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, basePath string) *Server { +func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, artifactsDir string, requestJobTimeout time.Duration, basePath string) *Server { s := &Server{ - jobs: jobs, - logger: logger, - artifactsDir: artifactsDir, + jobs: jobs, + logger: logger, + artifactsDir: artifactsDir, + requestJobTimeout: requestJobTimeout, } api.BasePath = basePath @@ -217,7 +219,13 @@ func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string) jts = append(jts, t) } - jobId, token, depIDs, jobType, args, err := s.jobs.Dequeue(ctx, jts) + dequeueCtx := ctx + var cancel context.CancelFunc + if s.requestJobTimeout != 0 { + dequeueCtx, cancel = context.WithTimeout(ctx, s.requestJobTimeout) + defer cancel() + } + jobId, token, depIDs, jobType, args, err := s.jobs.Dequeue(dequeueCtx, jts) if err != nil { return uuid.Nil, uuid.Nil, "", nil, nil, err } @@ -337,6 +345,13 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { jobId, token, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types) if err != nil { + if err == jobqueue.ErrDequeueTimeout { + return ctx.JSON(http.StatusNoContent, api.ObjectReference{ + Href: fmt.Sprintf("%s/jobs", api.BasePath), + Id: uuid.Nil.String(), + Kind: "RequestJob", + }) + } return api.HTTPErrorWithInternal(api.ErrorRequestingJob, err) } diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index bfb3e1bba..d8c746576 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -10,23 +10,25 @@ import ( "os" "strings" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/osbuild/osbuild-composer/internal/distro" "github.com/osbuild/osbuild-composer/internal/distro/test_distro" + "github.com/osbuild/osbuild-composer/internal/jobqueue" "github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue" "github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/worker" ) -func newTestServer(t *testing.T, tempdir string, basePath string) *worker.Server { +func newTestServer(t *testing.T, tempdir string, jobRequestTimeout time.Duration, basePath string) *worker.Server { q, err := fsjobqueue.New(tempdir) if err != nil { t.Fatalf("error creating fsjobqueue: %v", err) } - return worker.NewServer(nil, q, "", basePath) + return worker.NewServer(nil, q, "", jobRequestTimeout, basePath) } // Ensure that the status request returns OK. @@ -35,7 +37,7 @@ func TestStatus(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(tempdir) - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() test.TestRoute(t, handler, false, "GET", "/api/worker/v1/status", ``, http.StatusOK, `{"status":"OK", "href": "/api/worker/v1/status", "kind":"Status"}`, "message", "id") } @@ -66,7 +68,7 @@ func TestErrors(t *testing.T) { defer os.RemoveAll(tempdir) for _, c := range cases { - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() test.TestRoute(t, handler, false, c.Method, c.Path, c.Body, c.ExpectedStatus, `{"kind":"Error"}`, "message", "href", "operation_id", "reason", "id", "code") } @@ -98,7 +100,7 @@ func TestErrorsAlteredBasePath(t *testing.T) { defer os.RemoveAll(tempdir) for _, c := range cases { - server := newTestServer(t, tempdir, "/api/image-builder-worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/image-builder-worker/v1") handler := server.Handler() test.TestRoute(t, handler, false, c.Method, c.Path, c.Body, c.ExpectedStatus, `{"kind":"Error"}`, "message", "href", "operation_id", "reason", "id", "code") } @@ -122,7 +124,7 @@ func TestCreate(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() _, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -151,7 +153,7 @@ func TestCancel(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -192,7 +194,7 @@ func TestUpdate(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -224,7 +226,7 @@ func TestArgs(t *testing.T) { tempdir, err := ioutil.TempDir("", "worker-tests-") require.NoError(t, err) defer os.RemoveAll(tempdir) - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") job := worker.OSBuildJob{ Manifest: manifest, @@ -264,7 +266,7 @@ func TestUpload(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1") handler := server.Handler() jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -298,7 +300,7 @@ func TestUploadAlteredBasePath(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, tempdir, "/api/image-builder-worker/v1") + server := newTestServer(t, tempdir, time.Duration(0), "/api/image-builder-worker/v1") handler := server.Handler() jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: manifest}) @@ -321,7 +323,7 @@ func TestOAuth(t *testing.T) { q, err := fsjobqueue.New(tempdir) require.NoError(t, err) - workerServer := worker.NewServer(nil, q, tempdir, "/api/image-builder-worker/v1") + workerServer := worker.NewServer(nil, q, tempdir, time.Duration(0), "/api/image-builder-worker/v1") handler := workerServer.Handler() workSrv := httptest.NewServer(handler) @@ -386,3 +388,22 @@ func TestOAuth(t *testing.T) { c, err := job.Canceled() require.False(t, c) } + +func TestTimeout(t *testing.T) { + tempdir, err := ioutil.TempDir("", "worker-tests-") + require.NoError(t, err) + defer os.RemoveAll(tempdir) + + distroStruct := test_distro.New() + arch, err := distroStruct.GetArch(test_distro.TestArchName) + if err != nil { + t.Fatalf("error getting arch from distro: %v", err) + } + server := newTestServer(t, tempdir, time.Millisecond*10, "/api/image-builder-worker/v1") + + _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{"osbuild"}) + require.Equal(t, jobqueue.ErrDequeueTimeout, err) + + test.TestRoute(t, server.Handler(), false, "POST", "/api/image-builder-worker/v1/jobs", `{"arch":"arch","types":["types"]}`, http.StatusNoContent, + `{"href":"/api/image-builder-worker/v1/jobs","id":"00000000-0000-0000-0000-000000000000","kind":"RequestJob"}`) +} diff --git a/templates/composer.yml b/templates/composer.yml index e6a7a0db3..c9f402e69 100644 --- a/templates/composer.yml +++ b/templates/composer.yml @@ -175,6 +175,7 @@ objects: jwt_keys_url = "${SSO_BASE_URL}/protocol/openid-connect/certs" jwt_acl_file = "${COMPOSER_CONFIG_DIR}/acl.yml" [worker] + request_job_timeout = "20s" base_path = "/api/image-builder-worker/v1" enable_tls = false enable_mtls = false diff --git a/vendor/modules.txt b/vendor/modules.txt index f7e4bd166..88bb97e84 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -213,6 +213,7 @@ github.com/gorilla/css/scanner # github.com/jackc/chunkreader/v2 v2.0.1 github.com/jackc/chunkreader/v2 # github.com/jackc/pgconn v1.10.0 +## explicit github.com/jackc/pgconn github.com/jackc/pgconn/internal/ctxwatch github.com/jackc/pgconn/stmtcache