cloudapi: prevent dangling manifest goroutines

When composer exits, it doesn't wait for the manifest generation goroutines
to finish, which can leave jobs half-processed. Introduce synchronization via
a cancellable context and a WaitGroup, plus a new Shutdown method that cancels
the goroutines and waits for them to return.

This also prevents the manifest generation goroutine from leaving jobs in an
inconsistent state when it is interrupted at an arbitrary point in its work.

Signed-off-by: Ondřej Budai <ondrej@budai.cz>
This commit is contained in:
Ondřej Budai 2022-04-05 14:25:47 +02:00 committed by Tomáš Hozza
parent 02091ba777
commit ba236180fc
4 changed files with 34 additions and 3 deletions

View file

@ -330,7 +330,9 @@ func (c *Composer) Start() error {
logrus.Info("Shutting down.")
if c.apiListener != nil {
// First, close all listeners and then wait for all goroutines to finish.
err := composerAPI.Shutdown(context.Background())
c.api.Shutdown()
if err != nil {
panic(err)
}

View file

@ -23,3 +23,7 @@ func NewServer(workers *worker.Server, distros *distroregistry.Registry, config
// V2 returns an http.Handler that serves the v2 cloud API mounted at path.
func (server *Server) V2(path string) http.Handler {
	handler := server.v2.Handler(path)
	return handler
}
// Shutdown stops the server's background work by delegating to the v2
// sub-server, which cancels its manifest-generation goroutines and blocks
// until all of them have finished.
func (server *Server) Shutdown() {
	server.v2.Shutdown()
}

View file

@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
"github.com/google/uuid"
@ -29,6 +30,10 @@ type Server struct {
workers *worker.Server
distros *distroregistry.Registry
config ServerConfig
goroutinesCtx context.Context
goroutinesCtxCancel context.CancelFunc
goroutinesGroup sync.WaitGroup
}
type ServerConfig struct {
@ -38,10 +43,14 @@ type ServerConfig struct {
}
// NewServer creates a v2 API server. The server owns a cancellable context
// handed to every manifest-generation goroutine it spawns; call Shutdown to
// cancel that context and wait for the goroutines to exit.
func NewServer(workers *worker.Server, distros *distroregistry.Registry, config ServerConfig) *Server {
	goroutinesCtx, cancelGoroutines := context.WithCancel(context.Background())

	return &Server{
		workers:             workers,
		distros:             distros,
		config:              config,
		goroutinesCtx:       goroutinesCtx,
		goroutinesCtxCancel: cancelGoroutines,
	}
}
@ -62,6 +71,11 @@ func (s *Server) Handler(path string) http.Handler {
return e
}
// Shutdown cancels the context shared by all manifest-generation goroutines
// and then blocks until every one of them has returned. The cancel must
// happen before the Wait, otherwise goroutines still blocked on the context
// would never finish and Wait would hang.
func (s *Server) Shutdown() {
	s.goroutinesCtxCancel()
	s.goroutinesGroup.Wait()
}
func (s *Server) enqueueCompose(distribution distro.Distro, bp blueprint.Blueprint, manifestSeed int64, irs []imageRequest, channel string) (uuid.UUID, error) {
var id uuid.UUID
if len(irs) != 1 {
@ -99,7 +113,11 @@ func (s *Server) enqueueCompose(distribution distro.Distro, bp blueprint.Bluepri
return id, HTTPErrorWithInternal(ErrorEnqueueingJob, err)
}
go generateManifest(context.Background(), s.workers, depsolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, bp.Customizations)
s.goroutinesGroup.Add(1)
go func() {
generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, bp.Customizations)
defer s.goroutinesGroup.Done()
}()
return id, nil
}
@ -161,7 +179,13 @@ func (s *Server) enqueueKojiCompose(taskID uint64, server, name, version, releas
}
kojiFilenames = append(kojiFilenames, kojiFilename)
buildIDs = append(buildIDs, buildID)
go generateManifest(context.Background(), s.workers, depsolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, bp.Customizations)
// copy the image request while passing it into the goroutine to prevent data races
s.goroutinesGroup.Add(1)
go func(ir imageRequest) {
generateManifest(s.goroutinesCtx, s.workers, depsolveJobID, manifestJobID, ir.imageType, ir.repositories, ir.imageOptions, manifestSeed, bp.Customizations)
defer s.goroutinesGroup.Done()
}(ir)
}
id, err = s.workers.EnqueueKojiFinalize(&worker.KojiFinalizeJob{
Server: server,
@ -196,7 +220,7 @@ func generateManifest(ctx context.Context, workers *worker.Server, depsolveJobID
time.Sleep(time.Millisecond * 50)
select {
case <-ctx.Done():
logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, returning to avoid dangling routines")
logWithId.Warning("Manifest job dependencies took longer than 5 minutes to finish, or the server is shutting down, returning to avoid dangling routines")
break
default:
continue

View file

@ -39,6 +39,7 @@ func newV2Server(t *testing.T, dir string, depsolveChannels []string, enableJWT
}
v2Server := v2.NewServer(workerServer, distros, config)
require.NotNil(t, v2Server)
t.Cleanup(v2Server.Shutdown)
// start a routine which just completes depsolve jobs
depsolveContext, cancel := context.WithCancel(context.Background())