diff --git a/vm/kojivmd b/vm/kojivmd index f5fbbf3f..c2993789 100755 --- a/vm/kojivmd +++ b/vm/kojivmd @@ -22,6 +22,8 @@ import koji import koji.util +from koji.daemon import TaskManager +from koji.tasks import ServerExit, BaseTaskHandler import sys import logging import os @@ -41,8 +43,6 @@ import base64 import pwd from ConfigParser import ConfigParser from optparse import OptionParser -from xmlrpclib import Fault -import errno try: import krbV except ImportError: @@ -53,995 +53,12 @@ except ImportError: def libvirt_callback(ignore, err): if err[3] != libvirt.VIR_ERR_ERROR: # Don't log libvirt errors: global error handler will do that - logging.warn("Non-error from libvirt: '%s'" % err[2]) + logging.warn("Non-error from libvirt: '%s'", err[2]) libvirt.registerErrorHandler(f=libvirt_callback, ctx=None) -#################### -# XXX All of this should be factored out into a separate module (koji.task?) and imported -# instead of being copy-and-pasted from builder/kojid, but we're saving that for an -# upcoming major refactor. Just do this for now so we can make progress. -#################### - -class ServerExit(Exception): - """Raised to shutdown the server""" - pass - -def _parseStatus(rv, prefix): - if isinstance(prefix, list) or isinstance(prefix, tuple): - prefix = ' '.join(prefix) - if os.WIFSIGNALED(rv): - return '%s was killed by signal %i' % (prefix, os.WTERMSIG(rv)) - elif os.WIFEXITED(rv): - return '%s exited with status %i' % (prefix, os.WEXITSTATUS(rv)) - else: - return '%s terminated for unknown reasons' % prefix - -def safe_rmtree(path, unmount=False, strict=True): - logger = logging.getLogger("koji.build") - #safe remove: with -xdev the find cmd will not cross filesystems - # (though it will cross bind mounts from the same filesystem) - if not os.path.exists(path): - logger.debug("No such path: %s" % path) - return - if unmount: - umount_all(path) - #first rm -f non-directories - logger.debug('Scrubbing files in %s' % path) - rv = os.system("find '%s' -xdev \\! -type d -print0 |xargs -0 rm -f" % path) - msg = 'file removal failed (code %r) for %s' % (rv,path) - if rv != 0: - logger.warn(msg) - if strict: - raise koji.GenericError, msg - else: - return rv - #them rmdir directories - #with -depth, we start at the bottom and work up - logger.debug('Scrubbing directories in %s' % path) - rv = os.system("find '%s' -xdev -depth -type d -print0 |xargs -0 rmdir" % path) - msg = 'dir removal failed (code %r) for %s' % (rv,path) - if rv != 0: - logger.warn(msg) - if strict: - raise koji.GenericError, msg - return rv - -class TaskManager(object): - - def __init__(self): - self.tasks = {} - self.pids = {} - self.subsessions = {} - self.findHandlers() - self.status = '' - self.ready = False - self.host_id = session.host.getID() - self.logger = logging.getLogger("koji.vm.TaskManager") - - def findHandlers(self): - """Find and index task handlers""" - handlers = {} - for v in globals().values(): - if type(v) == type(BaseTaskHandler) and issubclass(v,BaseTaskHandler): - for method in v.Methods: - handlers[method] = v - self.handlers = handlers - - def scanPlugin(self, plugin): - """Find task handlers in a plugin""" - # XXX - this is a very simple implementation for now. - # it should be improved - for v in vars(plugin).itervalues(): - if type(v) == type(BaseTaskHandler) and issubclass(v,BaseTaskHandler): - for method in v.Methods: - self.handlers[method] = v - - def shutdown(self): - """Attempt to shut down cleanly""" - for task_id in self.pids.keys(): - self.cleanupTask(task_id) - session.host.freeTasks(self.tasks.keys()) - session.host.updateHost(task_load=0.0,ready=False) - - def updateBuildroots(self): - """Handle buildroot cleanup/maintenance - - - examine current buildroots on system - - compare with db - - clean up as needed - - /var/lib/mock - - /etc/mock/koji - """ - local_br = self._scanLocalBuildroots() - #query buildroots in db that are not expired - states = [ koji.BR_STATES[x] for x in ('INIT','WAITING','BUILDING') ] - db_br = session.listBuildroots(hostID=self.host_id,state=tuple(states)) - # index by id - db_br = dict([(row['id'],row) for row in db_br]) - st_expired = koji.BR_STATES['EXPIRED'] - for id, br in db_br.items(): - task_id = br['task_id'] - if task_id is None: - # not associated with a task - # this makes no sense now, but may in the future - self.logger.warn("Expiring taskless buildroot: %(id)i/%(tag_name)s/%(arch)s" % br) - session.host.setBuildRootState(id,st_expired) - elif not self.tasks.has_key(task_id): - #task not running - expire the buildroot - #TODO - consider recycling hooks here (with strong sanity checks) - self.logger.info("Expiring buildroot: %(id)i/%(tag_name)s/%(arch)s" % br) - self.logger.debug("Buildroot task: %r, Current tasks: %r" % (task_id,self.tasks.keys())) - session.host.setBuildRootState(id,st_expired) - continue - # get info on local_only buildroots (most likely expired) - local_only = [id for id in local_br.iterkeys() if not db_br.has_key(id)] - if local_only: - missed_br = session.listBuildroots(buildrootID=tuple(local_only)) - #get all the task info in one call - tasks = [] - for br in missed_br: - task_id = br['task_id'] - if task_id: - tasks.append(task_id) - #index - missed_br = dict([(row['id'],row) for row in missed_br]) - tasks = dict([(row['id'],row) for row in session.getTaskInfo(tasks)]) - for id in local_only: - # Cleaning options - # - wait til later - # - "soft" clean (leaving empty root/ dir) - # - full removal - data = local_br[id] - br = missed_br.get(id) - if not br: - self.logger.warn("%(name)s: not in db" % data) - continue - desc = "%(id)i/%(tag_name)s/%(arch)s" % br - if not br['retire_ts']: - self.logger.warn("%s: no retire timestamp" % desc) - continue - age = time.time() - br['retire_ts'] - self.logger.debug("Expired/stray buildroot: %s" % desc) - if br and br['task_id']: - task = tasks.get(br['task_id']) - if not task: - self.logger.warn("%s: invalid task %s" % (desc, br['task_id'])) - continue - if (task['state'] == koji.TASK_STATES['FAILED'] and age < 3600 * 4): - #XXX - this could be smarter - # keep buildroots for failed tasks around for a little while - self.logger.debug("Keeping failed buildroot: %s" % desc) - continue - topdir = data['dir'] - rootdir = None - if topdir: - rootdir = "%s/root" % topdir - try: - st = os.lstat(rootdir) - except OSError, e: - if e.errno == errno.ENOENT: - rootdir = None - else: - self.logger.warn("%s: %s" % (desc, e)) - continue - else: - age = min(age, time.time() - st.st_mtime) - #note: https://bugzilla.redhat.com/bugzilla/show_bug.cgi?id=192153) - #If rpmlib is installing in this chroot, removing it entirely - #can lead to a world of hurt. - #We remove the rootdir contents but leave the rootdir unless it - #is really old - if age > 3600*24: - #dir untouched for a day - self.logger.info("Removing buildroot: %s" % desc) - if topdir and safe_rmtree(topdir, unmount=True, strict=False) != 0: - continue - #also remove the config - try: - os.unlink(data['cfg']) - except OSError, e: - self.logger.warn("%s: can't remove config: %s" % (desc, e)) - elif age > 120: - if rootdir: - try: - flist = os.listdir(rootdir) - except OSError, e: - self.logger.warn("%s: can't list rootdir: %s" % (desc, e)) - continue - if flist: - self.logger.info("%s: clearing rootdir" % desc) - for fn in flist: - safe_rmtree("%s/%s" % (rootdir,fn), unmount=True, strict=False) - resultdir = "%s/result" % topdir - if os.path.isdir(resultdir): - self.logger.info("%s: clearing resultdir" % desc) - safe_rmtree(resultdir, unmount=True, strict=False) - else: - self.logger.debug("Recent buildroot: %s: %i seconds" % (desc,age)) - self.logger.debug("Local buildroots: %d" % len(local_br)) - self.logger.debug("Active buildroots: %d" % len(db_br)) - self.logger.debug("Expired/stray buildroots: %d" % len(local_only)) - - def _scanLocalBuildroots(self): - #XXX - configdir = '/etc/mock/koji' - buildroots = {} - for f in os.listdir(configdir): - if not f.endswith('.cfg'): - continue - fn = "%s/%s" % (configdir,f) - if not os.path.isfile(fn): - continue - fo = file(fn,'r') - id = None - name = None - for n in xrange(10): - # data should be in first few lines - line = fo.readline() - if line.startswith('# Koji buildroot id:'): - try: - id = int(line.split(':')[1]) - except ValueError,IndexError: - continue - if line.startswith('# Koji buildroot name:'): - try: - name = line.split(':')[1].strip() - except ValueError,IndexError: - continue - if id is None or name is None: - continue - # see if there's a dir for the buildroot - vardir = "/var/lib/mock/%s" % name - #XXX - buildroots[id] = {} - buildroots[id]['name'] = name - buildroots[id]['cfg'] = fn - buildroots[id]['dir'] = None - if os.path.isdir(vardir): - buildroots[id]['dir'] = vardir - return buildroots - - def updateTasks(self): - """Read and process task statuses from server - - The processing we do is: - 1) clean up after tasks that are not longer active: - * kill off processes - * retire buildroots - * remove buildroots - - with some possible exceptions - 2) wake waiting tasks if appropriate - """ - tasks = {} - stale = [] - task_load = 0.0 - if self.pids: - self.logger.info("pids: %r" % self.pids) - for task in session.host.getHostTasks(): - self.logger.info("open task: %r" % task) - # the tasks returned are those that are open and locked - # by this host. - id = task['id'] - if not self.pids.has_key(id): - #We don't have a process for this - #Expected to happen after a restart, otherwise this is an error - stale.append(id) - continue - tasks[id] = task - if task.get('alert',False): - #wake up the process - self.logger.info("Waking up task: %r" % task) - os.kill(self.pids[id],signal.SIGUSR2) - if not task['waiting']: - task_load += task['weight'] - self.logger.debug("Task Load: %s" % task_load) - self.task_load = task_load - self.tasks = tasks - self.logger.debug("Current tasks: %r" % self.tasks) - if len(stale) > 0: - #A stale task is one which is opened to us, but we know nothing - #about). This will happen after a daemon restart, for example. - self.logger.info("freeing stale tasks: %r" % stale) - session.host.freeTasks(stale) - for id, pid in self.pids.items(): - if self._waitTask(id, pid): - # the subprocess handles most everything, we just need to clear things out - if self.cleanupTask(id, wait=False): - del self.pids[id] - if self.tasks.has_key(id): - del self.tasks[id] - for id, pid in self.pids.items(): - if not tasks.has_key(id): - # expected to happen when: - # - we are in the narrow gap between the time the task - # records its result and the time the process actually - # exits. - # - task is canceled - # - task is forcibly reassigned/unassigned - tinfo = session.getTaskInfo(id) - if tinfo is None: - raise koji.GenericError, "Invalid task %r (pid %r)" % (id,pid) - elif tinfo['state'] == koji.TASK_STATES['CANCELED']: - self.logger.info("Killing canceled task %r (pid %r)" % (id,pid)) - if self.cleanupTask(id): - del self.pids[id] - elif tinfo['host_id'] != self.host_id: - self.logger.info("Killing reassigned task %r (pid %r)" % (id,pid)) - if self.cleanupTask(id): - del self.pids[id] - else: - self.logger.info("Lingering task %r (pid %r)" % (id,pid)) - - def getNextTask(self): - self.ready = self.readyForTask() - session.host.updateHost(self.task_load,self.ready) - if not self.ready: - self.logger.info("Not ready for task") - return False - hosts, tasks = session.host.getLoadData() - self.logger.debug("Load Data:") - self.logger.debug(" hosts: %r" % hosts) - self.logger.debug(" tasks: %r" % tasks) - #now we organize this data into channel-arch bins - bin_hosts = {} #hosts indexed by bin - bins = {} #bins for this host - our_avail = None - for host in hosts: - host['bins'] = [] - if host['id'] == self.host_id: - #note: task_load reported by server might differ from what we - #sent due to precision variation - our_avail = host['capacity'] - host['task_load'] - for chan in host['channels']: - for arch in host['arches'].split() + ['noarch']: - bin = "%s:%s" % (chan,arch) - bin_hosts.setdefault(bin,[]).append(host) - if host['id'] == self.host_id: - bins[bin] = 1 - self.logger.debug("bins: %r" % bins) - if our_avail is None: - self.logger.info("Server did not report this host. Are we disabled?") - return False - elif not bins: - self.logger.info("No bins for this host. Missing channel/arch config?") - return False - #sort available capacities for each of our bins - avail = {} - for bin in bins.iterkeys(): - avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]] - avail[bin].sort() - avail[bin].reverse() - for task in tasks: - # note: tasks are in priority order - self.logger.debug("task: %r" % task) - if self.tasks.has_key(task['id']): - # we were running this task, but it apparently has been - # freed or reassigned. We can't do anything with it until - # updateTasks notices this and cleans up. - self.logger.debug("Task %(id)s freed or reassigned", task) - continue - if task['state'] == koji.TASK_STATES['ASSIGNED']: - self.logger.debug("task is assigned") - if self.host_id == task['host_id']: - #assigned to us, we can take it regardless - if self.takeTask(task['id']): - return True - elif task['state'] == koji.TASK_STATES['FREE']: - bin = "%(channel_id)s:%(arch)s" % task - self.logger.debug("task is free, bin=%r" % bin) - if not bins.has_key(bin): - continue - #see where our available capacity is compared to other hosts for this bin - #(note: the hosts in this bin are exactly those that could - #accept this task) - bin_avail = avail.get(bin, [0]) - self.logger.debug("available capacities for bin: %r" % bin_avail) - median = bin_avail[(len(bin_avail)-1)/2] - self.logger.debug("ours: %.2f, median: %.2f" % (our_avail, median)) - if our_avail < median: - self.logger.debug("Skipping - available capacity in lower half") - #decline for now and give the upper half a chance - return False - #otherwise, we attempt to open the task - if self.takeTask(task['id']): - return True - else: - #should not happen - raise Exception, "Invalid task state reported by server" - return False - - def _waitTask(self, task_id, pid=None): - """Wait (nohang) on the task, return true if finished""" - if pid is None: - pid = self.pids.get(task_id) - if not pid: - raise koji.GenericError, "No pid for task %i" % task_id - prefix = "Task %i (pid %i)" % (task_id, pid) - try: - (childpid, status) = os.waitpid(pid, os.WNOHANG) - except OSError, e: - #check errno - if e.errno != errno.ECHILD: - #should not happen - raise - #otherwise assume the process is gone - self.logger.info("%s: %s" % (prefix, e)) - return True - if childpid != 0: - self.logger.info(_parseStatus(status, prefix)) - return True - return False - - def _doKill(self, task_id, pid, cmd, sig, timeout, pause): - """ - Kill the process with the given process ID. - Return True if the process is successfully killed in - the given timeout, False otherwise. - """ - self.logger.info('Checking "%s" (pid %i, taskID %i)...' % (cmd, pid, task_id)) - execname = cmd.split()[0] - signaled = False - t = 0.0 - while True: - status = self._getStat(pid) - if status and status[1] == cmd and status[2] != 'Z': - self.logger.info('%s (pid %i, taskID %i) is running' % (execname, pid, task_id)) - else: - if signaled: - self.logger.info('%s (pid %i, taskID %i) was killed by signal %i' % (execname, pid, task_id, sig)) - else: - self.logger.info('%s (pid %i, taskID %i) exited' % (execname, pid, task_id)) - return True - - if t >= timeout: - self.logger.warn('Failed to kill %s (pid %i, taskID %i) with signal %i' % - (execname, pid, task_id, sig)) - return False - - try: - os.kill(pid, sig) - except OSError, e: - # process probably went away, we'll find out on the next iteration - self.logger.info('Error sending signal %i to %s (pid %i, taskID %i): %s' % - (sig, execname, pid, task_id, e)) - else: - signaled = True - self.logger.info('Sent signal %i to %s (pid %i, taskID %i)' % - (sig, execname, pid, task_id)) - - time.sleep(pause) - t += pause - - def _getStat(self, pid): - """ - Get the stat info for the given pid. - Return a list of all the fields in /proc//stat. - The second entry will contain the full command-line instead of - just the command name. - If the process does not exist, return None. - """ - try: - proc_path = '/proc/%i/stat' % pid - if not os.path.isfile(proc_path): - return None - proc_file = file(proc_path) - procstats = [not field.isdigit() and field or int(field) for field in proc_file.read().split()] - proc_file.close() - - cmd_path = '/proc/%i/cmdline' % pid - if not os.path.isfile(cmd_path): - return None - cmd_file = file(cmd_path) - procstats[1] = cmd_file.read().replace('\0', ' ').strip() - cmd_file.close() - if not procstats[1]: - return None - - return procstats - except IOError, e: - # process may have already gone away - return None - - def _childPIDs(self, pid): - """Recursively get the children of the process with the given ID. - Return a list containing the process IDs of the children - in breadth-first order, without duplicates.""" - statsByPPID = {} - pidcmd = None - for procdir in os.listdir('/proc'): - if not procdir.isdigit(): - continue - procid = int(procdir) - procstats = self._getStat(procid) - if not procstats: - continue - statsByPPID.setdefault(procstats[3], []).append(procstats) - if procid == pid: - pidcmd = procstats[1] - - pids = [] - if pidcmd: - # only append the pid if it still exists - pids.append((pid, pidcmd)) - - parents = [pid] - while parents: - for ppid in parents[:]: - for procstats in statsByPPID.get(ppid, []): - # get the /proc entries with ppid as their parent, and append their pid to the list, - # then recheck for their children - # pid is the 0th field, ppid is the 3rd field - pids.append((procstats[0], procstats[1])) - parents.append(procstats[0]) - parents.remove(ppid) - - return pids - - def _killChildren(self, task_id, children, sig=signal.SIGTERM, timeout=2.0, pause=1.0): - """ - Kill child processes of the given task, as specified in the children list, - by sending sig. - Retry every pause seconds, within timeout. - Remove successfully killed processes from the "children" list. - """ - for childpid, cmd in children[::-1]: - # iterate in reverse order so processes whose children are killed might have - # a chance to cleanup before they're killed - if self._doKill(task_id, childpid, cmd, sig, timeout, pause): - children.remove((childpid, cmd)) - - def cleanupTask(self, task_id, wait=True): - """Clean up after task - - - kill children - - expire session - - Return True if all children were successfully killed, False otherwise. - """ - pid = self.pids.get(task_id) - if not pid: - raise koji.GenericError, "No pid for task %i" % task_id - children = self._childPIDs(pid) - if children: - # send SIGINT once to let mock mock try to clean up - self._killChildren(task_id, children, sig=signal.SIGINT, pause=3.0) - if children: - self._killChildren(task_id, children) - if children: - self._killChildren(task_id, children, sig=signal.SIGKILL, timeout=3.0) - - #expire the task's subsession - session_id = self.subsessions.get(task_id) - if session_id: - self.logger.info("Expiring subsession %i (task %i)" % (session_id, task_id)) - try: - session.logoutChild(session_id) - del self.subsessions[task_id] - except: - #not much we can do about it - pass - if wait: - return self._waitTask(task_id, pid) - else: - # task has already been waited on, and we've cleaned - # up as much as we can - return True - - def checkSpace(self): - """See if we have enough space to accept another job""" - global options - br_path = options.mockdir - if not os.path.exists(br_path): - self.logger.error("No such directory: %s" % br_path) - raise IOError, "No such directory: %s" % br_path - fs_stat = os.statvfs(br_path) - available = fs_stat.f_bavail * fs_stat.f_bsize - availableMB = available / 1024 / 1024 - self.logger.debug("disk space available in '%s': %i MB", br_path, availableMB) - if availableMB < options.minspace: - self.status = "Insufficient disk space: %i MB, %i MB required" % (availableMB, options.minspace) - self.logger.warn(self.status) - return False - return True - - def readyForTask(self): - """Determine if the system is ready to accept a new task. - - This function measures the system load and tries to determine - if there is room to accept a new task.""" - # key resources to track: - # disk_space - # df -P path - # df -iP path ? - # memory (meminfo/vmstat) - # vmstat fields 3-6 (also 7-8 for swap) - # http://www.redhat.com/advice/tips/meminfo.html - # cpu cycles (vmstat?) - # vmstat fields 13-16 (and others?) - # others?: - # io (iostat/vmstat) - # network (netstat?) - global options - hostdata = session.host.getHost() - self.logger.debug('hostdata: %r' % hostdata) - if not hostdata['enabled']: - self.status = "Host is disabled" - self.logger.info(self.status) - return False - if self.task_load > hostdata['capacity']: - self.status = "Over capacity" - self.logger.info("Task load (%.2f) exceeds capacity (%.2f)" % (self.task_load, hostdata['capacity'])) - return False - if len(self.tasks) >= options.maxjobs: - # This serves as a backup to the capacity check and prevents - # a tremendous number of low weight jobs from piling up - self.status = "Full queue" - self.logger.info(self.status) - return False - if not self.checkSpace(): - # checkSpace() does its own logging - return False - loadavgs = os.getloadavg() - # this likely treats HT processors the same as real ones - # but that's fine, it's a conservative test - maxload = 4.0 * os.sysconf('SC_NPROCESSORS_ONLN') - if loadavgs[0] > maxload: - self.status = "Load average %.2f > %.2f" % (loadavgs[0], maxload) - self.logger.info(self.status) - return False - #XXX - add more checks - return True - - def takeTask(self,task_id): - """Attempt to open the specified task - - Returns True if successful, False otherwise - """ - self.logger.info("Attempting to take task %s" %task_id) - data = session.host.openTask(task_id) - if data is None: - self.logger.warn("Could not open") - return False - if not data.has_key('request') or data['request'] is None: - self.logger.warn("Task '%s' has no request" % task_id) - return False - id = data['id'] - request = data['request'] - self.tasks[id] = data - params, method = xmlrpclib.loads(request) - if self.handlers.has_key(method): - handlerClass = self.handlers[method] - elif self.handlers.has_key('default'): - handlerClass = self.handlers['default'] - else: - raise koji.GenericError, "No handler found for method '%s'" % method - if issubclass(handlerClass, BaseTaskHandler): - #new style handler needs session and options passed - # handler = handlerClass(id,method,params,session,options) - handler = handlerClass(id,method,params) - else: - handler = handlerClass(id,method,params) - # set weight - session.host.setTaskWeight(task_id,handler.weight()) - if handler.Foreground: - self.logger.info("running task in foreground") - handler.setManager(self) - self.runTask(handler) - else: - pid, session_id = self.forkTask(handler) - self.pids[id] = pid - self.subsessions[id] = session_id - return True - - def forkTask(self,handler): - global session - #get the subsession before we fork - newhub = session.subsession() - session_id = newhub.sinfo['session-id'] - pid = os.fork() - if pid: - newhub._forget() - return pid, session_id - #in no circumstance should we return after the fork - #nor should any exceptions propagate past here - try: - session._forget() - #set process group - os.setpgrp() - #use the subsession - session = newhub - if hasattr(handler, 'session'): - handler.session = session - #set a do-nothing handler for sigusr2 - signal.signal(signal.SIGUSR2,lambda *args: None) - self.runTask(handler) - finally: - #diediedie - try: - session.logout() - finally: - os._exit(0) - - def runTask(self,handler): - fail = False - try: - response = (handler.run(),) - # note that we wrap response in a singleton tuple - response = xmlrpclib.dumps(response, methodresponse=1, allow_none=1) - self.logger.info("RESPONSE: %r" % response) - except Fault, fault: - fail = True - response = xmlrpclib.dumps(fault) - tb = ''.join(traceback.format_exception(*sys.exc_info())).replace(r"\n", "\n") - self.logger.warn("FAULT:\n%s" % tb) - except (SystemExit,ServerExit,KeyboardInterrupt): - #we do not trap these - raise - except: - fail = True - # report exception back to server - e_class, e = sys.exc_info()[:2] - faultCode = getattr(e_class,'faultCode',1) - if issubclass(e_class, koji.GenericError): - #just pass it through - tb = str(e) - self.logger.warn(tb) - else: - tb = ''.join(traceback.format_exception(*sys.exc_info())) - self.logger.warn("TRACEBACK: %s" % tb) - response = xmlrpclib.dumps(xmlrpclib.Fault(faultCode, tb)) - - if fail: - session.host.failTask(handler.id, response) - else: - session.host.closeTask(handler.id, response) - -class BaseTaskHandler(object): - """The base class for task handlers - - Each task handler is a class, a new instance of which is created - to handle each task. - """ - - # list of methods the class can handle - Methods = [] - - # Options: - Foreground = False - - def __init__(self, id, method, params, workdir=None): - global options - self.id = id #task id - if method not in self.Methods: - raise koji.GenericError, 'method "%s" is not supported' % method - self.method = method - # handle named parameters - self.params,self.opts = koji.decode_args(*params) - if workdir is None: - workdir = "%s/%s" % (options.workdir, koji.pathinfo.taskrelpath(id)) - self.workdir = workdir - self.logger = logging.getLogger("koji.vm.BaseTaskHandler") - - def setManager(self,manager): - """Set the manager attribute - - This is only used for foreground tasks to give them access - to their task manager. - """ - if not self.Foreground: - return - self.manager = manager - - def handler(self): - """(abstract) the handler for the task.""" - raise NotImplementedError - - def run(self): - """Execute the task""" - self.createWorkdir() - try: - return self.handler(*self.params,**self.opts) - finally: - self.removeWorkdir() - - _taskWeight = 1.0 - - def weight(self): - """Return the weight of the task. - - This is run by the taskmanager before the task is run to determine - the weight of the task. The weight is an abstract measure of the - total load the task places on the system while running. - - A task may set _taskWeight for a constant weight different from 1, or - override this function for more complicated situations. - - Note that task weight is partially ignored while the task is sleeping. - """ - return getattr(self,'_taskWeight',1.0) - - def createWorkdir(self): - if self.workdir is None: - return - self.removeWorkdir() - os.makedirs(self.workdir) - - def removeWorkdir(self): - if self.workdir is None: - return - safe_rmtree(self.workdir, unmount=False, strict=True) - #os.spawnvp(os.P_WAIT, 'rm', ['rm', '-rf', self.workdir]) - - def wait(self, subtasks=None, all=False, failany=False): - """Wait on subtasks - - subtasks is a list of integers (or an integer). If more than one subtask - is specified, then the default behavior is to return when any of those - tasks complete. However, if all is set to True, then it waits for all of - them to complete. If all and failany are both set to True, then each - finished task will be checked for failure, and a failure will cause all - of the unfinished tasks to be cancelled. - - special values: - subtasks = None specify all subtasks - - Implementation notes: - The build daemon forks all tasks as separate processes. This function - uses signal.pause to sleep. The main process watches subtasks in - the database and will send the subprocess corresponding to the - subtask a SIGUSR2 to wake it up when subtasks complete. - """ - if isinstance(subtasks,int): - # allow single integer w/o enclosing list - subtasks = [subtasks] - session.host.taskSetWait(self.id,subtasks) - self.logger.debug("Waiting on %r" % subtasks) - while True: - finished, unfinished = session.host.taskWait(self.id) - if len(unfinished) == 0: - #all done - break - elif len(finished) > 0: - if all: - if failany: - failed = False - for task in finished: - try: - result = session.getTaskResult(task) - except (koji.GenericError, Fault), task_error: - self.logger.info("task %s failed or was canceled" % task) - failed = True - break - if failed: - self.logger.info("at least one task failed or was canceled, cancelling unfinished tasks") - session.cancelTaskChildren(self.id) - # reraise the original error now, rather than waiting for - # an error in taskWaitResults() - raise task_error - else: - # at least one done - break - # signal handler set by TaskManager.forkTask - self.logger.debug("Pausing...") - signal.pause() - # main process will wake us up with SIGUSR2 - self.logger.debug("...waking up") - self.logger.debug("Finished waiting") - return dict(session.host.taskWaitResults(self.id,subtasks)) - - def getUploadDir(self): - return koji.pathinfo.taskrelpath(self.id) - - def uploadFile(self, filename, relPath=None, remoteName=None): - """Upload the file with the given name to the task output directory - on the hub.""" - uploadPath = self.getUploadDir() - if relPath: - relPath = relPath.strip('/') - uploadPath += '/' + relPath - # Only upload files with content - if os.path.isfile(filename) and os.stat(filename).st_size > 0: - session.uploadWrapper(filename, uploadPath, remoteName) - - def uploadTree(self, dirpath, flatten=False): - """Upload the directory tree at dirpath to the task directory on the - hub, preserving the directory structure""" - dirpath = dirpath.rstrip('/') - for path, dirs, files in os.walk(dirpath): - if flatten: - relpath = None - else: - relpath = path[len(dirpath) + 1:] - for filename in files: - self.uploadFile(os.path.join(path, filename), relpath) - - def localPath(self, relpath): - """Return a local path to a remote file. - - If the file is on an nfs mount, use that, otherwise download a copy""" - if options.topurl: - fn = "%s/local/%s" % (self.workdir, relpath) - if os.path.exists(fn): - # We've already downloaded this file, - # just return the existing local path - return fn - self.logger.debug("Downloading %s", relpath) - url = "%s/%s" % (options.topurl, relpath) - fsrc = urllib2.urlopen(url) - if not os.path.exists(os.path.dirname(fn)): - os.makedirs(os.path.dirname(fn)) - fdst = file(fn, 'w') - shutil.copyfileobj(fsrc, fdst) - fsrc.close() - fdst.close() - else: - fn = "%s/%s" % (options.topdir, relpath) - return fn - - def subtask(self, method, arglist, **opts): - return session.host.subtask(method, arglist, self.id, **opts) - - def subtask2(self, __taskopts, __method, *args, **kwargs): - return session.host.subtask2(self.id, __taskopts, __method, *args, **kwargs) - - def find_arch(self, arch, host, tag): - """ - For noarch tasks, find a canonical arch that is supported by both the host and tag. - If the arch is anything other than noarch, return it unmodified. - """ - if arch != "noarch": - return arch - - # We need a concrete arch. Pick one that: - # a) this host can handle - # b) the build tag can support - # c) is canonical - host_arches = host['arches'] - if not host_arches: - raise koji.BuildError, "No arch list for this host: %s" % host['name'] - tag_arches = tag['arches'] - if not tag_arches: - raise koji.BuildError, "No arch list for tag: %s" % tag['name'] - # index canonical host arches - host_arches = set([koji.canonArch(a) for a in host_arches.split()]) - # index canonical tag arches - tag_arches = set([koji.canonArch(a) for a in tag_arches.split()]) - # find the intersection of host and tag arches - common_arches = list(host_arches & tag_arches) - if common_arches: - # pick one of the common arches randomly - # need to re-seed the prng or we'll get the same arch every time, - # because we just forked from a common parent - random.seed() - arch = random.choice(common_arches) - self.logger.info('Valid arches: %s, using: %s' % (' '.join(common_arches), arch)) - return arch - else: - # no overlap - raise koji.BuildError, "host %s (%s) does not support any arches of tag %s (%s)" % \ - (host['name'], ', '.join(host_arches), tag['name'], ', '.join(tag_arches)) - -class DefaultTask(BaseTaskHandler): - """Used when no matching method is found""" - Methods = ['default'] - _taskWeight = 0.1 - def __init__(self, id, method, params, workdir=None): - self.id = id #task id - self.method = method - self.params = params - self.workdir = None - self.opts = {} - def handler(self,*args,**opts): - raise koji.GenericError, "Invalid method: %s" % self.method - -#################### -# XXX End heinous copy-and-paste (mostly) -#################### - def get_options(): """process options from command line and config file""" - global options # parse command line args parser = OptionParser() parser.add_option("-c", "--config", dest="configFile", @@ -1139,6 +156,8 @@ def get_options(): if not options.server: parser.error("--server argument required") + return options + def quit(msg=None, code=1): if msg: logging.getLogger("koji.vm").error(msg) @@ -1146,24 +165,23 @@ def quit(msg=None, code=1): sys.stderr.flush() sys.exit(code) -def main(): - global session - global options +def main(options, session): logger = logging.getLogger("koji.vm") logger.info('Starting up') - tm = VMTaskManager() + tm = VMTaskManager(options, session) + tm.findHandlers(globals()) if options.plugin: #load plugins pt = koji.plugin.PluginTracker(path=options.pluginpath.split(':')) for name in options.plugin: logger.info('Loading plugin: %s' % name) - tm.scanPlugin(pt.load(name)) + tm.findHandlers(vars(pt.load(name))) def shutdown(*args): raise SystemExit signal.signal(signal.SIGTERM,shutdown) taken = False tm.cleanupAllVMs() - while 1: + while True: try: tm.updateTasks() taken = tm.getNextTask() @@ -1250,9 +268,8 @@ class VMTask(BaseTaskHandler): QCOW2_EXT = '.qcow2' def __init__(self, *args, **kw): - global options super(VMTask, self).__init__(*args, **kw) - self.task_manager = xmlrpclib.ServerProxy('http://%s:%s/' % (options.privaddr, options.portbase), + self.task_manager = xmlrpclib.ServerProxy('http://%s:%s/' % (self.options.privaddr, self.options.portbase), allow_none=True) self.port = None self.server = None @@ -1261,9 +278,8 @@ class VMTask(BaseTaskHandler): self.success = None def mkqcow2(self, clone_name, source_disk, disk_num): - global options new_name = clone_name + '-disk-' + str(disk_num) + self.QCOW2_EXT - new_path = os.path.join(options.imagedir, new_name) + new_path = os.path.join(self.options.imagedir, new_name) cmd = ['/usr/bin/qemu-img', 'create', '-b', source_disk, '-f', 'qcow2', new_path] proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True) ret = proc.wait() @@ -1271,7 +287,7 @@ class VMTask(BaseTaskHandler): output = proc.stdout.read() raise koji.BuildError, 'unable to create qcow2 image, "%s" returned %s; output was: %s' % \ (' '.join(cmd), ret, output) - vm_user = pwd.getpwnam(options.vmuser) + vm_user = pwd.getpwnam(self.options.vmuser) os.chown(new_path, vm_user.pw_uid, vm_user.pw_gid) return new_path @@ -1404,8 +420,7 @@ class VMTask(BaseTaskHandler): Setup the task-specific xmlrpc server to listen to requests from the VM. """ - global options - self.server = TaskXMLRPCServer(options.privaddr, self.port, self) + self.server = TaskXMLRPCServer(self.options.privaddr, self.port, self) thr = threading.Thread(name='task_%s_thread' % self.id, target=self.server.handle_while_active) thr.setDaemon(True) @@ -1418,7 +433,6 @@ class VMTask(BaseTaskHandler): - timeout (int): number of minutes to let the VM run before destroying it and failing the task, default: 1440 """ - global options if not opts: opts = {} timeout = opts.get('timeout', 720) @@ -1446,7 +460,7 @@ class VMTask(BaseTaskHandler): registered = False while not registered: # loop in case the port is already taken - self.port = options.portbase + random.randint(1, 100) + self.port = self.options.portbase + random.randint(1, 100) registered = self.task_manager.registerVM(macaddr, clone_name, self.id, self.port) self.setupTaskServer() vm.create() @@ -1489,8 +503,8 @@ class ManagerXMLRPCServer(DaemonXMLRPCServer): self.register_function(manager.getPort) class VMTaskManager(TaskManager): - def __init__(self): - super(VMTaskManager, self).__init__() + def __init__(self, options, session): + super(VMTaskManager, self).__init__(options, session) self.libvirt_conn = libvirt.open(None) self.macaddrs = {} self.macaddr_lock = threading.Lock() @@ -1531,8 +545,7 @@ class VMTaskManager(TaskManager): self.macaddr_lock.release() def setupServer(self): - global options - self.server = ManagerXMLRPCServer(options.privaddr, options.portbase, self) + self.server = ManagerXMLRPCServer(self.options.privaddr, self.options.portbase, self) thr = threading.Thread(name='manager_thread', target=self.server.handle_while_active) thr.setDaemon(True) thr.start() @@ -1552,22 +565,20 @@ class VMTaskManager(TaskManager): return disks def checkDisk(self): - global options - if not os.path.exists(options.imagedir): - self.logger.error('No such directory: %s' % options.imagedir) - raise IOError, 'No such directory: %s' % options.imagedir - fs_stat = os.statvfs(options.imagedir) + if not os.path.exists(self.options.imagedir): + self.logger.error('No such directory: %s' % self.options.imagedir) + raise IOError, 'No such directory: %s' % self.options.imagedir + fs_stat = os.statvfs(self.options.imagedir) available = fs_stat.f_bavail * fs_stat.f_bsize availableMB = available / 1024 / 1024 - self.logger.debug('disk space available in %s: %i MB', options.imagedir, availableMB) - if availableMB < options.minspace: - self.status = 'Insufficient disk space: %i MB, %i MB required' % (availableMB, options.minspace) + self.logger.debug('disk space available in %s: %i MB', self.options.imagedir, availableMB) + if availableMB < self.options.minspace: + self.status = 'Insufficient disk space: %i MB, %i MB required' % (availableMB, self.options.minspace) self.logger.warn(self.status) return False return True def checkMem(self): - global options phys_mem = os.sysconf('SC_PHYS_PAGES') * os.sysconf('SC_PAGE_SIZE') / 1024 vm_mem = 0 for vm_id in self.libvirt_conn.listDomainsID(): @@ -1578,7 +589,7 @@ class VMTaskManager(TaskManager): vm_mem += info[2] avail_mem = phys_mem - vm_mem # options.minmem is listed in mbytes - min_mem = options.minmem * 1024 + min_mem = self.options.minmem * 1024 self.logger.debug('physical mem: %sk, allocated mem: %sk, available mem: %sk' % \ (phys_mem, vm_mem, avail_mem)) if avail_mem < min_mem: @@ -1649,7 +660,7 @@ class VMTaskManager(TaskManager): if ret: for macaddr, (vm_name, id, port) in self.macaddrs.items(): if task_id == id: - self.expired_vms[vm_name] = session.getTaskInfo(task_id) + self.expired_vms[vm_name] = self.session.getTaskInfo(task_id) del self.macaddrs[macaddr] self.logger.info('unregistered MAC address %s' % macaddr) break @@ -1658,20 +669,19 @@ class VMTaskManager(TaskManager): self.macaddr_lock.release() def shutdown(self): - super(VMTaskManager, self).shutdown() - self.libvirt_conn.close() self.server.server_close() + self.libvirt_conn.close() + super(VMTaskManager, self).shutdown() + #################### # Boilerplate startup code #################### if __name__ == "__main__": - global options - koji.add_file_logger("koji", "/var/log/kojivmd.log") #note we're setting logging params for all of koji* - get_options() + options = get_options() if options.debug: logging.getLogger("koji").setLevel(logging.DEBUG) elif options.verbose: @@ -1742,10 +752,7 @@ if __name__ == "__main__": if options.daemon: #detach koji.daemonize() - main() - # not reached - assert False + main(options, session) elif not options.skip_main: koji.add_stderr_logger("koji") - main() - + main(options, session)