worker/server: add tests for job heartbeats
parent 14bd8d38ca
commit 2eb3c9f44c
2 changed files with 78 additions and 4 deletions
@@ -72,6 +72,8 @@ type Config struct {
 	BasePath             string
 	JWTEnabled           bool
 	TenantProviderFields []string
+	JobTimeout           time.Duration
+	JobWatchFreq         time.Duration
 	WorkerTimeout        time.Duration
 	WorkerWatchFreq      time.Duration
 }
@@ -83,6 +85,12 @@ func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, config Config) *Serve
 		config: config,
 	}
 
+	if s.config.JobTimeout == 0 {
+		s.config.JobTimeout = time.Second * 120
+	}
+	if s.config.JobWatchFreq == 0 {
+		s.config.JobWatchFreq = time.Second * 30
+	}
 	if s.config.WorkerTimeout == 0 {
 		s.config.WorkerTimeout = time.Hour
 	}
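The new JobTimeout and JobWatchFreq fields fall back to the previous hard-coded values (120 seconds and 30 seconds) when left at zero, so existing callers of NewServer keep the old behaviour while tests can shrink the durations to milliseconds. A self-contained sketch of this zero-value defaulting pattern (the type and function names below are illustrative, not the package's API):

package main

import (
	"fmt"
	"time"
)

// config mirrors the two new job-related knobs; names are illustrative only.
type config struct {
	JobTimeout   time.Duration
	JobWatchFreq time.Duration
}

// applyDefaults fills in zero-valued durations, mirroring the
// "if s.config.X == 0 { ... }" checks added to NewServer above.
func applyDefaults(c config) config {
	if c.JobTimeout == 0 {
		c.JobTimeout = time.Second * 120
	}
	if c.JobWatchFreq == 0 {
		c.JobWatchFreq = time.Second * 30
	}
	return c
}

func main() {
	fmt.Println(applyDefaults(config{}))                                 // {2m0s 30s}: production defaults
	fmt.Println(applyDefaults(config{JobTimeout: time.Millisecond * 1})) // {1ms 30s}: test-style override
}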
@@ -125,12 +133,13 @@ func (s *Server) Handler() http.Handler {
 const maxHeartbeatRetries = 2
 
 // This function should be started as a goroutine
-// Every 30 seconds it goes through all running jobs, removing any unresponsive ones.
-// It fails jobs which fail to check if they cancelled for more than 2 minutes.
+// With default durations it goes through all running jobs every 30 seconds and fails any unresponsive
+// ones. Unresponsive jobs haven't checked whether or not they're cancelled in the past 2 minutes.
 func (s *Server) WatchHeartbeats() {
 	//nolint:staticcheck // avoid SA1015, this is an endless function
-	for range time.Tick(time.Second * 30) {
-		for _, token := range s.jobs.Heartbeats(time.Second * 120) {
+	for range time.Tick(s.config.JobWatchFreq) {
+		for _, token := range s.jobs.Heartbeats(s.config.JobTimeout) {
 			id, _ := s.jobs.IdFromToken(token)
 			logrus.Infof("Removing unresponsive job: %s\n", id)
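The watcher relies on the job queue tracking when each running job last checked in: jobs.Heartbeats(timeout) returns the tokens of jobs that have been silent for longer than the timeout, which now defaults to 2 minutes but is configurable. A standalone sketch of that watchdog pattern, independent of the actual jobqueue.JobQueue interface (all names below are illustrative):

package main

import (
	"fmt"
	"sync"
	"time"
)

// heartbeats is an illustrative stand-in for the queue's bookkeeping:
// it records the last time each job token checked in.
type heartbeats struct {
	mu   sync.Mutex
	seen map[string]time.Time
}

// Beat records a check-in, e.g. a worker polling whether its job was cancelled.
func (h *heartbeats) Beat(token string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.seen[token] = time.Now()
}

// Stale returns the tokens that have not checked in within timeout,
// playing the role of jobs.Heartbeats in the server code above.
func (h *heartbeats) Stale(timeout time.Duration) []string {
	h.mu.Lock()
	defer h.mu.Unlock()
	var out []string
	for token, t := range h.seen {
		if time.Since(t) > timeout {
			out = append(out, token)
		}
	}
	return out
}

// Forget drops a token once the watcher has failed or requeued its job.
func (h *heartbeats) Forget(token string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	delete(h.seen, token)
}

func main() {
	h := &heartbeats{seen: map[string]time.Time{}}
	h.Beat("job-1") // the job checks in once, then goes quiet

	// Watch loop, analogous to WatchHeartbeats with JobWatchFreq=50ms and JobTimeout=100ms.
	for i := 0; i < 4; i++ {
		time.Sleep(50 * time.Millisecond)
		for _, token := range h.Stale(100 * time.Millisecond) {
			fmt.Println("removing unresponsive job:", token)
			h.Forget(token)
		}
	}
}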
@@ -1573,3 +1573,68 @@ func TestRequestJobForWorker(t *testing.T) {
 	require.NotNil(t, args)
 	require.Nil(t, dynamicArgs)
 }
+
+func TestJobHeartbeats(t *testing.T) {
+	config := defaultConfig
+	config.JobTimeout = time.Millisecond * 1
+	config.JobWatchFreq = time.Millisecond * 100
+	server := newTestServer(t, t.TempDir(), config, false)
+
+	distroStruct := newTestDistro(t)
+	arch, err := distroStruct.GetArch(test_distro.TestArchName)
+	if err != nil {
+		t.Fatalf("error getting arch from distro: %v", err)
+	}
+	imageType, err := arch.GetImageType(test_distro.TestImageTypeName)
+	if err != nil {
+		t.Fatalf("error getting image type from arch: %v", err)
+	}
+	manifest, _, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, 0)
+	if err != nil {
+		t.Fatalf("error creating osbuild manifest: %v", err)
+	}
+	mf, err := manifest.Serialize(nil, nil, nil, nil)
+	if err != nil {
+		t.Fatalf("error creating osbuild manifest: %v", err)
+	}
+	jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
+	require.NoError(t, err)
+	require.Equal(t, float64(1), promtest.ToFloat64(prometheus.PendingJobs))
+
+	// Can request a job with worker ID
+	j, _, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
+	require.NoError(t, err)
+	require.Equal(t, jobId, j)
+	require.Equal(t, worker.JobTypeOSBuild, typ)
+	require.NotNil(t, args)
+	require.Nil(t, dynamicArgs)
+	require.Equal(t, float64(0), promtest.ToFloat64(prometheus.PendingJobs))
+	require.Equal(t, float64(1), promtest.ToFloat64(prometheus.RunningJobs))
+
+	var jobInfo *worker.JobInfo
+	var jobRes worker.OSBuildJobResult
+	retries := 0
+	for i := 0; i < 3 && retries < 3; i++ {
+		//wait until job is completely failed
+		jobInfo, err = server.OSBuildJobInfo(j, &jobRes)
+		require.NoError(t, err)
+		if jobInfo.JobStatus.Started.IsZero() {
+			require.Equal(t, float64(1), promtest.ToFloat64(prometheus.PendingJobs))
+			require.Equal(t, float64(0), promtest.ToFloat64(prometheus.RunningJobs))
+			j, _, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
+			require.NoError(t, err)
+			require.Equal(t, jobId, j)
+			require.Equal(t, worker.JobTypeOSBuild, typ)
+			require.NotNil(t, args)
+			require.Nil(t, dynamicArgs)
+			retries += 1
+		}
+		time.Sleep(time.Millisecond * 200)
+	}
+	_, err = server.OSBuildJobInfo(j, &jobRes)
+	require.NoError(t, err)
+	require.NotNil(t, jobRes.JobError)
+	require.Equal(t, clienterrors.ErrorJobMissingHeartbeat, jobRes.JobError.ID)
+	require.Equal(t, float64(0), promtest.ToFloat64(prometheus.PendingJobs))
+	require.Equal(t, float64(0), promtest.ToFloat64(prometheus.RunningJobs))
+}
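A note on the test's retry loop: with JobTimeout at 1ms the heartbeat watcher reaps the job almost as soon as it is picked up and, presumably up to the maxHeartbeatRetries limit seen above, puts it back into the pending state. When the loop finds the job pending again (Started is zero), it re-requests it and asserts that the same jobId comes back; once the retry budget is exhausted the job is expected to fail for good, which is what the final assertions check: the job error is ErrorJobMissingHeartbeat and both the pending and running gauges are back to zero.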