worker: add ability to track workers serverside

Unresponsive workers (>=1 hour of no status update) are cleaned up.

Several things are enabled by keeping track of workers, in future the
worker server could:
- keep track of how many workers are active
- see if a worker for a specific architecture is available
This commit is contained in:
Sanne Raymaekers 2023-10-25 13:52:56 +02:00 committed by Achilleas Koutsou
parent d784075d31
commit 794acd8e34
8 changed files with 368 additions and 84 deletions

View file

@ -22,24 +22,21 @@ import (
"github.com/osbuild/images/pkg/osbuild"
"github.com/osbuild/images/pkg/platform"
"github.com/osbuild/images/pkg/rpmmd"
"github.com/osbuild/osbuild-composer/internal/common"
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
"github.com/osbuild/osbuild-composer/internal/target"
"github.com/osbuild/osbuild-composer/internal/test"
"github.com/osbuild/osbuild-composer/internal/worker"
"github.com/osbuild/osbuild-composer/internal/worker/api"
"github.com/osbuild/osbuild-composer/internal/worker/clienterrors"
)
func newTestServer(t *testing.T, tempdir string, jobRequestTimeout time.Duration, basePath string, acceptArtifacts bool) *worker.Server {
func newTestServer(t *testing.T, tempdir string, config worker.Config, acceptArtifacts bool) *worker.Server {
q, err := fsjobqueue.New(tempdir)
if err != nil {
t.Fatalf("error creating fsjobqueue: %v", err)
}
config := worker.Config{
RequestJobTimeout: jobRequestTimeout,
BasePath: basePath,
}
if acceptArtifacts {
artifactsDir := path.Join(tempdir, "artifacts")
err := os.Mkdir(artifactsDir, 0755)
@ -52,9 +49,14 @@ func newTestServer(t *testing.T, tempdir string, jobRequestTimeout time.Duration
return worker.NewServer(nil, q, config)
}
var defaultConfig = worker.Config{
RequestJobTimeout: time.Duration(0),
BasePath: "/api/worker/v1",
}
// Ensure that the status request returns OK.
func TestStatus(t *testing.T) {
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, t.TempDir(), defaultConfig, false)
handler := server.Handler()
test.TestRoute(t, handler, false, "GET", "/api/worker/v1/status", ``, http.StatusOK, `{"status":"OK", "href": "/api/worker/v1/status", "kind":"Status"}`, "message", "id")
}
@ -84,7 +86,7 @@ func TestErrors(t *testing.T) {
tempdir := t.TempDir()
for _, c := range cases {
server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, tempdir, defaultConfig, false)
handler := server.Handler()
test.TestRoute(t, handler, false, c.Method, c.Path, c.Body, c.ExpectedStatus, `{"kind":"Error"}`, "message", "href", "operation_id", "reason", "id", "code")
}
@ -113,9 +115,13 @@ func TestErrorsAlteredBasePath(t *testing.T) {
}
tempdir := t.TempDir()
config := worker.Config{
RequestJobTimeout: time.Duration(0),
BasePath: "/api/image-builder-worker/v1",
}
for _, c := range cases {
server := newTestServer(t, tempdir, time.Duration(0), "/api/image-builder-worker/v1", false)
server := newTestServer(t, tempdir, config, false)
handler := server.Handler()
test.TestRoute(t, handler, false, c.Method, c.Path, c.Body, c.ExpectedStatus, `{"kind":"Error"}`, "message", "href", "operation_id", "reason", "id", "code")
}
@ -139,7 +145,7 @@ func TestCreate(t *testing.T) {
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, t.TempDir(), defaultConfig, false)
handler := server.Handler()
_, err = server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
@ -165,7 +171,7 @@ func TestCancel(t *testing.T) {
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, t.TempDir(), defaultConfig, false)
mf, err := manifest.Serialize(nil, nil, nil)
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
@ -175,7 +181,7 @@ func TestCancel(t *testing.T) {
jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
require.NoError(t, err)
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""})
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
require.NoError(t, err)
require.Equal(t, jobId, j)
require.Equal(t, worker.JobTypeOSBuild, typ)
@ -206,7 +212,7 @@ func TestUpdate(t *testing.T) {
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, t.TempDir(), defaultConfig, false)
mf, err := manifest.Serialize(nil, nil, nil)
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
@ -216,7 +222,7 @@ func TestUpdate(t *testing.T) {
jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
require.NoError(t, err)
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""})
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
require.NoError(t, err)
require.Equal(t, jobId, j)
require.Equal(t, worker.JobTypeOSBuild, typ)
@ -261,11 +267,11 @@ func TestArgs(t *testing.T) {
},
}
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, t.TempDir(), defaultConfig, false)
jobId, err := server.EnqueueOSBuild(arch.Name(), &job, "")
require.NoError(t, err)
_, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""})
_, _, _, args, _, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
require.NoError(t, err)
require.NotNil(t, args)
@ -289,7 +295,7 @@ func TestUpload(t *testing.T) {
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", true)
server := newTestServer(t, t.TempDir(), defaultConfig, true)
mf, err := manifest.Serialize(nil, nil, nil)
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
@ -299,7 +305,7 @@ func TestUpload(t *testing.T) {
jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
require.NoError(t, err)
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""})
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
require.NoError(t, err)
require.Equal(t, jobID, j)
require.Equal(t, worker.JobTypeOSBuild, typ)
@ -323,7 +329,7 @@ func TestUploadNotAcceptingArtifacts(t *testing.T) {
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, t.TempDir(), defaultConfig, false)
handler := server.Handler()
mf, _ := manifest.Serialize(nil, nil, nil)
if err != nil {
@ -333,7 +339,7 @@ func TestUploadNotAcceptingArtifacts(t *testing.T) {
jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
require.NoError(t, err)
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""})
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
require.NoError(t, err)
require.Equal(t, jobID, j)
require.Equal(t, worker.JobTypeOSBuild, typ)
@ -362,13 +368,18 @@ func TestUploadAlteredBasePath(t *testing.T) {
t.Fatalf("error creating osbuild manifest: %v", err)
}
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/image-builder-worker/v1", true)
config := worker.Config{
RequestJobTimeout: time.Duration(0),
BasePath: "/api/image-builder-worker/v1",
}
server := newTestServer(t, t.TempDir(), config, true)
handler := server.Handler()
jobID, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
require.NoError(t, err)
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""})
j, token, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
require.NoError(t, err)
require.Equal(t, jobID, j)
require.Equal(t, worker.JobTypeOSBuild, typ)
@ -384,9 +395,13 @@ func TestTimeout(t *testing.T) {
if err != nil {
t.Fatalf("error getting arch from distro: %v", err)
}
server := newTestServer(t, t.TempDir(), time.Millisecond*10, "/api/image-builder-worker/v1", false)
config := worker.Config{
RequestJobTimeout: time.Millisecond * 10,
BasePath: "/api/image-builder-worker/v1",
}
server := newTestServer(t, t.TempDir(), config, false)
_, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""})
_, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
require.Equal(t, jobqueue.ErrDequeueTimeout, err)
test.TestRoute(t, server.Handler(), false, "POST", "/api/image-builder-worker/v1/jobs", `{"arch":"arch","types":["types"]}`, http.StatusNoContent,
@ -399,7 +414,7 @@ func TestRequestJobById(t *testing.T) {
if err != nil {
t.Fatalf("error getting arch from distro: %v", err)
}
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, t.TempDir(), defaultConfig, false)
handler := server.Handler()
depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "")
@ -414,7 +429,7 @@ func TestRequestJobById(t *testing.T) {
_, _, _, _, _, err = server.RequestJobById(context.Background(), arch.Name(), jobId)
require.Error(t, jobqueue.ErrNotPending, err)
_, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeDepsolve}, []string{""})
_, token, _, _, _, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeDepsolve}, []string{""}, uuid.Nil)
require.NoError(t, err)
depsolveJR, err := json.Marshal(worker.DepsolveJobResult{})
@ -440,7 +455,11 @@ func TestMixedOSBuildJob(t *testing.T) {
require := require.New(t)
emptyManifestV2 := manifest.OSBuildManifest(`{"version":"2","pipelines":{}}`)
server := newTestServer(t, t.TempDir(), time.Millisecond*10, "/", false)
config := worker.Config{
RequestJobTimeout: time.Millisecond * 10,
BasePath: "/",
}
server := newTestServer(t, t.TempDir(), config, false)
fbPipelines := &worker.PipelineNames{Build: distro.BuildPipelinesFallback(), Payload: distro.PayloadPipelinesFallback()}
oldJob := worker.OSBuildJob{
@ -506,7 +525,7 @@ func TestMixedOSBuildJob(t *testing.T) {
// don't block forever if the jobs weren't added or can't be retrieved
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{worker.JobTypeOSBuild}, []string{""})
id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
require.NoError(err)
return id, token
}
@ -598,12 +617,12 @@ func TestDepsolveLegacyErrorConversion(t *testing.T) {
if err != nil {
t.Fatalf("error getting arch from distro: %v", err)
}
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, t.TempDir(), defaultConfig, false)
depsolveJobId, err := server.EnqueueDepsolve(&worker.DepsolveJob{}, "")
require.NoError(t, err)
_, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeDepsolve}, []string{""})
_, _, _, _, _, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeDepsolve}, []string{""}, uuid.Nil)
require.NoError(t, err)
reason := "Depsolve failed"
@ -1579,7 +1598,7 @@ func TestJobDependencyChainErrors(t *testing.T) {
for idx, c := range cases {
t.Logf("Test case #%d", idx)
server := newTestServer(t, t.TempDir(), time.Duration(0), "/api/worker/v1", false)
server := newTestServer(t, t.TempDir(), defaultConfig, false)
ids, err := enqueueAndFinishTestJobDependencies(server, []testJob{c.job})
require.Nil(t, err)
require.Len(t, ids, 1)
@ -1590,3 +1609,64 @@ func TestJobDependencyChainErrors(t *testing.T) {
assert.EqualValues(t, c.expectedError, errors)
}
}
func TestWorkerWatch(t *testing.T) {
config := worker.Config{
RequestJobTimeout: time.Duration(0),
BasePath: "/api/worker/v1",
WorkerTimeout: time.Millisecond * 200,
WorkerWatchFreq: time.Millisecond * 100,
}
server := newTestServer(t, t.TempDir(), config, false)
reply := test.TestRouteWithReply(t, server.Handler(), false, "POST", "/api/worker/v1/workers", fmt.Sprintf(`{"arch":"%s"}`, common.CurrentArch()), 201, `{"href":"/api/worker/v1/workers","kind":"WorkerID","id": "15"}`, "id", "worker_id")
var resp api.PostWorkersResponse
require.NoError(t, json.Unmarshal(reply, &resp))
workerID, err := uuid.Parse(resp.WorkerId)
require.NoError(t, err)
test.TestRoute(t, server.Handler(), false, "POST", fmt.Sprintf("/api/worker/v1/workers/%s/status", workerID), "{}", 200, "")
time.Sleep(time.Millisecond * 400)
test.TestRoute(t, server.Handler(), false, "POST", fmt.Sprintf("/api/worker/v1/workers/%s/status", workerID), "", 400,
`{"href":"/api/worker/v1/errors/18","code":"IMAGE-BUILDER-WORKER-18","id":"18","kind":"Error","message":"Given worker id doesn't exist","reason":"Given worker id doesn't exist"}`,
"operation_id")
}
func TestRequestJobForWorker(t *testing.T) {
server := newTestServer(t, t.TempDir(), defaultConfig, false)
reply := test.TestRouteWithReply(t, server.Handler(), false, "POST", "/api/worker/v1/workers", fmt.Sprintf(`{"arch":"%s"}`, common.CurrentArch()), 201, `{"href":"/api/worker/v1/workers","kind":"WorkerID","id": "15"}`, "id", "worker_id")
var resp api.PostWorkersResponse
require.NoError(t, json.Unmarshal(reply, &resp))
workerID, err := uuid.Parse(resp.WorkerId)
require.NoError(t, err)
test.TestRoute(t, server.Handler(), false, "POST", fmt.Sprintf("/api/worker/v1/workers/%s/status", workerID), "{}", 200, "")
distroStruct := test_distro.New()
arch, err := distroStruct.GetArch(test_distro.TestArchName)
if err != nil {
t.Fatalf("error getting arch from distro: %v", err)
}
imageType, err := arch.GetImageType(test_distro.TestImageTypeName)
if err != nil {
t.Fatalf("error getting image type from arch: %v", err)
}
manifest, _, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, 0)
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
mf, err := manifest.Serialize(nil, nil, nil)
if err != nil {
t.Fatalf("error creating osbuild manifest: %v", err)
}
jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
require.NoError(t, err)
// Can request a job with worker ID
j, _, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, workerID)
require.NoError(t, err)
require.Equal(t, jobId, j)
require.Equal(t, worker.JobTypeOSBuild, typ)
require.NotNil(t, args)
require.Nil(t, dynamicArgs)
}