jobqueue: add channel to workers

Stores the channel alongside the worker.
This commit is contained in:
Sanne Raymaekers 2024-04-19 11:52:21 +02:00
parent ede798ae6c
commit 1b4935c325
9 changed files with 46 additions and 26 deletions

View file

@ -65,7 +65,7 @@ func executeTests(m *testing.M) int {
fixture := rpmmd_mock.BaseFixture(path.Join(tmpdir, "/jobs"), test_distro.TestDistro1Name, test_distro.TestArchName)
defer fixture.StoreFixture.Cleanup()
_, err = fixture.Workers.RegisterWorker(fixture.StoreFixture.HostArchName)
_, err = fixture.Workers.RegisterWorker("", fixture.StoreFixture.HostArchName)
if err != nil {
panic(err)
}

View file

@ -62,6 +62,7 @@ type fsJobQueue struct {
}
type worker struct {
Channel string `json:"channel"`
Arch string `json:"arch"`
Heartbeat time.Time `json:"heartbeat"`
Tokens map[uuid.UUID]struct{}
@ -467,12 +468,13 @@ func (q *fsJobQueue) RefreshHeartbeat(token uuid.UUID) {
}
}
func (q *fsJobQueue) InsertWorker(arch string) (uuid.UUID, error) {
func (q *fsJobQueue) InsertWorker(channel, arch string) (uuid.UUID, error) {
q.mu.Lock()
defer q.mu.Unlock()
wID := uuid.New()
q.workers[wID] = worker{
Channel: channel,
Arch: arch,
Heartbeat: time.Now(),
Tokens: make(map[uuid.UUID]struct{}),
@ -502,8 +504,9 @@ func (q *fsJobQueue) Workers(olderThan time.Duration) ([]jobqueue.Worker, error)
for wID, w := range q.workers {
if now.Sub(w.Heartbeat) > olderThan {
workers = append(workers, jobqueue.Worker{
ID: wID,
Arch: w.Arch,
ID: wID,
Channel: w.Channel,
Arch: w.Arch,
})
}
}

View file

@ -704,22 +704,23 @@ func test100dequeuers(t *testing.T, q jobqueue.JobQueue) {
// Registers workers and runs jobs against them
func testWorkers(t *testing.T, q jobqueue.JobQueue) {
one := pushTestJob(t, q, "octopus", nil, nil, "")
one := pushTestJob(t, q, "octopus", nil, nil, "chan")
w1, err := q.InsertWorker("x86_64")
w1, err := q.InsertWorker("chan", "x86_64")
require.NoError(t, err)
w2, err := q.InsertWorker("aarch64")
w2, err := q.InsertWorker("chan", "aarch64")
require.NoError(t, err)
workers, err := q.Workers(0)
require.NoError(t, err)
require.Len(t, workers, 2)
require.Equal(t, "chan", workers[0].Channel)
workers, err = q.Workers(time.Hour * 24)
require.NoError(t, err)
require.Len(t, workers, 0)
_, _, _, _, _, err = q.Dequeue(context.Background(), w1, []string{"octopus"}, []string{""})
_, _, _, _, _, err = q.Dequeue(context.Background(), w1, []string{"octopus"}, []string{"chan"})
require.NoError(t, err)
err = q.DeleteWorker(w1)

View file

@ -1338,7 +1338,7 @@ func TestCompose(t *testing.T) {
api, sf := createTestWeldrAPI(t.TempDir(), test_distro.TestDistro1Name, test_distro.TestArchName, rpmmd_mock.NoComposesFixture, nil)
t.Cleanup(sf.Cleanup)
_, err = api.workers.RegisterWorker(arch.Name())
_, err = api.workers.RegisterWorker("", arch.Name())
require.NoError(t, err)
test.TestRoute(t, api, c.External, c.Method, c.Path, c.Body, c.ExpectedStatus, c.ExpectedJSON, c.IgnoreFields...)
@ -2329,7 +2329,7 @@ func TestComposePOST_ImageTypeDenylist(t *testing.T) {
t.Run(fmt.Sprintf("case %d", idx), func(t *testing.T) {
api, sf := createTestWeldrAPI(t.TempDir(), distro2.Name(), arch.Name(), rpmmd_mock.NoComposesFixture, c.imageTypeDenylist)
t.Cleanup(sf.Cleanup)
_, err = api.workers.RegisterWorker(arch.Name())
_, err = api.workers.RegisterWorker("", arch.Name())
require.NoError(t, err)
test.TestRoute(t, api, true, "POST", c.Path, c.Body, c.ExpectedStatus, c.ExpectedJSON, c.IgnoreFields...)

View file

@ -43,7 +43,7 @@ func TestComposeStatusFromLegacyError(t *testing.T) {
t.Fatalf("error serializing osbuild manifest: %v", err)
}
_, err = api.workers.RegisterWorker(arch.Name())
_, err = api.workers.RegisterWorker("", arch.Name())
require.NoError(t, err)
jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
require.NoError(t, err)
@ -99,7 +99,7 @@ func TestComposeStatusFromJobError(t *testing.T) {
t.Fatalf("error serializing osbuild manifest: %v", err)
}
_, err = api.workers.RegisterWorker(arch.Name())
_, err = api.workers.RegisterWorker("", arch.Name())
require.NoError(t, err)
jobId, err := api.workers.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
require.NoError(t, err)

View file

@ -823,8 +823,8 @@ func (s *Server) RequeueOrFinishJob(token uuid.UUID, maxRetries uint64, result j
return nil
}
func (s *Server) RegisterWorker(a string) (uuid.UUID, error) {
workerID, err := s.jobs.InsertWorker(a)
func (s *Server) RegisterWorker(c, a string) (uuid.UUID, error) {
workerID, err := s.jobs.InsertWorker(c, a)
if err != nil {
return uuid.Nil, err
}
@ -1059,7 +1059,18 @@ func (h *apiHandlers) PostWorkers(ctx echo.Context) error {
return err
}
workerID, err := h.server.RegisterWorker(body.Arch)
var channel string
if h.server.config.JWTEnabled {
tenant, err := auth.GetFromClaims(ctx.Request().Context(), h.server.config.TenantProviderFields)
if err != nil {
return api.HTTPErrorWithInternal(api.ErrorTenantNotFound, err)
}
// prefix the tenant to prevent collisions if support for specifying channels in a request is ever added
channel = "org-" + tenant
}
workerID, err := h.server.RegisterWorker(channel, body.Arch)
if err != nil {
return api.HTTPErrorWithInternal(api.ErrorInsertingWorker, err)
}