- fix task cleanup to actually check if child processes are running (you can't use waitpid() on processes that aren't your direct children)
- send SIGINT first to give mock a chance to clean up
This commit is contained in:
parent
691a472d1c
commit
c48620edc8
1 changed file with 115 additions and 85 deletions
200
builder/kojid
200
builder/kojid
|
|
@ -850,7 +850,6 @@ class TaskManager(object):
|
|||
#about). This will happen after a daemon restart, for example.
|
||||
self.logger.info("freeing stale tasks: %r" % stale)
|
||||
session.host.freeTasks(stale)
|
||||
w_opts = os.WNOHANG
|
||||
for id, pid in self.pids.items():
|
||||
if self._waitTask(id, pid):
|
||||
# the subprocess handles most everything, we just need to clear things out
|
||||
|
|
@ -980,111 +979,142 @@ class TaskManager(object):
|
|||
return True
|
||||
return False
|
||||
|
||||
def _killGroup(self, task_id, sig=signal.SIGTERM, timeout=5):
|
||||
"""Kill process group with signal, keep trying within timeout
|
||||
|
||||
Returns True if successful, False if not
|
||||
def _doKill(self, task_id, pid, cmd, sig, timeout, pause):
|
||||
"""
|
||||
pid = self.pids.get(task_id)
|
||||
if not pid:
|
||||
raise koji.GenericError, "No pid for task %i" % task_id
|
||||
pgrps = self._childPGIDs(pid)
|
||||
success = True
|
||||
for pgrp in pgrps[::-1]:
|
||||
# iterate in reverse order so processes whose children are killed might have
|
||||
# a chance to cleanup before they're killed
|
||||
success &= self._doKillGroup(task_id, pgrp, sig, timeout)
|
||||
return success
|
||||
|
||||
def _doKillGroup(self, task_id, pgrp, sig=signal.SIGTERM, timeout=5):
|
||||
"""Kill the process group with the given process group ID. Return True if the
|
||||
group is successfully killed in the given timeout, False otherwise."""
|
||||
incr = 1.0
|
||||
Kill the process with the given process ID.
|
||||
Return True if the process is successfully killed in
|
||||
the given timeout, False otherwise.
|
||||
"""
|
||||
signaled = False
|
||||
t = 0.0
|
||||
while t < timeout:
|
||||
try:
|
||||
pid, rv = os.waitpid(-pgrp, os.WNOHANG)
|
||||
while pid != 0:
|
||||
self.logger.info(_parseStatus(rv, 'process %i' % pid))
|
||||
pid, rv = os.waitpid(-pgrp, os.WNOHANG)
|
||||
except OSError, e:
|
||||
# means there are no processes in that process group
|
||||
self.logger.debug("Task %i (pgrp %i): %s" % (task_id, pgrp, e))
|
||||
if t == 0.0:
|
||||
self.logger.info("Task %i (pgrp %i) exited" % (task_id, pgrp))
|
||||
while True:
|
||||
status = self._getStat(pid)
|
||||
if status and status[1] == cmd and status[2] != 'Z':
|
||||
self.logger.info('"%s" (pid %i, taskID %i) is running' % (cmd, pid, task_id))
|
||||
else:
|
||||
if signaled:
|
||||
self.logger.info('"%s" (pid %i, taskID %i) was killed' % (cmd, pid, task_id))
|
||||
else:
|
||||
self.logger.info("Killed task %i (pgrp %i)" % (task_id, pgrp))
|
||||
self.logger.info('"%s" (pid %i, taskID %i) exited' % (cmd, pid, task_id))
|
||||
return True
|
||||
else:
|
||||
self.logger.info("Task %i (pgrp %i) exists" % (task_id, pgrp))
|
||||
|
||||
try:
|
||||
os.killpg(pgrp, sig)
|
||||
except OSError, e:
|
||||
# shouldn't happen
|
||||
self.logger.warn("Task %i (pgrp %i): %s" % (task_id, pgrp, e))
|
||||
continue
|
||||
else:
|
||||
self.logger.info("Sent signal %i to task %i (pgrp %i)" % (sig, task_id, pgrp))
|
||||
|
||||
time.sleep(incr)
|
||||
t += incr
|
||||
self.logger.warn("Failed to kill task %i (pgrp %i)" % (task_id, pgrp))
|
||||
return False
|
||||
|
||||
def _childPGIDs(self, pid):
|
||||
if t >= timeout:
|
||||
self.logger.warn('Failed to kill "%s" (pid %i, taskID %i) with signal %i' %
|
||||
(cmd, pid, task_id, sig))
|
||||
return False
|
||||
|
||||
try:
|
||||
os.kill(pid, sig)
|
||||
except OSError, e:
|
||||
# process probably went away, we'll find out on the next iteration
|
||||
self.logger.info('Error sending signal %i to "%s" (pid %i, taskID %i): %s' %
|
||||
(sig, cmd, pid, task_id, e))
|
||||
else:
|
||||
signaled = True
|
||||
self.logger.info('Sent signal %i to "%s" (pid %i, taskID %i)' %
|
||||
(sig, cmd, pid, task_id))
|
||||
|
||||
time.sleep(pause)
|
||||
t += pause
|
||||
|
||||
def _getStat(self, pid):
|
||||
"""
|
||||
Get the stat info for the given pid.
|
||||
Return a list of all the fields in /proc/<pid>/stat.
|
||||
The second entry will contain the full command-line instead of
|
||||
just the command name.
|
||||
If the process does not exist, return None.
|
||||
"""
|
||||
try:
|
||||
proc_path = '/proc/%i/stat' % pid
|
||||
if not os.path.isfile(proc_path):
|
||||
return None
|
||||
proc_file = file(proc_path)
|
||||
procstats = [not field.isdigit() and field or int(field) for field in proc_file.read().split()]
|
||||
proc_file.close()
|
||||
|
||||
cmd_path = '/proc/%i/cmdline' % pid
|
||||
if not os.path.isfile(cmd_path):
|
||||
return None
|
||||
cmd_file = file(cmd_path)
|
||||
procstats[1] = cmd_file.read().replace('\0', ' ').strip()
|
||||
cmd_file.close()
|
||||
if not procstats[1]:
|
||||
return None
|
||||
|
||||
return procstats
|
||||
except IOError, e:
|
||||
# process may have already gone away
|
||||
return None
|
||||
|
||||
def _childPIDs(self, pid):
|
||||
"""Recursively get the children of the process with the given ID.
|
||||
Return a list containing the process group IDs of the children
|
||||
in depth-first order, without duplicates."""
|
||||
Return a list containing the process IDs of the children
|
||||
in breadth-first order, without duplicates."""
|
||||
statsByPPID = {}
|
||||
pgids = []
|
||||
pidcmd = None
|
||||
for procdir in os.listdir('/proc'):
|
||||
if not procdir.isdigit():
|
||||
continue
|
||||
try:
|
||||
procfile = file('/proc/%s/stat' % procdir)
|
||||
procstats = [not field.isdigit() and field or int(field) for field in procfile.read().split()]
|
||||
procfile.close()
|
||||
if not statsByPPID.has_key(procstats[3]):
|
||||
statsByPPID[procstats[3]] = []
|
||||
statsByPPID[procstats[3]].append(procstats)
|
||||
if procstats[0] == pid:
|
||||
# put the pgid of the top-level process into the list
|
||||
pgids.append(procstats[4])
|
||||
except:
|
||||
# We expect IOErrors, because files in /proc may disappear between the listdir() and read().
|
||||
# Nothing we can do about it, just move on.
|
||||
procid = int(procdir)
|
||||
procstats = self._getStat(procid)
|
||||
if not procstats:
|
||||
continue
|
||||
statsByPPID.setdefault(procstats[3], []).append(procstats)
|
||||
if procid == pid:
|
||||
pidcmd = procstats[1]
|
||||
|
||||
if not pgids:
|
||||
# assume the pid and pgid of the forked task are the same
|
||||
pgids.append(pid)
|
||||
pids = [pid]
|
||||
while pids:
|
||||
for ppid in pids[:]:
|
||||
pids = []
|
||||
if pidcmd:
|
||||
# only append the pid if it still exists
|
||||
pids.append((pid, pidcmd))
|
||||
|
||||
parents = [pid]
|
||||
while parents:
|
||||
for ppid in parents[:]:
|
||||
for procstats in statsByPPID.get(ppid, []):
|
||||
# get the /proc entries with ppid as their parent, and append their pgid to the list,
|
||||
# get the /proc entries with ppid as their parent, and append their pid to the list,
|
||||
# then recheck for their children
|
||||
# pid is the 0th field, ppid is the 3rd field, pgid is the 4th field
|
||||
if procstats[4] not in pgids:
|
||||
pgids.append(procstats[4])
|
||||
pids.append(procstats[0])
|
||||
pids.remove(ppid)
|
||||
# pid is the 0th field, ppid is the 3rd field
|
||||
pids.append((procstats[0], procstats[1]))
|
||||
parents.append(procstats[0])
|
||||
parents.remove(ppid)
|
||||
|
||||
return pgids
|
||||
return pids
|
||||
|
||||
def _killChildren(self, task_id, children, sig=signal.SIGTERM, timeout=2.0, pause=1.0):
|
||||
"""
|
||||
Kill child processes of the given task, as specified in the children list,
|
||||
by sending sig.
|
||||
Retry every pause seconds, within timeout.
|
||||
Remove successfully killed processes from the "children" list.
|
||||
"""
|
||||
for childpid, cmd in children[::-1]:
|
||||
# iterate in reverse order so processes whose children are killed might have
|
||||
# a chance to cleanup before they're killed
|
||||
if self._doKill(task_id, childpid, cmd, sig, timeout, pause):
|
||||
children.remove((childpid, cmd))
|
||||
|
||||
def cleanupTask(self, task_id):
|
||||
"""Clean up after task
|
||||
|
||||
- kill children
|
||||
- expire session
|
||||
|
||||
Return True if all children were successfully killed, False otherwise.
|
||||
"""
|
||||
# clean up stray children of tasks
|
||||
ch_killed = self._killGroup(task_id, signal.SIGINT)
|
||||
if not ch_killed:
|
||||
ch_killed = self._killGroup(task_id)
|
||||
if not ch_killed:
|
||||
ch_killed = self._killGroup(task_id, signal.SIGKILL, timeout=2)
|
||||
pid = self.pids.get(task_id)
|
||||
if not pid:
|
||||
raise koji.GenericError, "No pid for task %i" % task_id
|
||||
children = self._childPIDs(pid)
|
||||
if children:
|
||||
# send SIGINT once to let mock try to clean up
|
||||
self._killChildren(task_id, children, sig=signal.SIGINT, pause=2.0)
|
||||
if children:
|
||||
self._killChildren(task_id, children)
|
||||
if children:
|
||||
self._killChildren(task_id, children, sig=signal.SIGKILL, timeout=3.0)
|
||||
|
||||
#expire the task's subsession
|
||||
session_id = self.subsessions.get(task_id)
|
||||
if session_id:
|
||||
|
|
@ -1095,7 +1125,7 @@ class TaskManager(object):
|
|||
except:
|
||||
#not much we can do about it
|
||||
pass
|
||||
return ch_killed
|
||||
return len(children) == 0
|
||||
|
||||
def checkSpace(self):
|
||||
"""See if we have enough space to accept another job"""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue