worker: test mixed new and old jobs in jobqueue

Two new tests, one for OSBuild and one for Koji jobs. Both follow the
same flow:
- Enqueue a job that doesn't specify PipelineNames (oldJob)
- Enqueue a job that does specify PipelineNames (newJob)
- Read the job data for the oldJob and check that the default
  PipelineNames were added
- Read the job data for the newJob and check that it's unchanged
- Finish oldJob and add results without specifying PipelineNames
- Finish newJob and add results with PipelineNames
- Read the oldJob result and check that the default PipelineNames were
  added
- Read the newJob result and check that it's unchanged

This is meant to test several scenarios that can occur when upgrading
the service:
1. The existing jobqueue has old jobs in it that were queued before the
   PipelineNames were part of the data structure. The worker should be
   able to read these and add the fallback data.
2. New jobs are added while old jobs still exist in the queue and the
   worker can read both types.
3. The existing jobqueue has old finished jobs in it that were finished
   and had results written before the PipelineNames were part of the
   result data structure. The worker should be able to read these and
   add the fallback data.
4. New jobs are finished and results are written while old jobs still
   exist in the queue and the worker can read both result types.
This commit is contained in:
Achilleas Koutsou 2021-09-15 13:42:31 +02:00 committed by Ondřej Budai
parent 51870676cc
commit 778b2de3c0

View file

