diff --git a/internal/cloudapi/v2/v2_koji_test.go b/internal/cloudapi/v2/v2_koji_test.go index 9d1611277..7a2fa3c34 100644 --- a/internal/cloudapi/v2/v2_koji_test.go +++ b/internal/cloudapi/v2/v2_koji_test.go @@ -418,7 +418,7 @@ func TestKojiCompose(t *testing.T) { require.NoError(t, err) // handle koji-init - _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeKojiInit}, []string{""}) + _, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeKojiInit}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeKojiInit, jobType) @@ -445,7 +445,7 @@ func TestKojiCompose(t *testing.T) { // handle build jobs for i := 0; i < len(buildJobIDs); i++ { - jobID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}) + jobID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeOSBuild, jobType) @@ -485,7 +485,7 @@ func TestKojiCompose(t *testing.T) { } // handle koji-finalize - finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeKojiFinalize}, []string{""}) + finalizeID, token, jobType, rawJob, _, err := workerServer.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeKojiFinalize}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeKojiFinalize, jobType) diff --git a/internal/cloudapi/v2/v2_test.go b/internal/cloudapi/v2/v2_test.go index 439eaf886..32c1cfc95 100644 --- a/internal/cloudapi/v2/v2_test.go +++ b/internal/cloudapi/v2/v2_test.go @@ -8,6 +8,7 @@ import ( "sync" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/osbuild/osbuild-composer/pkg/jobqueue" @@ -49,7 +50,7 @@ func newV2Server(t *testing.T, dir string, depsolveChannels []string, enableJWT go func() { defer wg.Done() for { - _, token, _, _, _, err := workerServer.RequestJob(depsolveContext, test_distro.TestDistroName, []string{worker.JobTypeDepsolve}, depsolveChannels) + _, token, _, _, _, err := workerServer.RequestJob(depsolveContext, test_distro.TestDistroName, []string{worker.JobTypeDepsolve}, depsolveChannels, uuid.Nil) select { case <-depsolveContext.Done(): return @@ -86,7 +87,7 @@ func newV2Server(t *testing.T, dir string, depsolveChannels []string, enableJWT go func() { defer wg.Done() for { - _, token, _, _, _, err := workerServer.RequestJob(ostreeResolveContext, test_distro.TestDistroName, []string{worker.JobTypeOSTreeResolve}, depsolveChannels) + _, token, _, _, _, err := workerServer.RequestJob(ostreeResolveContext, test_distro.TestDistroName, []string{worker.JobTypeOSTreeResolve}, depsolveChannels, uuid.Nil) select { case <-ostreeResolveContext.Done(): return @@ -597,7 +598,7 @@ func TestComposeStatusSuccess(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, args, dynArgs, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}) + jobId, token, jobType, args, dynArgs, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeOSBuild, jobType) @@ -708,7 +709,7 @@ func TestComposeStatusFailure(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeOSBuild, jobType) @@ -779,7 +780,7 @@ func TestComposeJobError(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeOSBuild, jobType) @@ -844,7 +845,7 @@ func TestComposeDependencyError(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeOSBuild, jobType) @@ -917,7 +918,7 @@ func TestComposeTargetErrors(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeOSBuild, jobType) @@ -1337,7 +1338,7 @@ func TestImageFromCompose(t *testing.T) { "kind": "ComposeId" }`, "id") - jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}) + jobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeOSBuild, jobType) @@ -1406,7 +1407,7 @@ func TestImageFromCompose(t *testing.T) { "kind": "CloneComposeId" }`, jobId), "id") - _, token, jobType, _, _, err = wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeAWSEC2Copy}, []string{""}) + _, token, jobType, _, _, err = wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeAWSEC2Copy}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeAWSEC2Copy, jobType) @@ -1418,7 +1419,7 @@ func TestImageFromCompose(t *testing.T) { err = wrksrv.FinishJob(token, res) require.NoError(t, err) - imgJobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeAWSEC2Share}, []string{""}) + imgJobId, token, jobType, _, _, err := wrksrv.RequestJob(context.Background(), test_distro.TestArch3Name, []string{worker.JobTypeAWSEC2Share}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, worker.JobTypeAWSEC2Share, jobType) diff --git a/internal/weldr/compose_test.go b/internal/weldr/compose_test.go index 628c6956f..38907c270 100644 --- a/internal/weldr/compose_test.go +++ b/internal/weldr/compose_test.go @@ -6,6 +6,7 @@ import ( "os" "testing" + "github.com/google/uuid" "github.com/stretchr/testify/require" "github.com/osbuild/images/pkg/distro" @@ -44,7 +45,7 @@ func TestComposeStatusFromLegacyError(t *testing.T) { jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "") require.NoError(t, err) - j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}) + j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, jobId, j) @@ -97,7 +98,7 @@ func TestComposeStatusFromJobError(t *testing.T) { jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "") require.NoError(t, err) - j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}) + j, token, _, _, _, err := api.workers.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, jobId, j) diff --git a/internal/worker/api/api.gen.go b/internal/worker/api/api.gen.go index 32bcbae9f..12241ac37 100644 --- a/internal/worker/api/api.gen.go +++ b/internal/worker/api/api.gen.go @@ -51,10 +51,24 @@ type ObjectReference struct { Kind string `json:"kind"` } +// PostWorkersRequest defines model for PostWorkersRequest. +type PostWorkersRequest struct { + Arch string `json:"arch"` +} + +// PostWorkersResponse defines model for PostWorkersResponse. +type PostWorkersResponse struct { + // Embedded struct due to allOf(#/components/schemas/ObjectReference) + ObjectReference `yaml:",inline"` + // Embedded fields due to inline allOf schema + WorkerId string `json:"worker_id"` +} + // RequestJobRequest defines model for RequestJobRequest. type RequestJobRequest struct { - Arch string `json:"arch"` - Types []string `json:"types"` + Arch string `json:"arch"` + Types []string `json:"types"` + WorkerId *string `json:"worker_id,omitempty"` } // RequestJobResponse defines model for RequestJobResponse. @@ -91,12 +105,18 @@ type RequestJobJSONBody RequestJobRequest // UpdateJobJSONBody defines parameters for UpdateJob. type UpdateJobJSONBody UpdateJobRequest +// PostWorkersJSONBody defines parameters for PostWorkers. +type PostWorkersJSONBody PostWorkersRequest + // RequestJobJSONRequestBody defines body for RequestJob for application/json ContentType. type RequestJobJSONRequestBody RequestJobJSONBody // UpdateJobJSONRequestBody defines body for UpdateJob for application/json ContentType. type UpdateJobJSONRequestBody UpdateJobJSONBody +// PostWorkersJSONRequestBody defines body for PostWorkers for application/json ContentType. +type PostWorkersJSONRequestBody PostWorkersJSONBody + // ServerInterface represents all server handlers. type ServerInterface interface { // Get error description @@ -120,6 +140,12 @@ type ServerInterface interface { // status // (GET /status) GetStatus(ctx echo.Context) error + // Create a new worker + // (POST /workers) + PostWorkers(ctx echo.Context) error + // Refresh worker status + // (POST /workers/{worker_id}/status) + PostWorkerStatus(ctx echo.Context, workerId string) error } // ServerInterfaceWrapper converts echo contexts to parameters. @@ -228,6 +254,31 @@ func (w *ServerInterfaceWrapper) GetStatus(ctx echo.Context) error { return err } +// PostWorkers converts echo context to params. +func (w *ServerInterfaceWrapper) PostWorkers(ctx echo.Context) error { + var err error + + // Invoke the callback with all the unmarshalled arguments + err = w.Handler.PostWorkers(ctx) + return err +} + +// PostWorkerStatus converts echo context to params. +func (w *ServerInterfaceWrapper) PostWorkerStatus(ctx echo.Context) error { + var err error + // ------------- Path parameter "worker_id" ------------- + var workerId string + + err = runtime.BindStyledParameterWithLocation("simple", false, "worker_id", runtime.ParamLocationPath, ctx.Param("worker_id"), &workerId) + if err != nil { + return echo.NewHTTPError(http.StatusBadRequest, fmt.Sprintf("Invalid format for parameter worker_id: %s", err)) + } + + // Invoke the callback with all the unmarshalled arguments + err = w.Handler.PostWorkerStatus(ctx, workerId) + return err +} + // This is a simple interface which specifies echo.Route addition functions which // are present on both echo.Echo and echo.Group, since we want to allow using // either of them for path registration @@ -263,32 +314,39 @@ func RegisterHandlersWithBaseURL(router EchoRouter, si ServerInterface, baseURL router.PUT(baseURL+"/jobs/:token/artifacts/:name", wrapper.UploadJobArtifact) router.GET(baseURL+"/openapi", wrapper.GetOpenapi) router.GET(baseURL+"/status", wrapper.GetStatus) + router.POST(baseURL+"/workers", wrapper.PostWorkers) + router.POST(baseURL+"/workers/:worker_id/status", wrapper.PostWorkerStatus) } // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/9xYX2/bNhD/KgS3hw2QLadpXwTsoemGIh26DMmKFciC4EydLSYSqZAnO4ah7z6QlP9J", - "ip0A8UPzJFk83p/f/Xh39JILXZRaoSLLkyW3IsMC/OsfxmjjXiDPLyY8uV7ynw1OeMJ/ijeb4mZHfDG+", - "Q0GXOEGDSiCvoyUvjS7RkESvUOgU3ZMWJfKEWzJSTXkd8QKthalfS9EKI0uSWvGEn4G4n4NJmbMHJMcy", - "l7Rgc0kZm2tzj8ay/6rR6FT8xmanpxHDhwpyywyC1YpHXVPOH3Dab2Xa60uztbvk1x4qaTDlyXUIZi3e", - "UrwJ6Wbtg/b48PqmjvhnpC96fIm21Mriq2IMSmCO27GNtc4RVDeClWi/j21bSdtU5h3tgfAJZO+lSg/j", - "6tHzolGw0PUu4pf4UKENGPq3rndgRNbrhvvgJSRhYZ8U4QkHY2DRcTDsj4KBQ869foLBTP3zcTDVg8b2", - "ndVqeAnzrw3paucdyQkIus21gHCaegJNFwoKKW5XSteQHNC+C1DE9xoJHw7l3a9uaeoLoZ+oVwRU2WNg", - "bb3mw743cv3ufStTINxHVYO2yukg7C2jza4+Bm6Z3IDyIiicMakmuluS/8mkZdIyUOzj3+dsos26EpNm", - "JsTIQKUsA5XmyO702A5dKZaUOzcvrs4qmafsk3PDomED9q9XwCM+Q2ODmZOmWCsoJU/46XA0HPGIl0CZ", - "xyxG151svJRp7X5Pkbq+fkbnCZPKkqt1TE8YZcj8VmZLFHIiMWXjBfNVZ13Cz9OwOXRAZ9VAgYTGelLt", - "Gjn/fUcvd8DxxHvKI66gcEF7/ZvskakwanqtcxsfoSg9Oien3a5V37i9IZM++HejUeinilD5uKEscxlO", - "SXzX9K+N+n2pDzHWPuPvv38/it4PR9FbR9yiqIykhU/LGYJBw5PrGweYrYoCzKJhQUj5duLc9thx059H", - "bXvo0xxYy8CReMg89dckYeNci3vLKkUyDyL+XMxA5jDOcdhh1KYxNGRAS2c6XbwaNt22GGBqkefkKAab", - "SuMN7uL4ySAQpu5Evxu9fzXjvUVr1/Jf2qdlDlt5iRiZBYMpSMV/NM634/Ms3jD9clV9XdQbhsdL0veo", - "tutkp9StSHmkKtMaeHtCufiT/5AVaKfMmEopqaYB/k7f6OkLPjF7W0NPLyiBwmy7m8V11z9SdekMMr3F", - "ZXQMe2+YNiFKBrvcaR/deDUM23jpqOPPcllRHwtyDekXPf7Y7ODP4aF/vISG0evR+Xlc1YKQBpYMQrEL", - "elvlU6R8c8RxiXbz7YobgTbrofnpYn/RiDwHp0adH5eZVMz57qb+AvxV48MxRtH2If+m8LFEQZg2g5wW", - "ojKOX90S7AbxvT47jDYXu957w5V00zgLUs09xrB5JkXGDFJllGUWzUyKlVDf7eFqtXK0Ctm6+b7F8tjA", - "20z7ZtZ/B/sKUrFfSqPTSrhPv7IgyyNemZwnPCMqbRLHUMqhY4fN5ISGQhfuSywLmOJg7K6laAbhOhvP", - "Tvw/Ai1mEExdkd6j3hJM8YVGgpaXiG0t3NT/BwAA//+IToO0xRUAAA==", + "H4sIAAAAAAAC/9xY224bvRF+lQFboC2w1spxeiOgF3FaBE6ROrAb/AFiI5jdHWlpc8kNOWtFEPTuP0iu", + "jruSbMC6SK5kS8M5fPNxDpyL3FS10aTZidFcuLykCsOf/7HWWP8HKnU9FqNvc/FXS2MxEn9J14fS9kR6", + "nT1Qzjc0Jks6J7FI5qK2pibLkoLC3BTkP3lWkxgJx1bqiVgkoiLncBJ+K8jlVtYsjRYjcYn54xRtAd4e", + "ssykkjyDqeQSpsY+knVw1wyHF/m/4OniIgH60aByYAmd0SLpmvL+oNf+XRa9vrRHuz+F33400lIhRt9i", + "MCvxHcXrkO5XPpiAj1jcLxLxgfijyW7I1UY7elWMUeekaDO2zBhFqLsRLEX7fdy1Ndo1VQZHeyDcg+yj", + "1MVxXAN6QTSJFrreJeKzcfxHzP8N/WjIcdc9tHl53FyQOmri9fMU2duScGxshSxGomlC+IddXh/tT1yL", + "SCDYy7CJ6oKEZKrcXhExEmgtzvz/W5Ec9jxqT/aDvun662OOdhI+f55NzFlr+8EZPbjB6af2vi68dyzH", + "mPN3ZXKMhagHhmKmsZL596XSFWBHtO/Cd9BI/OI5sIoNTX0h9FPllpGbk/DbBc3HfW/l+t37UhfIdIjI", + "llyj+CjsO0bbU30M3DC5BuVFUHhjUo9Nt5v9v5QOpAPU8O7zFYyNXTUxNmBjjIC6gBJ1oQgeTOYGvh5I", + "Vt7N69vLRqoC3ns3HFk4g1iiRCKeyLpo5rztcxprKUbiYjAcDEUiauQyYJaSb+wuncti4f+fEHd9/UDe", + "E5DasW8TYMbAJUE4Cq6mXI4lFZDNIFSsVfe7KuLhODx4qxYrYrIukGrbyNW/t/QKD5wYBU9FIjRWPuig", + "f509tg0l7Zji3aafWNUBnfOLntJ578/GTIbg3wyHcRTRTDrEjXWtZLwl6UPb+tfqD6U+xrgIGX/79etJ", + "9P7zJHoXiXCUN1byLKTlktCSFaNv9x4w11QV2lnLgpjyzcT546nnZriPxvXQp72wDtCTeACB+iuSQKZM", + "/uig0SxVFAn34gmlwkzRoMOodWNoyUCOL00xezVsuk0zwrRDnvOTGGwrTTC4jeN7S8hU+Bv9Zvj21Yz3", + "Fq1ty/8zIS1T3MhLAmxngBOUWvxqnN+NL7B4zfSbZfX1Ua8Zns7ZPJLerJOdUrck5YmqzM6u0BPK9X/F", + "L1mBtsqMbbSWehLh7/SNnr4QEnOwNfT0gho5Tr7bWVx1/RNVl84g01tchqew9xvTJkYJuM2d3aubLodh", + "l849dcJdrhvuY4EyWHw02bv2hHgOD8PHS2iYvB6dn8dVkzPxmWNLWG2DvqtyHyl/O+L4RPv5dsmNSJvV", + "0Ly/2F+3Is/BqVUXxmWQGrzv0O76PtBTjKK7l/yLpp815UxFO8iZPG+s51e3BPtB/KDPHqP1Yte7N9xK", + "P41DlGr3GAvTUuYlWOLGageO7JPMl0J928Pt8peTVcidzfd3LI8tvCFr7Z65f2CPg6af1zVN27U0rKLL", + "pCE0jSzaTLrSNKqAjKBxVHieoFLgmsz5gqQZclTKDe50J7kbz2on6rU9b4MnHuX7ngr3z/JbEO/cwijS", + "lVjmL52vXtsWGzfxeJNaP9Id6jDH3iH9ALVn3RtbciW5UENKQssZIS+3+2g9CXzyCly4+l4YPVX0ikhs", + "oPBxVFITmCeyqNSdbtlYEiouB+2TR3u6MKANQ8xtAVOpVPgiI3ikmoEt5o/eDxyzZzSwrMg0PICr8Z0u", + "rKlrKlbPMFOytJomwiqQAPvVNaxCXndG3hajZSoO8/tgDduGzzV5Tm7cKDWDJgw1S5f+5mDjGm+uKwHw", + "5UVdyYTF3j71P7d8Qqnh77U1RZP7r/4BUVYkorFKjETJXLtRmmItB74RuFKOeZCbyn+TygondJY1UhVk", + "z6Ll9Ok8PP7tNAHGiUfwgHrHOKEXGolaXiK28cP94s8AAAD//7D+9ubrGgAA", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/internal/worker/api/errors.go b/internal/worker/api/errors.go index 55c9524d7..157dea08c 100644 --- a/internal/worker/api/errors.go +++ b/internal/worker/api/errors.go @@ -23,7 +23,8 @@ const ( ErrorErrorNotFound ServiceErrorCode = 14 ErrorInvalidJobType ServiceErrorCode = 15 ErrorTenantNotFound ServiceErrorCode = 16 - // ErrorTokenNotFound ServiceErrorCode = 6 + ErrorMalformedWorkerId ServiceErrorCode = 17 + ErrorWorkerIdNotFound ServiceErrorCode = 18 // internal errors ErrorDiscardingArtifact ServiceErrorCode = 1000 @@ -34,6 +35,8 @@ const ( ErrorRetrievingJobStatus ServiceErrorCode = 1005 ErrorRequestingJob ServiceErrorCode = 1006 ErrorFailedLoadingOpenAPISpec ServiceErrorCode = 1007 + ErrorInsertingWorker ServiceErrorCode = 1008 + ErrorUpdatingWorkerStatus ServiceErrorCode = 1009 // Errors contained within this file ErrorUnspecified ServiceErrorCode = 10000 @@ -56,11 +59,20 @@ func getServiceErrors() serviceErrors { return serviceErrors{ serviceError{ErrorUnsupportedMediaType, http.StatusUnsupportedMediaType, "Only 'application/json' content is supported"}, serviceError{ErrorBodyDecodingError, http.StatusBadRequest, "Malformed json, unable to decode body"}, - serviceError{ErrorJobNotFound, http.StatusNotFound, "Token not found"}, serviceError{ErrorJobNotRunning, http.StatusBadRequest, "Job is not running"}, serviceError{ErrorMalformedJobId, http.StatusBadRequest, "Given job id is not a uuidv4"}, serviceError{ErrorMalformedJobToken, http.StatusBadRequest, "Given job id is not a uuidv4"}, + serviceError{ErrorInvalidErrorId, http.StatusBadRequest, "Invalid format for error id, it should be an integer as a string"}, + serviceError{ErrorResourceNotFound, http.StatusNotFound, "Requested resource doesn't exist"}, + serviceError{ErrorMethodNotAllowed, http.StatusMethodNotAllowed, "Requested method isn't supported for resource"}, + serviceError{ErrorNotAcceptable, http.StatusNotAcceptable, "Only 'application/json' content is supported"}, + serviceError{ErrorErrorNotFound, http.StatusNotFound, "Error with given id not found"}, + serviceError{ErrorInvalidJobType, http.StatusBadRequest, "Requested job type cannot be dequeued"}, + serviceError{ErrorTenantNotFound, http.StatusBadRequest, "Tenant not found in JWT claims"}, + serviceError{ErrorMalformedWorkerId, http.StatusBadRequest, "Given worker id is not a uuidv4"}, + serviceError{ErrorWorkerIdNotFound, http.StatusBadRequest, "Given worker id doesn't exist"}, + serviceError{ErrorDiscardingArtifact, http.StatusInternalServerError, "Error discarding artifact"}, serviceError{ErrorCreatingArtifact, http.StatusInternalServerError, "Error creating artifact"}, serviceError{ErrorWritingArtifact, http.StatusInternalServerError, "Error writing artifact"}, @@ -68,14 +80,9 @@ func getServiceErrors() serviceErrors { serviceError{ErrorFinishingJob, http.StatusInternalServerError, "Error finishing job"}, serviceError{ErrorRetrievingJobStatus, http.StatusInternalServerError, "Error requesting job"}, serviceError{ErrorRequestingJob, http.StatusInternalServerError, "Error requesting job"}, - serviceError{ErrorInvalidErrorId, http.StatusBadRequest, "Invalid format for error id, it should be an integer as a string"}, serviceError{ErrorFailedLoadingOpenAPISpec, http.StatusInternalServerError, "Unable to load openapi spec"}, - serviceError{ErrorResourceNotFound, http.StatusNotFound, "Requested resource doesn't exist"}, - serviceError{ErrorMethodNotAllowed, http.StatusMethodNotAllowed, "Requested method isn't supported for resource"}, - serviceError{ErrorNotAcceptable, http.StatusNotAcceptable, "Only 'application/json' content is supported"}, - serviceError{ErrorErrorNotFound, http.StatusNotFound, "Error with given id not found"}, - serviceError{ErrorInvalidJobType, http.StatusBadRequest, "Requested job type cannot be dequeued"}, - serviceError{ErrorTenantNotFound, http.StatusBadRequest, "Tenant not found in JWT claims"}, + serviceError{ErrorInsertingWorker, http.StatusInternalServerError, "Unable to register the worker"}, + serviceError{ErrorUpdatingWorkerStatus, http.StatusInternalServerError, "Unable update worker status"}, serviceError{ErrorUnspecified, http.StatusInternalServerError, "Unspecified internal error "}, serviceError{ErrorNotHTTPError, http.StatusInternalServerError, "Error is not an instance of HTTPError"}, diff --git a/internal/worker/api/openapi.yml b/internal/worker/api/openapi.yml index 7f19aedd8..cccca975a 100644 --- a/internal/worker/api/openapi.yml +++ b/internal/worker/api/openapi.yml @@ -1,3 +1,4 @@ + openapi: 3.0.0 info: title: OSBuild Composer - Worker @@ -203,6 +204,43 @@ paths: schema: $ref: '#/components/schemas/Error' + /workers: + post: + operationId: postWorkers + summary: Create a new worker + description: | + Creates a new worker and returns a uuid which should be used in all subsequent calls. + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/PostWorkersRequest' + responses: + '201': + description: Created a new worker + content: + application/json: + schema: + $ref: '#/components/schemas/PostWorkersResponse' + /workers/{worker_id}/status: + parameters: + - schema: + type: string + format: uuid + name: worker_id + in: path + required: true + post: + operationId: postWorkerStatus + summary: Refresh worker status + description: | + Refreshes the heartbeat of the worker, and posts stats that can be used to determine overall + worker health. Workers that do not respond will not be kept track of after a timeout. If + dropped workers were running a job, this job will be restarted. + responses: + '200': + description: succesfully updated worker's status + components: schemas: ObjectReference: @@ -261,6 +299,8 @@ components: type: string arch: type: string + worker_id: + type: string RequestJobResponse: allOf: - $ref: '#/components/schemas/ObjectReference' @@ -301,3 +341,21 @@ components: x-go-type: json.RawMessage UpdateJobResponse: $ref: '#/components/schemas/ObjectReference' + + PostWorkersRequest: + type: object + required: + - arch + properties: + arch: + type: string + PostWorkersResponse: + allOf: + - $ref: '#/components/schemas/ObjectReference' + - type: object + required: + - worker_id + properties: + worker_id: + type: string + format: uuid diff --git a/internal/worker/server.go b/internal/worker/server.go index f746461fc..254c1421c 100644 --- a/internal/worker/server.go +++ b/internal/worker/server.go @@ -72,6 +72,8 @@ type Config struct { BasePath string JWTEnabled bool TenantProviderFields []string + WorkerTimeout time.Duration + WorkerWatchFreq time.Duration } func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, config Config) *Server { @@ -81,9 +83,17 @@ func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, config Config) *Serve config: config, } + if s.config.WorkerTimeout == 0 { + s.config.WorkerTimeout = time.Hour + } + if s.config.WorkerWatchFreq == 0 { + s.config.WorkerWatchFreq = time.Second * 300 + } + api.BasePath = config.BasePath go s.WatchHeartbeats() + go s.WatchWorkers() return s } @@ -143,6 +153,26 @@ func (s *Server) WatchHeartbeats() { } } +// This function should be started as a goroutine +// Every 5 minutes it goes through all workers, removing any unresponsive ones. +func (s *Server) WatchWorkers() { + //nolint:staticcheck // avoid SA1015, this is an endless function + for range time.Tick(s.config.WorkerWatchFreq) { + workers, err := s.jobs.Workers(s.config.WorkerTimeout) + if err != nil { + logrus.Warningf("Unable to query workers: %v", err) + continue + } + for _, wID := range workers { + logrus.Infof("Removing inactive worker: %s", wID) + err = s.jobs.DeleteWorker(wID) + if err != nil { + logrus.Warningf("Unable to remove worker: %v", err) + } + } + } +} + func (s *Server) EnqueueOSBuild(arch string, job *OSBuildJob, channel string) (uuid.UUID, error) { return s.enqueue(JobTypeOSBuild+":"+arch, job, nil, channel) } @@ -575,15 +605,15 @@ func (s *Server) DeleteArtifacts(id uuid.UUID) error { return os.RemoveAll(path.Join(s.config.ArtifactsDir, id.String())) } -func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes []string, channels []string) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { - return s.requestJob(ctx, arch, jobTypes, uuid.Nil, channels) +func (s *Server) RequestJob(ctx context.Context, arch string, jobTypes, channels []string, workerID uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { + return s.requestJob(ctx, arch, jobTypes, uuid.Nil, channels, workerID) } func (s *Server) RequestJobById(ctx context.Context, arch string, requestedJobId uuid.UUID) (uuid.UUID, uuid.UUID, string, json.RawMessage, []json.RawMessage, error) { - return s.requestJob(ctx, arch, []string{}, requestedJobId, nil) + return s.requestJob(ctx, arch, []string{}, requestedJobId, nil, uuid.Nil) } -func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID, channels []string) ( +func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, requestedJobId uuid.UUID, channels []string, workerID uuid.UUID) ( jobId uuid.UUID, token uuid.UUID, jobType string, args json.RawMessage, dynamicArgs []json.RawMessage, err error) { // treat osbuild jobs specially until we have found a generic way to // specify dequeuing restrictions. For now, we only have one @@ -614,9 +644,9 @@ func (s *Server) requestJob(ctx context.Context, arch string, jobTypes []string, var depIDs []uuid.UUID if requestedJobId != uuid.Nil { jobId = requestedJobId - token, depIDs, jobType, args, err = s.jobs.DequeueByID(dequeueCtx, requestedJobId) + token, depIDs, jobType, args, err = s.jobs.DequeueByID(dequeueCtx, requestedJobId, workerID) } else { - jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, jts, channels) + jobId, token, depIDs, jobType, args, err = s.jobs.Dequeue(dequeueCtx, workerID, jts, channels) } if err != nil { if err != jobqueue.ErrDequeueTimeout && err != jobqueue.ErrNotPending { @@ -852,7 +882,15 @@ func (h *apiHandlers) RequestJob(ctx echo.Context) error { channel = "org-" + tenant } - jobId, jobToken, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types, []string{channel}) + workerID := uuid.Nil + if body.WorkerId != nil { + workerID, err = uuid.Parse(*body.WorkerId) + if err != nil { + return api.HTTPErrorWithInternal(api.ErrorMalformedWorkerId, err) + } + } + + jobId, jobToken, jobType, jobArgs, dynamicJobArgs, err := h.server.RequestJob(ctx.Request().Context(), body.Arch, body.Types, []string{channel}, workerID) if err != nil { if err == jobqueue.ErrDequeueTimeout { return ctx.JSON(http.StatusNoContent, api.ObjectReference{ @@ -992,6 +1030,47 @@ func (h *apiHandlers) UploadJobArtifact(ctx echo.Context, tokenstr string, name return ctx.NoContent(http.StatusOK) } +func (h *apiHandlers) PostWorkers(ctx echo.Context) error { + var body api.PostWorkersRequest + err := ctx.Bind(&body) + if err != nil { + return err + } + + workerID, err := h.server.jobs.InsertWorker(body.Arch) + if err != nil { + return api.HTTPErrorWithInternal(api.ErrorInsertingWorker, err) + } + logrus.Infof("Worker (%v) registered", body.Arch) + + return ctx.JSON(http.StatusCreated, api.PostWorkersResponse{ + ObjectReference: api.ObjectReference{ + Href: fmt.Sprintf("%s/workers", api.BasePath), + Id: workerID.String(), + Kind: "WorkerID", + }, + WorkerId: workerID.String(), + }) +} + +func (h *apiHandlers) PostWorkerStatus(ctx echo.Context, workerIdstr string) error { + workerID, err := uuid.Parse(workerIdstr) + if err != nil { + return api.HTTPErrorWithInternal(api.ErrorMalformedWorkerId, err) + } + err = h.server.jobs.UpdateWorkerStatus(workerID) + + if err == jobqueue.ErrWorkerNotExist { + return api.HTTPErrorWithInternal(api.ErrorWorkerIdNotFound, err) + } + + if err != nil { + return api.HTTPErrorWithInternal(api.ErrorUpdatingWorkerStatus, err) + } + + return ctx.NoContent(http.StatusOK) +} + // A simple echo.Binder(), which only accepts application/json, but is more // strict than echo's DefaultBinder. It does not handle binding query // parameters either. diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go index eaa7fbd81..d90faef0a 100644 --- a/internal/worker/server_test.go +++ b/internal/worker/server_test.go @@ -22,24 +22,21 @@ import ( "github.com/osbuild/images/pkg/osbuild" "github.com/osbuild/images/pkg/platform" "github.com/osbuild/images/pkg/rpmmd" + "github.com/osbuild/osbuild-composer/internal/common" "github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue" "github.com/osbuild/osbuild-composer/internal/target" "github.com/osbuild/osbuild-composer/internal/test" "github.com/osbuild/osbuild-composer/internal/worker" + "github.com/osbuild/osbuild-composer/internal/worker/api" "github.com/osbuild/osbuild-composer/internal/worker/clienterrors" ) -func newTestServer(t *testing.T, tempdir string, jobRequestTimeout time.Duration, basePath string, acceptArtifacts bool) *worker.Server { +func newTestServer(t *testing.T, tempdir string, config worker.Config, acceptArtifacts bool) *worker.Server { q, err := fsjobqueue.New(tempdir) if err != nil { t.Fatalf("error creating fsjobqueue: %v", err) } - config := worker.Config{ - RequestJobTimeout: jobRequestTimeout, - BasePath: basePath, - } - if acceptArtifacts { artifactsDir := path.Join(tempdir, "artifacts") err := os.Mkdir(artifactsDir, 0755) @@ -52,9 +49,14 @@ func newTestServer(t *testing.T, tempdir string, jobRequestTimeout time.Duration return worker.NewServer(nil, q, config) } +var defaultConfig = worker.Config{ + RequestJobTimeout: time.Duration(0), + BasePath: "/api/worker/v1", +} + // Ensure that the status request returns OK. func TestStatus(t *testing.T) { - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, t.TempDir(), defaultConfig, false) handler := server.Handler() test.TestRoute(t, handler, false, "GET", "/api/worker/v1/status", ``, http.StatusOK, `{"status":"OK", "href": "/api/worker/v1/status", "kind":"Status"}`, "message", "id") } @@ -84,7 +86,7 @@ func TestErrors(t *testing.T) { tempdir := t.TempDir() for _, c := range cases { - server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, tempdir, defaultConfig, false) handler := server.Handler() test.TestRoute(t, handler, false, c.Method, c.Path, c.Body, c.ExpectedStatus, `{"kind":"Error"}`, "message", "href", "operation_id", "reason", "id", "code") } @@ -113,9 +115,13 @@ func TestErrorsAlteredBasePath(t *testing.T) { } tempdir := t.TempDir() + config := worker.Config{ + RequestJobTimeout: time.Duration(0), + BasePath: "/api/image-builder-worker/v1", + } for _, c := range cases { - server := newTestServer(t, tempdir, time.Duration(0), "/api/image-builder-worker/v1", false) + server := newTestServer(t, tempdir, config, false) handler := server.Handler() test.TestRoute(t, handler, false, c.Method, c.Path, c.Body, c.ExpectedStatus, `{"kind":"Error"}`, "message", "href", "operation_id", "reason", "id", "code") } @@ -139,7 +145,7 @@ func TestCreate(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, t.TempDir(), defaultConfig, false) handler := server.Handler() _, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "") @@ -165,7 +171,7 @@ func TestCancel(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, t.TempDir(), defaultConfig, false) mf, err := manifest.Serialize(nil, nil, nil) if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) @@ -175,7 +181,7 @@ func TestCancel(t *testing.T) { jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, jobId, j) require.Equal(t, worker.JobTypeOSBuild, typ) @@ -206,7 +212,7 @@ func TestUpdate(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, t.TempDir(), defaultConfig, false) mf, err := manifest.Serialize(nil, nil, nil) if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) @@ -216,7 +222,7 @@ func TestUpdate(t *testing.T) { jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, jobId, j) require.Equal(t, worker.JobTypeOSBuild, typ) @@ -261,11 +267,11 @@ func TestArgs(t *testing.T) { }, } - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, t.TempDir(), defaultConfig, false) jobId, err := server.EnqueueOSBuild(arch.Name(), &job, "") require.NoError(t, err) - _, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}) + _, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.NotNil(t, args) @@ -289,7 +295,7 @@ func TestUpload(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", true) + server := newTestServer(t, t.TempDir(), defaultConfig, true) mf, err := manifest.Serialize(nil, nil, nil) if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) @@ -299,7 +305,7 @@ func TestUpload(t *testing.T) { jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, jobID, j) require.Equal(t, worker.JobTypeOSBuild, typ) @@ -323,7 +329,7 @@ func TestUploadNotAcceptingArtifacts(t *testing.T) { if err != nil { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, t.TempDir(), defaultConfig, false) handler := server.Handler() mf, _ := manifest.Serialize(nil, nil, nil) if err != nil { @@ -333,7 +339,7 @@ func TestUploadNotAcceptingArtifacts(t *testing.T) { jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, jobID, j) require.Equal(t, worker.JobTypeOSBuild, typ) @@ -362,13 +368,18 @@ func TestUploadAlteredBasePath(t *testing.T) { t.Fatalf("error creating osbuild manifest: %v", err) } - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/image-builder-worker/v1", true) + config := worker.Config{ + RequestJobTimeout: time.Duration(0), + BasePath: "/api/image-builder-worker/v1", + } + + server := newTestServer(t, t.TempDir(), config, true) handler := server.Handler() jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "") require.NoError(t, err) - j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}) + j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(t, err) require.Equal(t, jobID, j) require.Equal(t, worker.JobTypeOSBuild, typ) @@ -384,9 +395,13 @@ func TestTimeout(t *testing.T) { if err != nil { t.Fatalf("error getting arch from distro: %v", err) } - server := newTestServer(t, t.TempDir(), time.Millisecond*10, "/api/image-builder-worker/v1", false) + config := worker.Config{ + RequestJobTimeout: time.Millisecond * 10, + BasePath: "/api/image-builder-worker/v1", + } + server := newTestServer(t, t.TempDir(), config, false) - _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}) + _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.Equal(t, jobqueue.ErrDequeueTimeout, err) test.TestRoute(t, server.Handler(), false, "POST", "/api/image-builder-worker/v1/jobs", `{"arch":"arch","types":["types"]}`, http.StatusNoContent, @@ -399,7 +414,7 @@ func TestRequestJobById(t *testing.T) { if err != nil { t.Fatalf("error getting arch from distro: %v", err) } - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, t.TempDir(), defaultConfig, false) handler := server.Handler() depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "") @@ -414,7 +429,7 @@ func TestRequestJobById(t *testing.T) { _, _, _, _, _, err = server.RequestJobById(context.Background(), arch.Name(), jobId) require.Error(t, jobqueue.ErrNotPending, err) - _, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeDepsolve}, []string{""}) + _, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeDepsolve}, []string{""}, uuid.Nil) require.NoError(t, err) depsolveJR, err := json.Marshal(worker.DepsolveJobResult{}) @@ -440,7 +455,11 @@ func TestMixedOSBuildJob(t *testing.T) { require := require.New(t) emptyManifestV2 := manifest.OSBuildManifest(`{"version":"2","pipelines":{}}`) - server := newTestServer(t, t.TempDir(), time.Millisecond*10, "/", false) + config := worker.Config{ + RequestJobTimeout: time.Millisecond * 10, + BasePath: "/", + } + server := newTestServer(t, t.TempDir(), config, false) fbPipelines := &worker.PipelineNames{Build: distro.BuildPipelinesFallback(), Payload: distro.PayloadPipelinesFallback()} oldJob := worker.OSBuildJob{ @@ -506,7 +525,7 @@ func TestMixedOSBuildJob(t *testing.T) { // don't block forever if the jobs weren't added or can't be retrieved ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) defer cancel() - id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{worker.JobTypeOSBuild}, []string{""}) + id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil) require.NoError(err) return id, token } @@ -598,12 +617,12 @@ func TestDepsolveLegacyErrorConversion(t *testing.T) { if err != nil { t.Fatalf("error getting arch from distro: %v", err) } - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, t.TempDir(), defaultConfig, false) depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "") require.NoError(t, err) - _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeDepsolve}, []string{""}) + _, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeDepsolve}, []string{""}, uuid.Nil) require.NoError(t, err) reason := "Depsolve failed" @@ -1579,7 +1598,7 @@ func TestJobDependencyChainErrors(t *testing.T) { for idx, c := range cases { t.Logf("Test case #%d", idx) - server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false) + server := newTestServer(t, t.TempDir(), defaultConfig, false) ids, err := enqueueAndFinishTestJobDependencies(server, []testJob{c.job}) require.Nil(t, err) require.Len(t, ids, 1) @@ -1590,3 +1609,64 @@ func TestJobDependencyChainErrors(t *testing.T) { assert.EqualValues(t, c.expectedError, errors) } } + +func TestWorkerWatch(t *testing.T) { + config := worker.Config{ + RequestJobTimeout: time.Duration(0), + BasePath: "/api/worker/v1", + WorkerTimeout: time.Millisecond * 200, + WorkerWatchFreq: time.Millisecond * 100, + } + + server := newTestServer(t, t.TempDir(), config, false) + + reply := test.TestRouteWithReply(t, server.Handler(), false, "POST", "/api/worker/v1/workers", fmt.Sprintf(`{"arch":"%s"}`, common.CurrentArch()), 201, `{"href":"/api/worker/v1/workers","kind":"WorkerID","id": "15"}`, "id", "worker_id") + var resp api.PostWorkersResponse + require.NoError(t, json.Unmarshal(reply, &resp)) + workerID, err := uuid.Parse(resp.WorkerId) + require.NoError(t, err) + + test.TestRoute(t, server.Handler(), false, "POST", fmt.Sprintf("/api/worker/v1/workers/%s/status", workerID), "{}", 200, "") + time.Sleep(time.Millisecond * 400) + test.TestRoute(t, server.Handler(), false, "POST", fmt.Sprintf("/api/worker/v1/workers/%s/status", workerID), "", 400, + `{"href":"/api/worker/v1/errors/18","code":"IMAGE-BUILDER-WORKER-18","id":"18","kind":"Error","message":"Given worker id doesn't exist","reason":"Given worker id doesn't exist"}`, + "operation_id") +} + +func TestRequestJobForWorker(t *testing.T) { + server := newTestServer(t, t.TempDir(), defaultConfig, false) + reply := test.TestRouteWithReply(t, server.Handler(), false, "POST", "/api/worker/v1/workers", fmt.Sprintf(`{"arch":"%s"}`, common.CurrentArch()), 201, `{"href":"/api/worker/v1/workers","kind":"WorkerID","id": "15"}`, "id", "worker_id") + var resp api.PostWorkersResponse + require.NoError(t, json.Unmarshal(reply, &resp)) + workerID, err := uuid.Parse(resp.WorkerId) + require.NoError(t, err) + test.TestRoute(t, server.Handler(), false, "POST", fmt.Sprintf("/api/worker/v1/workers/%s/status", workerID), "{}", 200, "") + + distroStruct := test_distro.New() + arch, err := distroStruct.GetArch(test_distro.TestArchName) + if err != nil { + t.Fatalf("error getting arch from distro: %v", err) + } + imageType, err := arch.GetImageType(test_distro.TestImageTypeName) + if err != nil { + t.Fatalf("error getting image type from arch: %v", err) + } + manifest, _, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, 0) + if err != nil { + t.Fatalf("error creating osbuild manifest: %v", err) + } + mf, err := manifest.Serialize(nil, nil, nil) + if err != nil { + t.Fatalf("error creating osbuild manifest: %v", err) + } + jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "") + require.NoError(t, err) + + // Can request a job with worker ID + j, _, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, workerID) + require.NoError(t, err) + require.Equal(t, jobId, j) + require.Equal(t, worker.JobTypeOSBuild, typ) + require.NotNil(t, args) + require.Nil(t, dynamicArgs) +}