diff --git a/internal/worker/server.go b/internal/worker/server.go
index 00cf9dffc..137ea1d7a 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -72,6 +72,8 @@ type Config struct {
 	BasePath             string
 	JWTEnabled           bool
 	TenantProviderFields []string
+	JobTimeout           time.Duration
+	JobWatchFreq         time.Duration
 	WorkerTimeout        time.Duration
 	WorkerWatchFreq      time.Duration
 }
@@ -83,6 +85,12 @@ func NewServer(logger *log.Logger, jobs jobqueue.JobQueue, config Config) *Serve
 		config: config,
 	}
 
+	if s.config.JobTimeout == 0 {
+		s.config.JobTimeout = time.Second * 120
+	}
+	if s.config.JobWatchFreq == 0 {
+		s.config.JobWatchFreq = time.Second * 30
+	}
 	if s.config.WorkerTimeout == 0 {
 		s.config.WorkerTimeout = time.Hour
 	}
@@ -125,12 +133,13 @@ func (s *Server) Handler() http.Handler {
 const maxHeartbeatRetries = 2
 
-// This function should be started as a goroutine
-// Every 30 seconds it goes through all running jobs, removing any unresponsive ones.
-// It fails jobs which fail to check if they cancelled for more than 2 minutes.
+// WatchHeartbeats should be started as a goroutine.
+//
+// With default durations it goes through all running jobs every 30 seconds and fails any unresponsive
+// ones. Unresponsive jobs haven't checked whether or not they're cancelled in the past 2 minutes.
 func (s *Server) WatchHeartbeats() {
 	//nolint:staticcheck // avoid SA1015, this is an endless function
-	for range time.Tick(time.Second * 30) {
-		for _, token := range s.jobs.Heartbeats(time.Second * 120) {
+	for range time.Tick(s.config.JobWatchFreq) {
+		for _, token := range s.jobs.Heartbeats(s.config.JobTimeout) {
 			id, _ := s.jobs.IdFromToken(token)
 			logrus.Infof("Removing unresponsive job: %s\n", id)
 
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index 34f65d24d..f7e77983b 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -1573,3 +1573,70 @@ func TestRequestJobForWorker(t *testing.T) {
 	require.NotNil(t, args)
 	require.Nil(t, dynamicArgs)
 }
+
+// TestJobHeartbeats checks that a dequeued job which is then left alone ends up failed with
+// ErrorJobMissingHeartbeat and is no longer counted as pending or running.
+func TestJobHeartbeats(t *testing.T) {
+	config := defaultConfig
+	config.JobTimeout = time.Millisecond * 1
+	config.JobWatchFreq = time.Millisecond * 100
+	server := newTestServer(t, t.TempDir(), config, false)
+
+	distroStruct := newTestDistro(t)
+	arch, err := distroStruct.GetArch(test_distro.TestArchName)
+	if err != nil {
+		t.Fatalf("error getting arch from distro: %v", err)
+	}
+	imageType, err := arch.GetImageType(test_distro.TestImageTypeName)
+	if err != nil {
+		t.Fatalf("error getting image type from arch: %v", err)
+	}
+	manifest, _, err := imageType.Manifest(nil, distro.ImageOptions{Size: imageType.Size(0)}, nil, 0)
+	if err != nil {
+		t.Fatalf("error creating osbuild manifest: %v", err)
+	}
+	mf, err := manifest.Serialize(nil, nil, nil, nil)
+	if err != nil {
+		t.Fatalf("error serializing osbuild manifest: %v", err)
+	}
+	jobId, err := server.EnqueueOSBuild(arch.Name(), &worker.OSBuildJob{Manifest: mf}, "")
+	require.NoError(t, err)
+	require.Equal(t, float64(1), promtest.ToFloat64(prometheus.PendingJobs))
+
+	// Dequeue the job so that it is counted as running instead of pending.
+	j, _, typ, args, dynamicArgs, err := server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
+	require.NoError(t, err)
+	require.Equal(t, jobId, j)
+	require.Equal(t, worker.JobTypeOSBuild, typ)
+	require.NotNil(t, args)
+	require.Nil(t, dynamicArgs)
+	require.Equal(t, float64(0), promtest.ToFloat64(prometheus.PendingJobs))
+	require.Equal(t, float64(1), promtest.ToFloat64(prometheus.RunningJobs))
+
+	var jobInfo *worker.JobInfo
+	var jobRes worker.OSBuildJobResult
+	retries := 0
+	for i := 0; i < 3 && retries < 3; i++ {
+		// Wait until the job has completely failed, dequeuing it again whenever it is back in the pending state.
+		jobInfo, err = server.OSBuildJobInfo(j, &jobRes)
+		require.NoError(t, err)
+		if jobInfo.JobStatus.Started.IsZero() {
+			require.Equal(t, float64(1), promtest.ToFloat64(prometheus.PendingJobs))
+			require.Equal(t, float64(0), promtest.ToFloat64(prometheus.RunningJobs))
+			j, _, typ, args, dynamicArgs, err = server.RequestJob(context.Background(), arch.Name(), []string{worker.JobTypeOSBuild}, []string{""}, uuid.Nil)
+			require.NoError(t, err)
+			require.Equal(t, jobId, j)
+			require.Equal(t, worker.JobTypeOSBuild, typ)
+			require.NotNil(t, args)
+			require.Nil(t, dynamicArgs)
+			retries++
+		}
+		time.Sleep(time.Millisecond * 200)
+	}
+	_, err = server.OSBuildJobInfo(j, &jobRes)
+	require.NoError(t, err)
+	require.NotNil(t, jobRes.JobError)
+	require.Equal(t, clienterrors.ErrorJobMissingHeartbeat, jobRes.JobError.ID)
+	require.Equal(t, float64(0), promtest.ToFloat64(prometheus.PendingJobs))
+	require.Equal(t, float64(0), promtest.ToFloat64(prometheus.RunningJobs))
+}