@ -19,6 +19,7 @@ import (
"github.com/osbuild/osbuild-composer/internal/distro/test_distro"
"github.com/osbuild/osbuild-composer/internal/jobqueue"
"github.com/osbuild/osbuild-composer/internal/jobqueue/fsjobqueue"
"github.com/osbuild/osbuild-composer/internal/osbuild2"
"github.com/osbuild/osbuild-composer/internal/test"
"github.com/osbuild/osbuild-composer/internal/worker"
)
@ -231,6 +232,10 @@ func TestArgs(t *testing.T) {
job := worker.OSBuildJob{
Manifest: manifest,
ImageName: "test-image",
PipelineNames: &worker.PipelineNames{
Build: []string{"b"},
Payload: []string{"x", "y", "z"},
},
}
jobId, err := server.EnqueueOSBuild(arch.Name(), &job)
require.NoError(t, err)
@ -451,3 +456,305 @@ func TestRequestJobById(t *testing.T) {
test.TestRoute(t, handler, false, "GET", fmt.Sprintf("/api/worker/v1/jobs/%s", token), `{}`, http.StatusOK,
fmt.Sprintf(`{"canceled":false,"href":"/api/worker/v1/jobs/%s","id":"%s","kind":"JobStatus"}`, token, token))
}
// Enqueue OSBuild jobs with and without additional data and read them off the queue to
// check if the fallbacks are added for the old job and the new data are kept
// for the new job.
func TestMixedOSBuildJob(t *testing.T) {
require := require.New(t)
tempdir, err := ioutil.TempDir("", "worker-tests-")
require.NoError(err)
defer os.RemoveAll(tempdir)
emptyManifestV2 := distro.Manifest(`{"version":"2","pipelines":{}}`)
server := newTestServer(t, tempdir, time.Millisecond*10, "/")
fbPipelines := &worker.PipelineNames{Build: distro.BuildPipelinesFallback(), Payload: distro.PayloadPipelinesFallback()}
oldJob := worker.OSBuildJob{
Manifest: emptyManifestV2,
ImageName: "no-pipeline-names",
}
oldJobID, err := server.EnqueueOSBuild("x", &oldJob)
require.NoError(err)
newJob := worker.OSBuildJob{
Manifest: emptyManifestV2,
ImageName: "with-pipeline-names",
PipelineNames: &worker.PipelineNames{
Build: []string{"build"},
Payload: []string{"other", "pipelines"},
},
}
newJobID, err := server.EnqueueOSBuild("x", &newJob)
require.NoError(err)
oldJobRead := new(worker.OSBuildJob)
_, _, _, err = server.Job(oldJobID, oldJobRead)
require.NoError(err)
require.NotNil(oldJobRead.PipelineNames)
// OldJob gets default pipeline names when read
require.Equal(fbPipelines, oldJobRead.PipelineNames)
require.Equal(oldJob.Manifest, oldJobRead.Manifest)
require.Equal(oldJob.ImageName, oldJobRead.ImageName)
// Not entirely equal
require.NotEqual(oldJob, oldJobRead)
// NewJob the same when read back
newJobRead := new(worker.OSBuildJob)
_, _, _, err = server.Job(newJobID, newJobRead)
require.NoError(err)
require.NotNil(newJobRead.PipelineNames)
require.Equal(newJob.PipelineNames, newJobRead.PipelineNames)
// Dequeue the jobs (via RequestJob) to get their tokens and update them to
// test the result retrieval
getJob := func() (uuid.UUID, uuid.UUID) {
// don't block forever if the jobs weren't added or can't be retrieved
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
id, token, _, _, _, err := server.RequestJob(ctx, "x", []string{"osbuild"})
require.NoError(err)
return id, token
}
getJobTokens := func(n uint) map[uuid.UUID]uuid.UUID {
tokens := make(map[uuid.UUID]uuid.UUID, n)
for idx := uint(0); idx < n; idx++ {
id, token := getJob()
tokens[id] = token
}
return tokens
}
jobTokens := getJobTokens(2)
// make sure we got them both as expected
require.Contains(jobTokens, oldJobID)
require.Contains(jobTokens, newJobID)
oldJobResult := &worker.OSBuildJobResult{
Success: true,
OSBuildOutput: &osbuild2.Result{
Type: "result",
Success: true,
Log: map[string]osbuild2.PipelineResult{
"build-old": {
osbuild2.StageResult{
ID: "---",
Type: "org.osbuild.test",
Output: "<test output>",
Success: true,
},
},
},
},
}
oldJobResultRaw, err := json.Marshal(oldJobResult)
require.NoError(err)
oldJobToken := jobTokens[oldJobID]
err = server.FinishJob(oldJobToken, oldJobResultRaw)
require.NoError(err)
oldJobResultRead := new(worker.OSBuildJobResult)
_, _, err = server.JobStatus(oldJobID, oldJobResultRead)
require.NoError(err)
// oldJobResultRead should have PipelineNames now
require.NotEqual(oldJobResult, oldJobResultRead)
require.Equal(fbPipelines, oldJobResultRead.PipelineNames)
require.NotNil(oldJobResultRead.PipelineNames)
require.Equal(oldJobResult.OSBuildOutput, oldJobResultRead.OSBuildOutput)
require.Equal(oldJobResult.Success, oldJobResultRead.Success)
newJobResult := &worker.OSBuildJobResult{
Success: true,
PipelineNames: &worker.PipelineNames{
Build: []string{"build-result"},
Payload: []string{"result-test-payload", "result-test-assembler"},
},
OSBuildOutput: &osbuild2.Result{
Type: "result",
Success: true,
Log: map[string]osbuild2.PipelineResult{
"build-new": {
osbuild2.StageResult{
ID: "---",
Type: "org.osbuild.test",
Output: "<test output new>",
Success: true,
},
},
},
},
}
newJobResultRaw, err := json.Marshal(newJobResult)
require.NoError(err)
newJobToken := jobTokens[newJobID]
err = server.FinishJob(newJobToken, newJobResultRaw)
require.NoError(err)
newJobResultRead := new(worker.OSBuildJobResult)
_, _, err = server.JobStatus(newJobID, newJobResultRead)
require.NoError(err)
require.Equal(newJobResult, newJobResultRead)
}
// Enqueue Koji jobs with and without additional data and read them off the queue to
// check if the fallbacks are added for the old job and the new data are kept
// for the new job.
func TestMixedOSBuildKojiJob(t *testing.T) {
require := require.New(t)
tempdir, err := ioutil.TempDir("", "worker-tests-")
require.NoError(err)
defer os.RemoveAll(tempdir)
emptyManifestV2 := distro.Manifest(`{"version":"2","pipelines":{}}`)
server := newTestServer(t, tempdir, time.Duration(0), "/api/worker/v1")
fbPipelines := &worker.PipelineNames{Build: distro.BuildPipelinesFallback(), Payload: distro.PayloadPipelinesFallback()}
enqueueKojiJob := func(job *worker.OSBuildKojiJob) uuid.UUID {
initJob := new(worker.KojiInitJob)
initJobID, err := server.EnqueueKojiInit(initJob)
require.NoError(err)
jobID, err := server.EnqueueOSBuildKoji("k", job, initJobID)
require.NoError(err)
return jobID
}
oldJob := worker.OSBuildKojiJob{
Manifest: emptyManifestV2,
ImageName: "no-pipeline-names",
}
oldJobID := enqueueKojiJob(&oldJob)
newJob := worker.OSBuildKojiJob{
Manifest: emptyManifestV2,
ImageName: "with-pipeline-names",
PipelineNames: &worker.PipelineNames{
Build: []string{"build"},
Payload: []string{"other", "pipelines"},
},
}
newJobID := enqueueKojiJob(&newJob)
oldJobRead := new(worker.OSBuildKojiJob)
_, _, _, err = server.Job(oldJobID, oldJobRead)
require.NoError(err)
require.NotNil(oldJobRead.PipelineNames)
// OldJob gets default pipeline names when read
require.Equal(fbPipelines, oldJobRead.PipelineNames)
require.Equal(oldJob.Manifest, oldJobRead.Manifest)
require.Equal(oldJob.ImageName, oldJobRead.ImageName)
// Not entirely equal
require.NotEqual(oldJob, oldJobRead)
// NewJob the same when read back
newJobRead := new(worker.OSBuildKojiJob)
_, _, _, err = server.Job(newJobID, newJobRead)
require.NoError(err)
require.NotNil(newJobRead.PipelineNames)
require.Equal(newJob.PipelineNames, newJobRead.PipelineNames)
// Dequeue the jobs (via RequestJob) to get their tokens and update them to
// test the result retrieval
// Finish init jobs
for idx := uint(0); idx < 2; idx++ {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
_, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"koji-init"})
require.NoError(err)
require.NoError(server.FinishJob(token, nil))
}
getJob := func() (uuid.UUID, uuid.UUID) {
// don't block forever if the jobs weren't added or can't be retrieved
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
id, token, _, _, _, err := server.RequestJob(ctx, "k", []string{"osbuild-koji"})
require.NoError(err)
return id, token
}
getJobTokens := func(n uint) map[uuid.UUID]uuid.UUID {
tokens := make(map[uuid.UUID]uuid.UUID, n)
for idx := uint(0); idx < n; idx++ {
id, token := getJob()
tokens[id] = token
}
return tokens
}
jobTokens := getJobTokens(2)
// make sure we got them both as expected
require.Contains(jobTokens, oldJobID)
require.Contains(jobTokens, newJobID)
oldJobResult := &worker.OSBuildKojiJobResult{
HostOS: "rhel-10",
Arch: "k",
OSBuildOutput: &osbuild2.Result{
Type: "result",
Success: true,
Log: map[string]osbuild2.PipelineResult{
"build-old": {
osbuild2.StageResult{
ID: "---",
Type: "org.osbuild.test",
Output: "<test output>",
Success: true,
},
},
},
},
}
oldJobResultRaw, err := json.Marshal(oldJobResult)
require.NoError(err)
oldJobToken := jobTokens[oldJobID]
err = server.FinishJob(oldJobToken, oldJobResultRaw)
require.NoError(err)
oldJobResultRead := new(worker.OSBuildKojiJobResult)
_, _, err = server.JobStatus(oldJobID, oldJobResultRead)
require.NoError(err)
// oldJobResultRead should have PipelineNames now
require.NotEqual(oldJobResult, oldJobResultRead)
require.Equal(fbPipelines, oldJobResultRead.PipelineNames)
require.NotNil(oldJobResultRead.PipelineNames)
require.Equal(oldJobResult.OSBuildOutput, oldJobResultRead.OSBuildOutput)
require.Equal(oldJobResult.HostOS, oldJobResultRead.HostOS)
require.Equal(oldJobResult.Arch, oldJobResultRead.Arch)
newJobResult := &worker.OSBuildKojiJobResult{
HostOS: "rhel-10",
Arch: "k",
PipelineNames: &worker.PipelineNames{
Build: []string{"build-result"},
Payload: []string{"result-test-payload", "result-test-assembler"},
},
OSBuildOutput: &osbuild2.Result{
Type: "result",
Success: true,
Log: map[string]osbuild2.PipelineResult{
"build-new": {
osbuild2.StageResult{
ID: "---",
Type: "org.osbuild.test",
Output: "<test output new>",
Success: true,
},
},
},
},
}
newJobResultRaw, err := json.Marshal(newJobResult)
require.NoError(err)
newJobToken := jobTokens[newJobID]
err = server.FinishJob(newJobToken, newJobResultRaw)
require.NoError(err)
newJobResultRead := new(worker.OSBuildKojiJobResult)
_, _, err = server.JobStatus(newJobID, newJobResultRead)
require.NoError(err)
require.Equal(newJobResult, newJobResultRead)
}