scheduler check_ts fragment
This commit is contained in:
parent
1cbd13f738
commit
5ba62a75bb
2 changed files with 54 additions and 1 deletions
|
|
@ -1,3 +1,4 @@
|
|||
import json
|
||||
import logging
|
||||
import time
|
||||
|
||||
|
|
@ -124,12 +125,17 @@ class TaskScheduler(object):
|
|||
self.ready_timeout = 180
|
||||
self.assign_timeout = 300
|
||||
self.host_timeout = 900
|
||||
self.run_interval = 60
|
||||
|
||||
def run(self):
|
||||
if not db_lock('scheduler', wait=False):
|
||||
# already running elsewhere
|
||||
return False
|
||||
|
||||
if not self.check_ts():
|
||||
# already ran too recently
|
||||
return False
|
||||
|
||||
self.get_tasks()
|
||||
self.get_hosts()
|
||||
self.check_hosts()
|
||||
|
|
@ -138,6 +144,47 @@ class TaskScheduler(object):
|
|||
|
||||
return True
|
||||
|
||||
def check_ts(self):
|
||||
"""Check the last run timestamp
|
||||
|
||||
Returns True if the scheduler should run, False otherwise
|
||||
"""
|
||||
|
||||
# get last ts
|
||||
query = QueryProcessor(
|
||||
tables=['scheduler_sys_data'],
|
||||
columns=['data'],
|
||||
clauses=['name = %(name)s'],
|
||||
values={'name': 'last_run_ts'},
|
||||
)
|
||||
last = query.singleValue(strict=False) or 0
|
||||
|
||||
now = time.time()
|
||||
delta = now - last
|
||||
|
||||
if delta < 0:
|
||||
logger.error('Last run in the future by %i seconds', -delta)
|
||||
ret = False
|
||||
# update the ts so that a system time rollback doesn't keep us from running
|
||||
elif delta < self.run_interval:
|
||||
logger.debug('Skipping run due to run_interval setting')
|
||||
# return now without updating ts
|
||||
return False
|
||||
else:
|
||||
ret = True
|
||||
|
||||
# save current ts
|
||||
# XXX need an UPSERT
|
||||
update = UpdateProcessor(
|
||||
'scheduler_sys_data',
|
||||
clauses=['name = %(name)s'],
|
||||
values={'name': 'last_run_ts'},
|
||||
data={'data': json.dumps(now)},
|
||||
)
|
||||
update.execute()
|
||||
|
||||
return ret
|
||||
|
||||
def do_schedule(self):
|
||||
# debug
|
||||
logger.info('Running task scheduler')
|
||||
|
|
@ -294,7 +341,7 @@ class TaskScheduler(object):
|
|||
update = UpdateProcessor(
|
||||
'host',
|
||||
data={'ready': False},
|
||||
clauses=['host_id IN %(host_ids)s'],
|
||||
clauses=['id IN %(host_ids)s'],
|
||||
values={'host_ids': [h['id'] for h in hosts_to_mark]},
|
||||
)
|
||||
update.execute()
|
||||
|
|
|
|||
|
|
@ -1010,6 +1010,12 @@ CREATE TABLE scheduler_host_data (
|
|||
) WITHOUT OIDS;
|
||||
|
||||
|
||||
CREATE TABLE scheduler_sys_data (
|
||||
name TEXT NOT NULL PRIMARY KEY,
|
||||
data JSONB
|
||||
) WITHOUT OIDS;
|
||||
|
||||
|
||||
CREATE TABLE scheduler_map (
|
||||
id SERIAL NOT NULL PRIMARY KEY,
|
||||
task_id INTEGER REFERENCES task (id) NOT NULL,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue