diff --git a/vm/kojikamid b/vm/kojikamid new file mode 100755 index 00000000..b296edbe --- /dev/null +++ b/vm/kojikamid @@ -0,0 +1,177 @@ +#!/usr/bin/python + +# Koji daemon that runs in a Windows VM and executes commands associated +# with a task. +# Copyright (c) 2010 Red Hat +# +# Koji is free software; you can redistribute it and/or +# modify it under the terms of the GNU Lesser General Public +# License as published by the Free Software Foundation; +# version 2.1 of the License. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +# Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public +# License along with this software; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Authors: +# Mike Bonnet + +# To register this script as a service on Windows 2008 (with Cygwin 1.7.5 installed) run: +# kojiwind --install +# in a cygwin shell. + +import datetime +import os +import subprocess +import sys +import time +import xmlrpclib +import base64 +import hashlib +import traceback + +MANAGER_PORT = 7000 + +def log(msg): + print >> sys.stderr, '%s: %s' % (datetime.datetime.now().ctime(), msg) + +def run(cmd): + shell = False + if isinstance(cmd, (str, unicode)) and len(cmd.split()) > 1: + shell = True + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, + close_fds=True, shell=shell) + ret = proc.wait() + output = proc.stdout.read() + return ret, output + +def find_net_info(): + """ + Find the network gateway configured for this VM. + """ + ret, output = run(['ipconfig', '/all']) + if ret: + raise RuntimeError, 'error running ipconfig, output was: %s' % output + macaddr = None + gateway = None + for line in output.splitlines(): + line = line.strip() + # take the first values we find + if line.startswith('Physical Address'): + if not macaddr: + macaddr = line.split()[-1] + # format it to be consistent with the libvirt MAC address + macaddr = macaddr.replace('-', ':').lower() + elif line.startswith('Default Gateway'): + if not gateway: + gateway = line.split()[-1] + + # check that we have valid values + if macaddr and len(macaddr) != 17: + macaddr = None + if gateway and (len(gateway) < 7 or len(gateway) > 15): + gateway = None + return macaddr, gateway + +def uploadFile(server, prefix, path): + fobj = file(os.path.join(prefix, path), 'r') + offset = 0 + sum = hashlib.sha1() + while True: + data = fobj.read(131072) + if not data: + break + encoded = base64.b64encode(data) + server.upload(path, offset, encoded) + offset += len(data) + sum.update(data) + fobj.close() + server.verifyChecksum(path, sum.hexdigest(), 'sha1') + +def uploadDir(server, root): + for dirpath, dirnames, filenames in os.walk(root): + for filename in filenames: + filepath = os.path.join(dirpath, filename) + relpath = filepath[len(root) + 1:] + uploadFile(server, root, relpath) + +def main(): + macaddr, gateway = find_net_info() + while not (macaddr and gateway): + # wait for the network connection to come up and get an address + time.sleep(5) + macaddr, gateway = find_net_info() + log('found MAC address %s, connecting to %s:%s' % (macaddr, gateway, MANAGER_PORT)) + server = xmlrpclib.ServerProxy('http://%s:%s/' % (gateway, MANAGER_PORT), allow_none=True) + # we would set a timeout on the socket here, but that is 
apparently not supported
+    # by python/cygwin/Windows
+    task_port = server.getPort(macaddr)
+    log('found task-specific port %s' % task_port)
+    server = xmlrpclib.ServerProxy('http://%s:%s/' % (gateway, task_port), allow_none=True)
+
+    ret = 1
+    output = 'unknown error'
+    cmd = None  # ensure cmd is defined for the error message in the finally clause
+    exc_info = None
+
+    try:
+        task_info = server.getTaskInfo()
+        if task_info:
+            cmd = task_info[0]
+            os.mkdir('/tmp/output')
+            log('running command: %s' % cmd)
+            ret, output = run(cmd)
+        else:
+            ret = 1
+            output = 'no command provided'
+        uploadDir(server, '/tmp/output')
+    except:
+        exc_info = sys.exc_info()
+    finally:
+        if exc_info:
+            tb = ''.join(traceback.format_exception(*exc_info))
+            server.failTask(tb)
+        elif ret:
+            server.failTask('"%s" failed, return code was %s, output was %s' % (cmd, ret, output))
+        else:
+            server.closeTask(output)
+
+def usage():
+    print '%s: Runs Koji tasks assigned to a VM' % os.path.basename(sys.argv[0])
+    print '  run with no options to start the daemon'
+    print
+    print 'Options:'
+    print '  --help       show this help message and exit'
+    print '  --install    install this daemon as the "kojiwind" Windows service'
+    print '  --uninstall  uninstall the "kojiwind" Windows service'
+
+if __name__ == '__main__':
+    prog = os.path.abspath(sys.argv[0])
+    if len(sys.argv) > 1:
+        opt = sys.argv[1]
+        if opt == '--install':
+            ret, output = run(['cygrunsrv', '--install', 'kojiwind',
+                               '--path', sys.executable, '--args', prog,
+                               '--type', 'auto', '--dep', 'Dhcp',
+                               '--disp', 'Koji Windows Daemon',
+                               '--desc', 'Runs Koji tasks assigned to a VM'])
+            if ret:
+                print 'Error installing kojiwind service, output was: %s' % output
+                sys.exit(1)
+            else:
+                print 'Successfully installed the kojiwind service'
+        elif opt == '--uninstall':
+            ret, output = run(['cygrunsrv', '--remove', 'kojiwind'])
+            if ret:
+                print 'Error removing the kojiwind service, output was: %s' % output
+                sys.exit(1)
+            else:
+                print 'Successfully removed the kojiwind service'
+        else:
+            usage()
+    else:
+        main()
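+
+# Note: uploadFile() above speaks plain XML-RPC: the file is streamed in 128 KiB
+# chunks, base64-encoded (XML-RPC cannot carry raw binary), and the transfer ends
+# with a SHA-1 handshake.  The server side of these calls is VMTask.upload() and
+# VMTask.verifyChecksum() in kojivmd below.  A stand-alone sketch of the same
+# exchange (the task port and file name here are hypothetical examples):
+#
+#   import base64, hashlib, xmlrpclib
+#
+#   server = xmlrpclib.ServerProxy('http://192.168.122.1:7042/', allow_none=True)
+#   fobj = open('build.log', 'rb')
+#   digest = hashlib.sha1()
+#   offset = 0
+#   while True:
+#       data = fobj.read(131072)
+#       if not data:
+#           break
+#       server.upload('build.log', offset, base64.b64encode(data))
+#       offset += len(data)
+#       digest.update(data)
+#   fobj.close()
+#   server.verifyChecksum('build.log', digest.hexdigest(), 'sha1')
diff --git a/vm/kojivmd b/vm/kojivmd
new file mode 100755
index 00000000..f5fbbf3f
--- /dev/null
+++ b/vm/kojivmd
@@ -0,0 +1,1751 @@
+#!/usr/bin/python
+
+# Koji virtual machine management daemon
+# Copyright (c) 2010 Red Hat
+#
+# Koji is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation;
+# version 2.1 of the License.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.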
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this software; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Authors:
+#       Mike Bonnet
+
+import koji
+import koji.plugin
+import koji.util
+import sys
+import logging
+import os
+import shutil
+import urllib2
+import xmlrpclib
+import signal
+import time
+import traceback
+import subprocess
+import libvirt
+import libxml2
+import virtinst
+import random
+import socket
+import SimpleXMLRPCServer
+import threading
+import base64
+import pwd
+from ConfigParser import ConfigParser
+from optparse import OptionParser
+from xmlrpclib import Fault
+import errno
+try:
+    import krbV
+except ImportError:
+    pass
+
+
+# Register libvirt handler
+def libvirt_callback(ignore, err):
+    if err[3] != libvirt.VIR_ERR_ERROR:
+        # Don't log libvirt errors: global error handler will do that
+        logging.warn("Non-error from libvirt: '%s'" % err[2])
+libvirt.registerErrorHandler(f=libvirt_callback, ctx=None)
+
+
+####################
+# XXX All of this should be factored out into a separate module (koji.task?) and imported
+# instead of being copy-and-pasted from builder/kojid, but we're saving that for an
+# upcoming major refactor. Just do this for now so we can make progress.
+####################
+
+class ServerExit(Exception):
+    """Raised to shutdown the server"""
+    pass
+
+def _parseStatus(rv, prefix):
+    if isinstance(prefix, list) or isinstance(prefix, tuple):
+        prefix = ' '.join(prefix)
+    if os.WIFSIGNALED(rv):
+        return '%s was killed by signal %i' % (prefix, os.WTERMSIG(rv))
+    elif os.WIFEXITED(rv):
+        return '%s exited with status %i' % (prefix, os.WEXITSTATUS(rv))
+    else:
+        return '%s terminated for unknown reasons' % prefix
+
+def safe_rmtree(path, unmount=False, strict=True):
+    logger = logging.getLogger("koji.build")
+    #safe remove: with -xdev the find cmd will not cross filesystems
+    # (though it will cross bind mounts from the same filesystem)
+    if not os.path.exists(path):
+        logger.debug("No such path: %s" % path)
+        return
+    if unmount:
+        umount_all(path)
+    #first rm -f non-directories
+    logger.debug('Scrubbing files in %s' % path)
+    rv = os.system("find '%s' -xdev \\! -type d -print0 |xargs -0 rm -f" % path)
+    msg = 'file removal failed (code %r) for %s' % (rv,path)
+    if rv != 0:
+        logger.warn(msg)
+        if strict:
+            raise koji.GenericError, msg
+        else:
+            return rv
+    #then rmdir directories
+    #with -depth, we start at the bottom and work up
+    logger.debug('Scrubbing directories in %s' % path)
+    rv = os.system("find '%s' -xdev -depth -type d -print0 |xargs -0 rmdir" % path)
+    msg = 'dir removal failed (code %r) for %s' % (rv,path)
+    if rv != 0:
+        logger.warn(msg)
+        if strict:
+            raise koji.GenericError, msg
+    return rv
+
+class TaskManager(object):
+
+    def __init__(self):
+        self.tasks = {}
+        self.pids = {}
+        self.subsessions = {}
+        self.findHandlers()
+        self.status = ''
+        self.ready = False
+        self.host_id = session.host.getID()
+        self.logger = logging.getLogger("koji.vm.TaskManager")
+
+    def findHandlers(self):
+        """Find and index task handlers"""
+        handlers = {}
+        for v in globals().values():
+            if type(v) == type(BaseTaskHandler) and issubclass(v,BaseTaskHandler):
+                for method in v.Methods:
+                    handlers[method] = v
+        self.handlers = handlers
+
+    def scanPlugin(self, plugin):
+        """Find task handlers in a plugin"""
+        # XXX - this is a very simple implementation for now.
+ # it should be improved + for v in vars(plugin).itervalues(): + if type(v) == type(BaseTaskHandler) and issubclass(v,BaseTaskHandler): + for method in v.Methods: + self.handlers[method] = v + + def shutdown(self): + """Attempt to shut down cleanly""" + for task_id in self.pids.keys(): + self.cleanupTask(task_id) + session.host.freeTasks(self.tasks.keys()) + session.host.updateHost(task_load=0.0,ready=False) + + def updateBuildroots(self): + """Handle buildroot cleanup/maintenance + + - examine current buildroots on system + - compare with db + - clean up as needed + - /var/lib/mock + - /etc/mock/koji + """ + local_br = self._scanLocalBuildroots() + #query buildroots in db that are not expired + states = [ koji.BR_STATES[x] for x in ('INIT','WAITING','BUILDING') ] + db_br = session.listBuildroots(hostID=self.host_id,state=tuple(states)) + # index by id + db_br = dict([(row['id'],row) for row in db_br]) + st_expired = koji.BR_STATES['EXPIRED'] + for id, br in db_br.items(): + task_id = br['task_id'] + if task_id is None: + # not associated with a task + # this makes no sense now, but may in the future + self.logger.warn("Expiring taskless buildroot: %(id)i/%(tag_name)s/%(arch)s" % br) + session.host.setBuildRootState(id,st_expired) + elif not self.tasks.has_key(task_id): + #task not running - expire the buildroot + #TODO - consider recycling hooks here (with strong sanity checks) + self.logger.info("Expiring buildroot: %(id)i/%(tag_name)s/%(arch)s" % br) + self.logger.debug("Buildroot task: %r, Current tasks: %r" % (task_id,self.tasks.keys())) + session.host.setBuildRootState(id,st_expired) + continue + # get info on local_only buildroots (most likely expired) + local_only = [id for id in local_br.iterkeys() if not db_br.has_key(id)] + if local_only: + missed_br = session.listBuildroots(buildrootID=tuple(local_only)) + #get all the task info in one call + tasks = [] + for br in missed_br: + task_id = br['task_id'] + if task_id: + tasks.append(task_id) + #index + missed_br = dict([(row['id'],row) for row in missed_br]) + tasks = dict([(row['id'],row) for row in session.getTaskInfo(tasks)]) + for id in local_only: + # Cleaning options + # - wait til later + # - "soft" clean (leaving empty root/ dir) + # - full removal + data = local_br[id] + br = missed_br.get(id) + if not br: + self.logger.warn("%(name)s: not in db" % data) + continue + desc = "%(id)i/%(tag_name)s/%(arch)s" % br + if not br['retire_ts']: + self.logger.warn("%s: no retire timestamp" % desc) + continue + age = time.time() - br['retire_ts'] + self.logger.debug("Expired/stray buildroot: %s" % desc) + if br and br['task_id']: + task = tasks.get(br['task_id']) + if not task: + self.logger.warn("%s: invalid task %s" % (desc, br['task_id'])) + continue + if (task['state'] == koji.TASK_STATES['FAILED'] and age < 3600 * 4): + #XXX - this could be smarter + # keep buildroots for failed tasks around for a little while + self.logger.debug("Keeping failed buildroot: %s" % desc) + continue + topdir = data['dir'] + rootdir = None + if topdir: + rootdir = "%s/root" % topdir + try: + st = os.lstat(rootdir) + except OSError, e: + if e.errno == errno.ENOENT: + rootdir = None + else: + self.logger.warn("%s: %s" % (desc, e)) + continue + else: + age = min(age, time.time() - st.st_mtime) + #note: https://bugzilla.redhat.com/bugzilla/show_bug.cgi?id=192153) + #If rpmlib is installing in this chroot, removing it entirely + #can lead to a world of hurt. 
+            #We remove the rootdir contents but leave the rootdir unless it
+            #is really old
+            if age > 3600*24:
+                #dir untouched for a day
+                self.logger.info("Removing buildroot: %s" % desc)
+                if topdir and safe_rmtree(topdir, unmount=True, strict=False) != 0:
+                    continue
+                #also remove the config
+                try:
+                    os.unlink(data['cfg'])
+                except OSError, e:
+                    self.logger.warn("%s: can't remove config: %s" % (desc, e))
+            elif age > 120:
+                if rootdir:
+                    try:
+                        flist = os.listdir(rootdir)
+                    except OSError, e:
+                        self.logger.warn("%s: can't list rootdir: %s" % (desc, e))
+                        continue
+                    if flist:
+                        self.logger.info("%s: clearing rootdir" % desc)
+                    for fn in flist:
+                        safe_rmtree("%s/%s" % (rootdir,fn), unmount=True, strict=False)
+                resultdir = "%s/result" % topdir
+                if os.path.isdir(resultdir):
+                    self.logger.info("%s: clearing resultdir" % desc)
+                    safe_rmtree(resultdir, unmount=True, strict=False)
+            else:
+                self.logger.debug("Recent buildroot: %s: %i seconds" % (desc,age))
+        self.logger.debug("Local buildroots: %d" % len(local_br))
+        self.logger.debug("Active buildroots: %d" % len(db_br))
+        self.logger.debug("Expired/stray buildroots: %d" % len(local_only))
+
+    def _scanLocalBuildroots(self):
+        #XXX
+        configdir = '/etc/mock/koji'
+        buildroots = {}
+        for f in os.listdir(configdir):
+            if not f.endswith('.cfg'):
+                continue
+            fn = "%s/%s" % (configdir,f)
+            if not os.path.isfile(fn):
+                continue
+            fo = file(fn,'r')
+            id = None
+            name = None
+            for n in xrange(10):
+                # data should be in first few lines
+                line = fo.readline()
+                if line.startswith('# Koji buildroot id:'):
+                    try:
+                        id = int(line.split(':')[1])
+                    except (ValueError, IndexError):
+                        continue
+                if line.startswith('# Koji buildroot name:'):
+                    try:
+                        name = line.split(':')[1].strip()
+                    except (ValueError, IndexError):
+                        continue
+            if id is None or name is None:
+                continue
+            # see if there's a dir for the buildroot
+            vardir = "/var/lib/mock/%s" % name
+            #XXX
+            buildroots[id] = {}
+            buildroots[id]['name'] = name
+            buildroots[id]['cfg'] = fn
+            buildroots[id]['dir'] = None
+            if os.path.isdir(vardir):
+                buildroots[id]['dir'] = vardir
+        return buildroots
+
+    def updateTasks(self):
+        """Read and process task statuses from server
+
+        The processing we do is:
+            1) clean up after tasks that are no longer active:
+                * kill off processes
+                * retire buildroots
+                * remove buildroots
+                    - with some possible exceptions
+            2) wake waiting tasks if appropriate
+        """
+        tasks = {}
+        stale = []
+        task_load = 0.0
+        if self.pids:
+            self.logger.info("pids: %r" % self.pids)
+        for task in session.host.getHostTasks():
+            self.logger.info("open task: %r" % task)
+            # the tasks returned are those that are open and locked
+            # by this host.
+            id = task['id']
+            if not self.pids.has_key(id):
+                #We don't have a process for this
+                #Expected to happen after a restart, otherwise this is an error
+                stale.append(id)
+                continue
+            tasks[id] = task
+            if task.get('alert',False):
+                #wake up the process
+                self.logger.info("Waking up task: %r" % task)
+                os.kill(self.pids[id],signal.SIGUSR2)
+            if not task['waiting']:
+                task_load += task['weight']
+        self.logger.debug("Task Load: %s" % task_load)
+        self.task_load = task_load
+        self.tasks = tasks
+        self.logger.debug("Current tasks: %r" % self.tasks)
+        if len(stale) > 0:
+            #A stale task is one which is opened to us, but we know nothing
+            #about. This will happen after a daemon restart, for example.
+ self.logger.info("freeing stale tasks: %r" % stale) + session.host.freeTasks(stale) + for id, pid in self.pids.items(): + if self._waitTask(id, pid): + # the subprocess handles most everything, we just need to clear things out + if self.cleanupTask(id, wait=False): + del self.pids[id] + if self.tasks.has_key(id): + del self.tasks[id] + for id, pid in self.pids.items(): + if not tasks.has_key(id): + # expected to happen when: + # - we are in the narrow gap between the time the task + # records its result and the time the process actually + # exits. + # - task is canceled + # - task is forcibly reassigned/unassigned + tinfo = session.getTaskInfo(id) + if tinfo is None: + raise koji.GenericError, "Invalid task %r (pid %r)" % (id,pid) + elif tinfo['state'] == koji.TASK_STATES['CANCELED']: + self.logger.info("Killing canceled task %r (pid %r)" % (id,pid)) + if self.cleanupTask(id): + del self.pids[id] + elif tinfo['host_id'] != self.host_id: + self.logger.info("Killing reassigned task %r (pid %r)" % (id,pid)) + if self.cleanupTask(id): + del self.pids[id] + else: + self.logger.info("Lingering task %r (pid %r)" % (id,pid)) + + def getNextTask(self): + self.ready = self.readyForTask() + session.host.updateHost(self.task_load,self.ready) + if not self.ready: + self.logger.info("Not ready for task") + return False + hosts, tasks = session.host.getLoadData() + self.logger.debug("Load Data:") + self.logger.debug(" hosts: %r" % hosts) + self.logger.debug(" tasks: %r" % tasks) + #now we organize this data into channel-arch bins + bin_hosts = {} #hosts indexed by bin + bins = {} #bins for this host + our_avail = None + for host in hosts: + host['bins'] = [] + if host['id'] == self.host_id: + #note: task_load reported by server might differ from what we + #sent due to precision variation + our_avail = host['capacity'] - host['task_load'] + for chan in host['channels']: + for arch in host['arches'].split() + ['noarch']: + bin = "%s:%s" % (chan,arch) + bin_hosts.setdefault(bin,[]).append(host) + if host['id'] == self.host_id: + bins[bin] = 1 + self.logger.debug("bins: %r" % bins) + if our_avail is None: + self.logger.info("Server did not report this host. Are we disabled?") + return False + elif not bins: + self.logger.info("No bins for this host. Missing channel/arch config?") + return False + #sort available capacities for each of our bins + avail = {} + for bin in bins.iterkeys(): + avail[bin] = [host['capacity'] - host['task_load'] for host in bin_hosts[bin]] + avail[bin].sort() + avail[bin].reverse() + for task in tasks: + # note: tasks are in priority order + self.logger.debug("task: %r" % task) + if self.tasks.has_key(task['id']): + # we were running this task, but it apparently has been + # freed or reassigned. We can't do anything with it until + # updateTasks notices this and cleans up. 
+ self.logger.debug("Task %(id)s freed or reassigned", task) + continue + if task['state'] == koji.TASK_STATES['ASSIGNED']: + self.logger.debug("task is assigned") + if self.host_id == task['host_id']: + #assigned to us, we can take it regardless + if self.takeTask(task['id']): + return True + elif task['state'] == koji.TASK_STATES['FREE']: + bin = "%(channel_id)s:%(arch)s" % task + self.logger.debug("task is free, bin=%r" % bin) + if not bins.has_key(bin): + continue + #see where our available capacity is compared to other hosts for this bin + #(note: the hosts in this bin are exactly those that could + #accept this task) + bin_avail = avail.get(bin, [0]) + self.logger.debug("available capacities for bin: %r" % bin_avail) + median = bin_avail[(len(bin_avail)-1)/2] + self.logger.debug("ours: %.2f, median: %.2f" % (our_avail, median)) + if our_avail < median: + self.logger.debug("Skipping - available capacity in lower half") + #decline for now and give the upper half a chance + return False + #otherwise, we attempt to open the task + if self.takeTask(task['id']): + return True + else: + #should not happen + raise Exception, "Invalid task state reported by server" + return False + + def _waitTask(self, task_id, pid=None): + """Wait (nohang) on the task, return true if finished""" + if pid is None: + pid = self.pids.get(task_id) + if not pid: + raise koji.GenericError, "No pid for task %i" % task_id + prefix = "Task %i (pid %i)" % (task_id, pid) + try: + (childpid, status) = os.waitpid(pid, os.WNOHANG) + except OSError, e: + #check errno + if e.errno != errno.ECHILD: + #should not happen + raise + #otherwise assume the process is gone + self.logger.info("%s: %s" % (prefix, e)) + return True + if childpid != 0: + self.logger.info(_parseStatus(status, prefix)) + return True + return False + + def _doKill(self, task_id, pid, cmd, sig, timeout, pause): + """ + Kill the process with the given process ID. + Return True if the process is successfully killed in + the given timeout, False otherwise. + """ + self.logger.info('Checking "%s" (pid %i, taskID %i)...' % (cmd, pid, task_id)) + execname = cmd.split()[0] + signaled = False + t = 0.0 + while True: + status = self._getStat(pid) + if status and status[1] == cmd and status[2] != 'Z': + self.logger.info('%s (pid %i, taskID %i) is running' % (execname, pid, task_id)) + else: + if signaled: + self.logger.info('%s (pid %i, taskID %i) was killed by signal %i' % (execname, pid, task_id, sig)) + else: + self.logger.info('%s (pid %i, taskID %i) exited' % (execname, pid, task_id)) + return True + + if t >= timeout: + self.logger.warn('Failed to kill %s (pid %i, taskID %i) with signal %i' % + (execname, pid, task_id, sig)) + return False + + try: + os.kill(pid, sig) + except OSError, e: + # process probably went away, we'll find out on the next iteration + self.logger.info('Error sending signal %i to %s (pid %i, taskID %i): %s' % + (sig, execname, pid, task_id, e)) + else: + signaled = True + self.logger.info('Sent signal %i to %s (pid %i, taskID %i)' % + (sig, execname, pid, task_id)) + + time.sleep(pause) + t += pause + + def _getStat(self, pid): + """ + Get the stat info for the given pid. + Return a list of all the fields in /proc//stat. + The second entry will contain the full command-line instead of + just the command name. + If the process does not exist, return None. 
+ """ + try: + proc_path = '/proc/%i/stat' % pid + if not os.path.isfile(proc_path): + return None + proc_file = file(proc_path) + procstats = [not field.isdigit() and field or int(field) for field in proc_file.read().split()] + proc_file.close() + + cmd_path = '/proc/%i/cmdline' % pid + if not os.path.isfile(cmd_path): + return None + cmd_file = file(cmd_path) + procstats[1] = cmd_file.read().replace('\0', ' ').strip() + cmd_file.close() + if not procstats[1]: + return None + + return procstats + except IOError, e: + # process may have already gone away + return None + + def _childPIDs(self, pid): + """Recursively get the children of the process with the given ID. + Return a list containing the process IDs of the children + in breadth-first order, without duplicates.""" + statsByPPID = {} + pidcmd = None + for procdir in os.listdir('/proc'): + if not procdir.isdigit(): + continue + procid = int(procdir) + procstats = self._getStat(procid) + if not procstats: + continue + statsByPPID.setdefault(procstats[3], []).append(procstats) + if procid == pid: + pidcmd = procstats[1] + + pids = [] + if pidcmd: + # only append the pid if it still exists + pids.append((pid, pidcmd)) + + parents = [pid] + while parents: + for ppid in parents[:]: + for procstats in statsByPPID.get(ppid, []): + # get the /proc entries with ppid as their parent, and append their pid to the list, + # then recheck for their children + # pid is the 0th field, ppid is the 3rd field + pids.append((procstats[0], procstats[1])) + parents.append(procstats[0]) + parents.remove(ppid) + + return pids + + def _killChildren(self, task_id, children, sig=signal.SIGTERM, timeout=2.0, pause=1.0): + """ + Kill child processes of the given task, as specified in the children list, + by sending sig. + Retry every pause seconds, within timeout. + Remove successfully killed processes from the "children" list. + """ + for childpid, cmd in children[::-1]: + # iterate in reverse order so processes whose children are killed might have + # a chance to cleanup before they're killed + if self._doKill(task_id, childpid, cmd, sig, timeout, pause): + children.remove((childpid, cmd)) + + def cleanupTask(self, task_id, wait=True): + """Clean up after task + + - kill children + - expire session + + Return True if all children were successfully killed, False otherwise. 
+ """ + pid = self.pids.get(task_id) + if not pid: + raise koji.GenericError, "No pid for task %i" % task_id + children = self._childPIDs(pid) + if children: + # send SIGINT once to let mock mock try to clean up + self._killChildren(task_id, children, sig=signal.SIGINT, pause=3.0) + if children: + self._killChildren(task_id, children) + if children: + self._killChildren(task_id, children, sig=signal.SIGKILL, timeout=3.0) + + #expire the task's subsession + session_id = self.subsessions.get(task_id) + if session_id: + self.logger.info("Expiring subsession %i (task %i)" % (session_id, task_id)) + try: + session.logoutChild(session_id) + del self.subsessions[task_id] + except: + #not much we can do about it + pass + if wait: + return self._waitTask(task_id, pid) + else: + # task has already been waited on, and we've cleaned + # up as much as we can + return True + + def checkSpace(self): + """See if we have enough space to accept another job""" + global options + br_path = options.mockdir + if not os.path.exists(br_path): + self.logger.error("No such directory: %s" % br_path) + raise IOError, "No such directory: %s" % br_path + fs_stat = os.statvfs(br_path) + available = fs_stat.f_bavail * fs_stat.f_bsize + availableMB = available / 1024 / 1024 + self.logger.debug("disk space available in '%s': %i MB", br_path, availableMB) + if availableMB < options.minspace: + self.status = "Insufficient disk space: %i MB, %i MB required" % (availableMB, options.minspace) + self.logger.warn(self.status) + return False + return True + + def readyForTask(self): + """Determine if the system is ready to accept a new task. + + This function measures the system load and tries to determine + if there is room to accept a new task.""" + # key resources to track: + # disk_space + # df -P path + # df -iP path ? + # memory (meminfo/vmstat) + # vmstat fields 3-6 (also 7-8 for swap) + # http://www.redhat.com/advice/tips/meminfo.html + # cpu cycles (vmstat?) + # vmstat fields 13-16 (and others?) + # others?: + # io (iostat/vmstat) + # network (netstat?) 
+ global options + hostdata = session.host.getHost() + self.logger.debug('hostdata: %r' % hostdata) + if not hostdata['enabled']: + self.status = "Host is disabled" + self.logger.info(self.status) + return False + if self.task_load > hostdata['capacity']: + self.status = "Over capacity" + self.logger.info("Task load (%.2f) exceeds capacity (%.2f)" % (self.task_load, hostdata['capacity'])) + return False + if len(self.tasks) >= options.maxjobs: + # This serves as a backup to the capacity check and prevents + # a tremendous number of low weight jobs from piling up + self.status = "Full queue" + self.logger.info(self.status) + return False + if not self.checkSpace(): + # checkSpace() does its own logging + return False + loadavgs = os.getloadavg() + # this likely treats HT processors the same as real ones + # but that's fine, it's a conservative test + maxload = 4.0 * os.sysconf('SC_NPROCESSORS_ONLN') + if loadavgs[0] > maxload: + self.status = "Load average %.2f > %.2f" % (loadavgs[0], maxload) + self.logger.info(self.status) + return False + #XXX - add more checks + return True + + def takeTask(self,task_id): + """Attempt to open the specified task + + Returns True if successful, False otherwise + """ + self.logger.info("Attempting to take task %s" %task_id) + data = session.host.openTask(task_id) + if data is None: + self.logger.warn("Could not open") + return False + if not data.has_key('request') or data['request'] is None: + self.logger.warn("Task '%s' has no request" % task_id) + return False + id = data['id'] + request = data['request'] + self.tasks[id] = data + params, method = xmlrpclib.loads(request) + if self.handlers.has_key(method): + handlerClass = self.handlers[method] + elif self.handlers.has_key('default'): + handlerClass = self.handlers['default'] + else: + raise koji.GenericError, "No handler found for method '%s'" % method + if issubclass(handlerClass, BaseTaskHandler): + #new style handler needs session and options passed + # handler = handlerClass(id,method,params,session,options) + handler = handlerClass(id,method,params) + else: + handler = handlerClass(id,method,params) + # set weight + session.host.setTaskWeight(task_id,handler.weight()) + if handler.Foreground: + self.logger.info("running task in foreground") + handler.setManager(self) + self.runTask(handler) + else: + pid, session_id = self.forkTask(handler) + self.pids[id] = pid + self.subsessions[id] = session_id + return True + + def forkTask(self,handler): + global session + #get the subsession before we fork + newhub = session.subsession() + session_id = newhub.sinfo['session-id'] + pid = os.fork() + if pid: + newhub._forget() + return pid, session_id + #in no circumstance should we return after the fork + #nor should any exceptions propagate past here + try: + session._forget() + #set process group + os.setpgrp() + #use the subsession + session = newhub + if hasattr(handler, 'session'): + handler.session = session + #set a do-nothing handler for sigusr2 + signal.signal(signal.SIGUSR2,lambda *args: None) + self.runTask(handler) + finally: + #diediedie + try: + session.logout() + finally: + os._exit(0) + + def runTask(self,handler): + fail = False + try: + response = (handler.run(),) + # note that we wrap response in a singleton tuple + response = xmlrpclib.dumps(response, methodresponse=1, allow_none=1) + self.logger.info("RESPONSE: %r" % response) + except Fault, fault: + fail = True + response = xmlrpclib.dumps(fault) + tb = ''.join(traceback.format_exception(*sys.exc_info())).replace(r"\n", "\n") + 
self.logger.warn("FAULT:\n%s" % tb) + except (SystemExit,ServerExit,KeyboardInterrupt): + #we do not trap these + raise + except: + fail = True + # report exception back to server + e_class, e = sys.exc_info()[:2] + faultCode = getattr(e_class,'faultCode',1) + if issubclass(e_class, koji.GenericError): + #just pass it through + tb = str(e) + self.logger.warn(tb) + else: + tb = ''.join(traceback.format_exception(*sys.exc_info())) + self.logger.warn("TRACEBACK: %s" % tb) + response = xmlrpclib.dumps(xmlrpclib.Fault(faultCode, tb)) + + if fail: + session.host.failTask(handler.id, response) + else: + session.host.closeTask(handler.id, response) + +class BaseTaskHandler(object): + """The base class for task handlers + + Each task handler is a class, a new instance of which is created + to handle each task. + """ + + # list of methods the class can handle + Methods = [] + + # Options: + Foreground = False + + def __init__(self, id, method, params, workdir=None): + global options + self.id = id #task id + if method not in self.Methods: + raise koji.GenericError, 'method "%s" is not supported' % method + self.method = method + # handle named parameters + self.params,self.opts = koji.decode_args(*params) + if workdir is None: + workdir = "%s/%s" % (options.workdir, koji.pathinfo.taskrelpath(id)) + self.workdir = workdir + self.logger = logging.getLogger("koji.vm.BaseTaskHandler") + + def setManager(self,manager): + """Set the manager attribute + + This is only used for foreground tasks to give them access + to their task manager. + """ + if not self.Foreground: + return + self.manager = manager + + def handler(self): + """(abstract) the handler for the task.""" + raise NotImplementedError + + def run(self): + """Execute the task""" + self.createWorkdir() + try: + return self.handler(*self.params,**self.opts) + finally: + self.removeWorkdir() + + _taskWeight = 1.0 + + def weight(self): + """Return the weight of the task. + + This is run by the taskmanager before the task is run to determine + the weight of the task. The weight is an abstract measure of the + total load the task places on the system while running. + + A task may set _taskWeight for a constant weight different from 1, or + override this function for more complicated situations. + + Note that task weight is partially ignored while the task is sleeping. + """ + return getattr(self,'_taskWeight',1.0) + + def createWorkdir(self): + if self.workdir is None: + return + self.removeWorkdir() + os.makedirs(self.workdir) + + def removeWorkdir(self): + if self.workdir is None: + return + safe_rmtree(self.workdir, unmount=False, strict=True) + #os.spawnvp(os.P_WAIT, 'rm', ['rm', '-rf', self.workdir]) + + def wait(self, subtasks=None, all=False, failany=False): + """Wait on subtasks + + subtasks is a list of integers (or an integer). If more than one subtask + is specified, then the default behavior is to return when any of those + tasks complete. However, if all is set to True, then it waits for all of + them to complete. If all and failany are both set to True, then each + finished task will be checked for failure, and a failure will cause all + of the unfinished tasks to be cancelled. + + special values: + subtasks = None specify all subtasks + + Implementation notes: + The build daemon forks all tasks as separate processes. This function + uses signal.pause to sleep. The main process watches subtasks in + the database and will send the subprocess corresponding to the + subtask a SIGUSR2 to wake it up when subtasks complete. 
+ """ + if isinstance(subtasks,int): + # allow single integer w/o enclosing list + subtasks = [subtasks] + session.host.taskSetWait(self.id,subtasks) + self.logger.debug("Waiting on %r" % subtasks) + while True: + finished, unfinished = session.host.taskWait(self.id) + if len(unfinished) == 0: + #all done + break + elif len(finished) > 0: + if all: + if failany: + failed = False + for task in finished: + try: + result = session.getTaskResult(task) + except (koji.GenericError, Fault), task_error: + self.logger.info("task %s failed or was canceled" % task) + failed = True + break + if failed: + self.logger.info("at least one task failed or was canceled, cancelling unfinished tasks") + session.cancelTaskChildren(self.id) + # reraise the original error now, rather than waiting for + # an error in taskWaitResults() + raise task_error + else: + # at least one done + break + # signal handler set by TaskManager.forkTask + self.logger.debug("Pausing...") + signal.pause() + # main process will wake us up with SIGUSR2 + self.logger.debug("...waking up") + self.logger.debug("Finished waiting") + return dict(session.host.taskWaitResults(self.id,subtasks)) + + def getUploadDir(self): + return koji.pathinfo.taskrelpath(self.id) + + def uploadFile(self, filename, relPath=None, remoteName=None): + """Upload the file with the given name to the task output directory + on the hub.""" + uploadPath = self.getUploadDir() + if relPath: + relPath = relPath.strip('/') + uploadPath += '/' + relPath + # Only upload files with content + if os.path.isfile(filename) and os.stat(filename).st_size > 0: + session.uploadWrapper(filename, uploadPath, remoteName) + + def uploadTree(self, dirpath, flatten=False): + """Upload the directory tree at dirpath to the task directory on the + hub, preserving the directory structure""" + dirpath = dirpath.rstrip('/') + for path, dirs, files in os.walk(dirpath): + if flatten: + relpath = None + else: + relpath = path[len(dirpath) + 1:] + for filename in files: + self.uploadFile(os.path.join(path, filename), relpath) + + def localPath(self, relpath): + """Return a local path to a remote file. + + If the file is on an nfs mount, use that, otherwise download a copy""" + if options.topurl: + fn = "%s/local/%s" % (self.workdir, relpath) + if os.path.exists(fn): + # We've already downloaded this file, + # just return the existing local path + return fn + self.logger.debug("Downloading %s", relpath) + url = "%s/%s" % (options.topurl, relpath) + fsrc = urllib2.urlopen(url) + if not os.path.exists(os.path.dirname(fn)): + os.makedirs(os.path.dirname(fn)) + fdst = file(fn, 'w') + shutil.copyfileobj(fsrc, fdst) + fsrc.close() + fdst.close() + else: + fn = "%s/%s" % (options.topdir, relpath) + return fn + + def subtask(self, method, arglist, **opts): + return session.host.subtask(method, arglist, self.id, **opts) + + def subtask2(self, __taskopts, __method, *args, **kwargs): + return session.host.subtask2(self.id, __taskopts, __method, *args, **kwargs) + + def find_arch(self, arch, host, tag): + """ + For noarch tasks, find a canonical arch that is supported by both the host and tag. + If the arch is anything other than noarch, return it unmodified. + """ + if arch != "noarch": + return arch + + # We need a concrete arch. 
Pick one that: + # a) this host can handle + # b) the build tag can support + # c) is canonical + host_arches = host['arches'] + if not host_arches: + raise koji.BuildError, "No arch list for this host: %s" % host['name'] + tag_arches = tag['arches'] + if not tag_arches: + raise koji.BuildError, "No arch list for tag: %s" % tag['name'] + # index canonical host arches + host_arches = set([koji.canonArch(a) for a in host_arches.split()]) + # index canonical tag arches + tag_arches = set([koji.canonArch(a) for a in tag_arches.split()]) + # find the intersection of host and tag arches + common_arches = list(host_arches & tag_arches) + if common_arches: + # pick one of the common arches randomly + # need to re-seed the prng or we'll get the same arch every time, + # because we just forked from a common parent + random.seed() + arch = random.choice(common_arches) + self.logger.info('Valid arches: %s, using: %s' % (' '.join(common_arches), arch)) + return arch + else: + # no overlap + raise koji.BuildError, "host %s (%s) does not support any arches of tag %s (%s)" % \ + (host['name'], ', '.join(host_arches), tag['name'], ', '.join(tag_arches)) + +class DefaultTask(BaseTaskHandler): + """Used when no matching method is found""" + Methods = ['default'] + _taskWeight = 0.1 + def __init__(self, id, method, params, workdir=None): + self.id = id #task id + self.method = method + self.params = params + self.workdir = None + self.opts = {} + def handler(self,*args,**opts): + raise koji.GenericError, "Invalid method: %s" % self.method + +#################### +# XXX End heinous copy-and-paste (mostly) +#################### + +def get_options(): + """process options from command line and config file""" + global options + # parse command line args + parser = OptionParser() + parser.add_option("-c", "--config", dest="configFile", + help="use alternate configuration file", metavar="FILE", + default="/etc/kojivmd/kojivmd.conf") + parser.add_option("--user", help="specify user") + parser.add_option("--password", help="specify password") + parser.add_option("-f", "--fg", dest="daemon", + action="store_false", default=True, + help="run in foreground") + parser.add_option("--force-lock", action="store_true", default=False, + help="force lock for exclusive session") + parser.add_option("-v", "--verbose", action="store_true", default=False, + help="show verbose output") + parser.add_option("-d", "--debug", action="store_true", default=False, + help="show debug output") + parser.add_option("--debug-task", action="store_true", default=False, + help="enable debug output for tasks") + parser.add_option("--debug-xmlrpc", action="store_true", default=False, + help="show xmlrpc debug output") + parser.add_option("--skip-main", action="store_true", default=False, + help="don't actually run main") + parser.add_option("--maxjobs", type='int', help="Specify maxjobs") + parser.add_option("--sleeptime", type='int', help="Specify the polling interval") + parser.add_option("--admin-emails", help="Address(es) to send error notices to") + parser.add_option("--workdir", help="Specify workdir") + parser.add_option("--pluginpath", help="Specify plugin search path") + parser.add_option("--plugin", action="append", help="Load specified plugin") + parser.add_option("-s", "--server", help="url of XMLRPC server") + (options, args) = parser.parse_args() + + if args: + parser.error("incorrect number of arguments") + #not reached + assert False + + # load local config + config = ConfigParser() + config.read(options.configFile) + for x in 
config.sections():
+        if x != 'kojivmd':
+            quit('invalid section found in config file: %s' % x)
+    defaults = {'sleeptime': 15,
+                'maxjobs': 5,
+                'minspace': 8192,
+                'minmem': 4096,
+                'vmuser': 'qemu',
+                'admin_emails': None,
+                'workdir': '/tmp/koji',
+                'imagedir': '/var/lib/libvirt/images',
+                'pluginpath': '/usr/lib/koji-vm-plugins',
+                'privaddr': '192.168.122.1',
+                'portbase': 7000,
+                'smtphost': 'example.com',
+                'from_addr': 'Koji Build System <buildsys@example.com>',
+                'krb_principal': None,
+                'host_principal_format': 'compile/%s@EXAMPLE.COM',
+                'keytab': '/etc/kojivmd/kojivmd.keytab',
+                'ccache': '/var/tmp/kojivmd.ccache',
+                'server': None,
+                'user': None,
+                'password': None,
+                'retry_interval': 60,
+                'max_retries': 120,
+                'offline_retry': True,
+                'offline_retry_interval': 120,
+                'cert': '/etc/kojivmd/client.crt',
+                'ca': '/etc/kojivmd/clientca.crt',
+                'serverca': '/etc/kojivmd/serverca.crt'}
+    if config.has_section('kojivmd'):
+        for name, value in config.items('kojivmd'):
+            if name in ['sleeptime', 'maxjobs', 'minspace', 'minmem',
+                        'retry_interval', 'max_retries', 'offline_retry_interval',
+                        'portbase']:
+                try:
+                    defaults[name] = int(value)
+                except ValueError:
+                    quit("value for %s option must be a valid integer" % name)
+            elif name in ['offline_retry']:
+                defaults[name] = config.getboolean('kojivmd', name)
+            elif name in ['plugin', 'plugins']:
+                defaults['plugin'] = value.split()
+            elif name in defaults.keys():
+                defaults[name] = value
+            else:
+                quit("unknown config option: %s" % name)
+    for name, value in defaults.items():
+        if getattr(options, name, None) is None:
+            setattr(options, name, value)
+
+    #make sure workdir exists
+    if not os.path.exists(options.workdir):
+        koji.ensuredir(options.workdir)
+
+    if not options.server:
+        parser.error("--server argument required")
+
+def quit(msg=None, code=1):
+    if msg:
+        logging.getLogger("koji.vm").error(msg)
+        sys.stderr.write('%s\n' % msg)
+        sys.stderr.flush()
+    sys.exit(code)
+
+def main():
+    global session
+    global options
+    logger = logging.getLogger("koji.vm")
+    logger.info('Starting up')
+    tm = VMTaskManager()
+    if options.plugin:
+        #load plugins
+        pt = koji.plugin.PluginTracker(path=options.pluginpath.split(':'))
+        for name in options.plugin:
+            logger.info('Loading plugin: %s' % name)
+            tm.scanPlugin(pt.load(name))
+    def shutdown(*args):
+        raise SystemExit
+    signal.signal(signal.SIGTERM,shutdown)
+    taken = False
+    tm.cleanupAllVMs()
+    while 1:
+        try:
+            tm.updateTasks()
+            taken = tm.getNextTask()
+            tm.cleanupExpiredVMs()
+        except (SystemExit,ServerExit,KeyboardInterrupt):
+            logger.warn("Exiting")
+            break
+        except koji.AuthExpired:
+            logger.error('Session expired')
+            break
+        except koji.RetryError:
+            raise
+        except:
+            # XXX - this is a little extreme
+            # log the exception and continue
+            logger.error(''.join(traceback.format_exception(*sys.exc_info())))
+        try:
+            if not taken:
+                # Only sleep if we didn't take a task, otherwise retry immediately.
+                # The load-balancing code in getNextTask() will prevent a single builder
+                # from getting overloaded.
+                time.sleep(options.sleeptime)
+        except (SystemExit,KeyboardInterrupt):
+            logger.warn("Exiting")
+            break
+    logger.warn("Shutting down, please wait...")
+    tm.shutdown()
+    session.logout()
+    sys.exit(0)
+
+
+####################
+# Tasks for handling VM lifecycle
+####################
+
+class DaemonXMLRPCServer(SimpleXMLRPCServer.SimpleXMLRPCServer):
+    allow_reuse_address = True
+
+    def __init__(self, addr, port):
+        SimpleXMLRPCServer.SimpleXMLRPCServer.__init__(self, (addr, port), logRequests=False)
+        self.logger = logging.getLogger('koji.vm.' + self.__class__.__name__)
+        self.socket.settimeout(5)
+        self.active = True
+
+    def server_close(self):
+        self.active = False
+        SimpleXMLRPCServer.SimpleXMLRPCServer.server_close(self)
+
+    def handle_while_active(self):
+        while self.active:
+            try:
+                conn, (ipaddr, port) = self.get_request()
+                self.logger.info('request from %s:%s' % (ipaddr, port))
+                if self.verify_request(conn, (ipaddr, port)):
+                    try:
+                        self.process_request(conn, (ipaddr, port))
+                    finally:
+                        self.close_request(conn)
+            except socket.timeout:
+                pass
+            except:
+                tb = ''.join(traceback.format_exception(*sys.exc_info()))
+                self.logger.error('Error handling requests: %s' % tb)
+
+class TaskXMLRPCServer(DaemonXMLRPCServer):
+
+    def __init__(self, addr, port, task_handler):
+        DaemonXMLRPCServer.__init__(self, addr, port)
+        self.register_function(task_handler.getTaskInfo)
+        self.register_function(task_handler.closeTask)
+        self.register_function(task_handler.failTask)
+        self.register_function(task_handler.upload)
+        self.register_function(task_handler.verifyChecksum)
+
+class VMTask(BaseTaskHandler):
+    """
+    Handles the startup, state-tracking, and shutdown of a VM
+    for the purpose of executing a single task.
+    """
+
+    Methods = ['vmExec']
+    _taskWeight = 3.0
+    CLONE_PREFIX = 'koji-clone-'
+    QCOW2_EXT = '.qcow2'
+
+    def __init__(self, *args, **kw):
+        global options
+        super(VMTask, self).__init__(*args, **kw)
+        self.task_manager = xmlrpclib.ServerProxy('http://%s:%s/' % (options.privaddr, options.portbase),
+                                                  allow_none=True)
+        self.port = None
+        self.server = None
+        self.task_info = None
+        self.output = None
+        self.success = None
+
+    def mkqcow2(self, clone_name, source_disk, disk_num):
+        global options
+        new_name = clone_name + '-disk-' + str(disk_num) + self.QCOW2_EXT
+        new_path = os.path.join(options.imagedir, new_name)
+        cmd = ['/usr/bin/qemu-img', 'create', '-b', source_disk, '-f', 'qcow2', new_path]
+        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
+        ret = proc.wait()
+        if ret:
+            output = proc.stdout.read()
+            raise koji.BuildError, 'unable to create qcow2 image, "%s" returned %s; output was: %s' % \
+                  (' '.join(cmd), ret, output)
+        vm_user = pwd.getpwnam(options.vmuser)
+        os.chown(new_path, vm_user.pw_uid, vm_user.pw_gid)
+        return new_path
+
+    def fixDriverXML(self, xml):
+        doc = libxml2.parseDoc(xml)
+        ctx = doc.xpathNewContext()
+        disks = ctx.xpathEval('/domain/devices/disk[@device="disk" and @type="file"]')
+        for disk in disks:
+            drivers = disk.xpathEval('driver')
+            if drivers and drivers[0].hasProp('type'):
+                source = disk.xpathEval('source')[0]
+                srcfile = os.path.basename(source.prop('file'))
+                if srcfile.startswith(self.CLONE_PREFIX) and srcfile.endswith(self.QCOW2_EXT):
+                    drivers[0].setProp('type', 'qcow2')
+        fixed_xml = str(doc)
+        ctx.xpathFreeContext()
+        doc.freeDoc()
+        return fixed_xml
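+
+    # Note: the cloning scheme here is copy-on-write.  Each clone disk produced by
+    # mkqcow2() is a qcow2 overlay backed by the original VM's disk image, so clones
+    # are cheap to create and the original image is never written to.  For a single
+    # disk, the command built above amounts to (hypothetical paths):
+    #
+    #   /usr/bin/qemu-img create -b /var/lib/libvirt/images/win2k8.img -f qcow2 \
+    #       /var/lib/libvirt/images/koji-clone-1234-win2k8-disk-0.qcow2
+
+    def clone(self, conn, name):
+        """
+        Clone the VM named "name" and return the name of the cloned VM.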
+        All disks will be qcow2 images backed by the storage of the original
+        VM. The original VM must be shut down, or this will raise an error.
+        """
+        clone_name = self.CLONE_PREFIX + str(self.id) + '-' + name
+        design = virtinst.CloneManager.CloneDesign(conn)
+        design.original_guest = name
+        design.clone_name = clone_name
+        design.setup_original()
+
+        i = 0
+        for orig_disk in design.original_devices:
+            new_disk = self.mkqcow2(clone_name, orig_disk, i)
+            design.clone_devices = new_disk
+            i += 1
+
+        design.setup_clone()
+        # The CloneManager API doesn't fix the type= of the <driver> entry in
+        # the XML (if it exists), so do it manually.
+        clone_xml = self.fixDriverXML(design.clone_xml)
+        conn.defineXML(clone_xml)
+
+        return clone_name
+
+    def macAddr(self, vm):
+        """
+        Return the MAC address of the first network interface configured for the given VM.
+        """
+        doc = libxml2.parseDoc(vm.XMLDesc(0))
+        ctx = doc.xpathNewContext()
+        nodelist = ctx.xpathEval('/domain/devices/interface[@type="network"]/mac')
+        if not nodelist:
+            raise koji.BuildError, 'no network interfaces configured for %s' % vm.name()
+        addr = nodelist[0].prop('address')
+        ctx.xpathFreeContext()
+        doc.freeDoc()
+        return addr
+
+    def getTaskInfo(self):
+        """
+        Get the command-line to run in the VM.
+        """
+        return self.task_info
+
+    def upload(self, path, offset, contents):
+        local_path = os.path.abspath(os.path.join(self.workdir, path))
+        if not local_path.startswith(self.workdir):
+            raise koji.BuildError, 'invalid upload path: %s' % path
+        koji.ensuredir(os.path.dirname(local_path))
+        # accept offset as a str to avoid problems with files larger than 2**32
+        offset = int(offset)
+        if offset == 0:
+            if os.path.exists(local_path):
+                raise koji.BuildError, 'cannot overwrite %s' % local_path
+            fobj = file(local_path, 'w')
+        else:
+            if not os.path.isfile(local_path):
+                raise koji.BuildError, '%s does not exist' % local_path
+            size = os.path.getsize(local_path)
+            if offset != size:
+                raise koji.BuildError, 'cannot write to %s at offset %s, size is %s' % \
+                      (local_path, offset, size)
+            fobj = file(local_path, 'r+')
+            fobj.seek(offset)
+        data = base64.b64decode(contents)
+        fobj.write(data)
+        fobj.close()
+        return len(data)
+
+    def verifyChecksum(self, path, checksum, algo='sha1'):
+        local_path = os.path.abspath(os.path.join(self.workdir, path))
+        if not local_path.startswith(self.workdir):
+            raise koji.BuildError, 'invalid path: %s' % path
+        if not os.path.isfile(local_path):
+            raise koji.BuildError, '%s does not exist' % local_path
+
+        if algo == 'sha1':
+            sum = koji.util.sha1_constructor()
+        elif algo == 'md5':
+            sum = koji.util.md5_constructor()
+        else:
+            raise koji.BuildError, 'unsupported checksum algorithm: %s' % algo
+
+        fobj = file(local_path, 'r')
+        while True:
+            data = fobj.read(1048576)
+            if not data:
+                break
+            sum.update(data)
+        fobj.close()
+        if sum.hexdigest() == checksum:
+            return True
+        else:
+            raise koji.BuildError, '%s checksum validation failed for %s, %s (computed) != %s (provided)' % \
+                  (algo, local_path, sum.hexdigest(), checksum)
+
+    def closeTask(self, output):
+        self.output = output
+        self.success = True
+        return True
+
+    def failTask(self, output):
+        self.output = output
+        self.success = False
+        return True
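+
+    # Note: getTaskInfo, upload, verifyChecksum, closeTask and failTask above are
+    # the complete XML-RPC contract between this handler and kojikamid running in
+    # the guest.  From the guest's point of view a task reduces to roughly this
+    # (a sketch with error handling omitted; see kojikamid's main()):
+    #
+    #   cmd = server.getTaskInfo()[0]     # fetch the command to run
+    #   ret, output = run(cmd)            # execute it in the guest
+    #   uploadDir(server, '/tmp/output')  # chunked upload + checksum verification
+    #   if ret:
+    #       server.failTask(output)       # sets self.success = False here
+    #   else:
+    #       server.closeTask(output)      # sets self.success = True here
+
+    def setupTaskServer(self):
+        """
+        Setup the task-specific xmlrpc server to listen to requests from
+        the VM.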
+ """ + global options + self.server = TaskXMLRPCServer(options.privaddr, self.port, self) + thr = threading.Thread(name='task_%s_thread' % self.id, + target=self.server.handle_while_active) + thr.setDaemon(True) + thr.start() + + def handler(self, name, task_info, opts=None): + """ + Clone the VM named "name", and provide the data in "task_info" to it. + Available options: + - timeout (int): number of minutes to let the VM run before + destroying it and failing the task, default: 1440 + """ + global options + if not opts: + opts = {} + timeout = opts.get('timeout', 720) + + self.task_info = task_info + + conn = libvirt.open(None) + clone_name = self.clone(conn, name) + self.logger.debug('Cloned VM %s to %s' % (name, clone_name)) + try: + vm = conn.lookupByName(clone_name) + vm_info = vm.info() + cpus = opts.get('cpus') + if cpus and cpus != vm_info[3]: + vm.setVcpus(cpus) + mem = opts.get('mem') + if mem: + # mem is in mbytes, libvirt expects kbytes + mem = mem * 1024 + if vm_info[1] < mem: + vm.setMaxMemory(mem) + if vm_info[2] != mem: + vm.setMemory(mem) + macaddr = self.macAddr(vm) + registered = False + while not registered: + # loop in case the port is already taken + self.port = options.portbase + random.randint(1, 100) + registered = self.task_manager.registerVM(macaddr, clone_name, self.id, self.port) + self.setupTaskServer() + vm.create() + self.logger.info('Started VM %s' % clone_name) + except libvirt.libvirtError, e: + raise koji.PreBuildError, 'error starting VM %s, error was: %s' % \ + (clone_name, e) + + start = time.time() + while True: + time.sleep(15) + info = vm.info() + if info[0] in (libvirt.VIR_DOMAIN_CRASHED, libvirt.VIR_DOMAIN_SHUTOFF): + self.logger.warn('VM %s crashed' % clone_name) + self.server.server_close() + raise koji.BuildError, 'VM %s crashed' % clone_name + if self.success is None: + # task is still running + # make sure it hasn't exceeded the timeout + mins = (time.time() - start) / 60 + if mins > timeout: + vm.destroy() + self.server.server_close() + raise koji.BuildError, 'Task did not complete after %.2f minutes, VM %s has been destroyed' % \ + (mins, clone_name) + else: + vm.destroy() + self.server.server_close() + self.uploadTree(self.workdir) + if self.success: + return self.output + else: + raise koji.BuildError, self.output + +class ManagerXMLRPCServer(DaemonXMLRPCServer): + + def __init__(self, addr, port, manager): + DaemonXMLRPCServer.__init__(self, addr, port) + self.register_function(manager.registerVM) + self.register_function(manager.getPort) + +class VMTaskManager(TaskManager): + def __init__(self): + super(VMTaskManager, self).__init__() + self.libvirt_conn = libvirt.open(None) + self.macaddrs = {} + self.macaddr_lock = threading.Lock() + self.expired_vms = {} + self.setupServer() + + def registerVM(self, macaddr, vm_name, task_id, port): + """ + Register a VM instance with the task manager. + """ + self.macaddr_lock.acquire() + try: + macaddr = macaddr.lower() + ports = [d[2] for d in self.macaddrs.values()] + if port in ports: + return False + if macaddr in self.macaddrs: + raise koji.PreBuildError, 'duplicate MAC address: %s' % macaddr + self.macaddrs[macaddr] = (vm_name, task_id, port) + self.logger.info('registered MAC address %s for VM %s (task ID %s, port %s)' % (macaddr, vm_name, task_id, port)) + return True + finally: + self.macaddr_lock.release() + + def getPort(self, macaddr): + """ + Get the port that the daemon associated with VM with the given MAC address is listening on. 
+ """ + self.macaddr_lock.acquire() + try: + macaddr = macaddr.lower() + data = self.macaddrs.get(macaddr) + if data: + return data[2] + else: + raise koji.PreBuildError, 'unknown MAC address: %s' % macaddr + finally: + self.macaddr_lock.release() + + def setupServer(self): + global options + self.server = ManagerXMLRPCServer(options.privaddr, options.portbase, self) + thr = threading.Thread(name='manager_thread', target=self.server.handle_while_active) + thr.setDaemon(True) + thr.start() + + def getCloneDisks(self, vm): + doc = libxml2.parseDoc(vm.XMLDesc(0)) + ctx = doc.xpathNewContext() + nodelist = ctx.xpathEval('/domain/devices/disk[@device="disk" and @type="file"]/source') + disks = [] + for node in nodelist: + disk = node.prop('file') + if os.path.basename(disk).startswith(VMTask.CLONE_PREFIX) and \ + disk.endswith(VMTask.QCOW2_EXT): + disks.append(disk) + ctx.xpathFreeContext() + doc.freeDoc() + return disks + + def checkDisk(self): + global options + if not os.path.exists(options.imagedir): + self.logger.error('No such directory: %s' % options.imagedir) + raise IOError, 'No such directory: %s' % options.imagedir + fs_stat = os.statvfs(options.imagedir) + available = fs_stat.f_bavail * fs_stat.f_bsize + availableMB = available / 1024 / 1024 + self.logger.debug('disk space available in %s: %i MB', options.imagedir, availableMB) + if availableMB < options.minspace: + self.status = 'Insufficient disk space: %i MB, %i MB required' % (availableMB, options.minspace) + self.logger.warn(self.status) + return False + return True + + def checkMem(self): + global options + phys_mem = os.sysconf('SC_PHYS_PAGES') * os.sysconf('SC_PAGE_SIZE') / 1024 + vm_mem = 0 + for vm_id in self.libvirt_conn.listDomainsID(): + vm = self.libvirt_conn.lookupByID(vm_id) + info = vm.info() + # info[1] is the max. memory allocatable to the VM, and info[2] is the amount of + # memory currently used by the VM (in kbytes). We're interested in the latter. + vm_mem += info[2] + avail_mem = phys_mem - vm_mem + # options.minmem is listed in mbytes + min_mem = options.minmem * 1024 + self.logger.debug('physical mem: %sk, allocated mem: %sk, available mem: %sk' % \ + (phys_mem, vm_mem, avail_mem)) + if avail_mem < min_mem: + self.status = 'Insufficient memory: %sk allocated, %sk available, %sk required' % \ + (vm_mem, avail_mem, min_mem) + self.logger.warn(self.status) + return False + return True + + def checkSpace(self): + """See if we have enough space to accept another job""" + return self.checkDisk() and self.checkMem() + + def cleanupVM(self, vm_name): + """ + Cleanup a single VM with the given name. + """ + vm = self.libvirt_conn.lookupByName(vm_name) + info = vm.info() + if info[0] not in (libvirt.VIR_DOMAIN_SHUTOFF, libvirt.VIR_DOMAIN_CRASHED): + vm.destroy() + self.logger.info('Shut down VM %s' % vm_name) + disks = self.getCloneDisks(vm) + for disk in disks: + try: + if os.path.isfile(disk): + os.unlink(disk) + self.logger.debug('Removed disk file %s for VM %s' % (disk, vm_name)) + except: + tb = ''.join(traceback.format_exception(*sys.exc_info())) + self.logger.error('Error removing disk file %s for VM %s, error was: %s' % \ + (disk, vm_name, tb)) + return False + else: + # Removed all the disks successfully, so undefine the VM + vm.undefine() + self.logger.info('Cleaned up VM %s' % vm_name) + return True + + def cleanupAllVMs(self): + """ + Cleanup shutdown and clean up all cloned Koji VMs. + Only called once at daemon startup, so we start with a clean slate. 
+ """ + vms = self.libvirt_conn.listDefinedDomains() + self.libvirt_conn.listDomainsID() + for vm_name in vms: + if type(vm_name) == int: + vm_name = self.libvirt_conn.lookupByID(vm_name).name() + if vm_name.startswith(VMTask.CLONE_PREFIX): + self.cleanupVM(vm_name) + + def cleanupExpiredVMs(self): + for vm_name, task in self.expired_vms.items(): + if task['state'] == koji.TASK_STATES['FAILED']: + if time.time() - task['completion_ts'] < 3600 * 4: + # task failed, so we'll keep the VM image around for 4 hours + # for debugging purposes + continue + ret = self.cleanupVM(vm_name) + if ret: + # successfully cleaned up the VM, so remove it from the expired list + del self.expired_vms[vm_name] + + def cleanupTask(self, task_id, wait=True): + ret = super(VMTaskManager, self).cleanupTask(task_id, wait) + self.macaddr_lock.acquire() + try: + if ret: + for macaddr, (vm_name, id, port) in self.macaddrs.items(): + if task_id == id: + self.expired_vms[vm_name] = session.getTaskInfo(task_id) + del self.macaddrs[macaddr] + self.logger.info('unregistered MAC address %s' % macaddr) + break + return ret + finally: + self.macaddr_lock.release() + + def shutdown(self): + super(VMTaskManager, self).shutdown() + self.libvirt_conn.close() + self.server.server_close() + +#################### +# Boilerplate startup code +#################### + +if __name__ == "__main__": + global options + + koji.add_file_logger("koji", "/var/log/kojivmd.log") + #note we're setting logging params for all of koji* + get_options() + if options.debug: + logging.getLogger("koji").setLevel(logging.DEBUG) + elif options.verbose: + logging.getLogger("koji").setLevel(logging.INFO) + else: + logging.getLogger("koji").setLevel(logging.WARN) + if options.debug_task: + logging.getLogger("koji.vm.BaseTaskHandler").setLevel(logging.DEBUG) + if options.admin_emails: + koji.add_mail_logger("koji", options.admin_emails) + + #build session options + session_opts = {} + for k in ('user','password','debug_xmlrpc', 'debug', + 'retry_interval', 'max_retries', 'offline_retry', 'offline_retry_interval'): + v = getattr(options, k, None) + if v is not None: + session_opts[k] = v + #start a session and login + session = koji.ClientSession(options.server, session_opts) + if os.path.isfile(options.cert): + try: + # authenticate using SSL client certificates + session.ssl_login(options.cert, options.ca, + options.serverca) + except koji.AuthError, e: + quit("Error: Unable to log in: %s" % e) + except xmlrpclib.ProtocolError: + quit("Error: Unable to connect to server %s" % (options.server)) + elif options.user: + try: + # authenticate using user/password + session.login() + except koji.AuthError: + quit("Error: Unable to log in. Bad credentials?") + except xmlrpclib.ProtocolError: + quit("Error: Unable to connect to server %s" % (options.server)) + elif sys.modules.has_key('krbV'): + krb_principal = options.krb_principal + if krb_principal is None: + krb_principal = options.host_principal_format % socket.getfqdn() + try: + session.krb_login(principal=krb_principal, + keytab=options.keytab, + ccache=options.ccache) + except krbV.Krb5Error, e: + quit("Kerberos authentication failed: '%s' (%s)" % (e.args[1], e.args[0])) + except socket.error, e: + quit("Could not connect to Kerberos authentication service: '%s'" % e.args[1]) + else: + quit("No username/password supplied and Kerberos missing or not configured") + #make session exclusive + try: + session.exclusiveSession(force=options.force_lock) + except koji.AuthLockError: + quit("Error: Unable to get lock. 
+
diff --git a/vm/kojivmd.conf b/vm/kojivmd.conf
new file mode 100644
index 00000000..09101ab9
--- /dev/null
+++ b/vm/kojivmd.conf
@@ -0,0 +1,35 @@
+[kojivmd]
+; The number of seconds to sleep between tasks
+; sleeptime=15
+
+; The maximum number of jobs that kojivmd will handle at a time
+; maxjobs=10
+
+; The minimum amount of memory (in MB) that must remain unallocated to VMs for kojivmd to accept a new task
+; minmem=4096
+
+; The user the VM/emulator runs as (cloned disk images will be readable and writable by this user)
+; vmuser=qemu
+
+; The directory root for temporary storage
+; workdir=/tmp/koji
+
+; The URL for the xmlrpc server
+server=http://hub.example.com/kojihub
+
+; The mail host to use for sending email notifications
+smtphost=example.com
+
+; The From address used when sending email notifications
+from_addr=Koji Build System <buildsys@example.com>
+
+; configuration for SSL authentication
+
+; client certificate
+; cert = /etc/kojivmd/client.crt
+
+; certificate of the CA that issued the client certificate
+; ca = /etc/kojivmd/clientca.crt
+
+; certificate of the CA that issued the HTTP server certificate
+; serverca = /etc/kojivmd/serverca.crt
diff --git a/vm/run-vm-task b/vm/run-vm-task
new file mode 100755
index 00000000..44694462
--- /dev/null
+++ b/vm/run-vm-task
@@ -0,0 +1,40 @@
+#!/usr/bin/python
+
+import koji
+import optparse
+
+# Example of creating the same kind of task directly with the CLI:
+# cli/koji -c ~/.koji/config-mead call --python makeTask '"vmExec"' '["Win2k8-x86-vstudio-devel", ["wget -q -O /tmp/test-build.sh http://download.lab.bos.redhat.com/devel/mikeb/mead/debug/test-build.sh && chmod 755 /tmp/test-build.sh && /tmp/test-build.sh &> /tmp/output/build.log && echo build successful"], {"cpus": 2, "mem": 2048}]' --kwargs '{"channel": "vm"}'
+
+parser = optparse.OptionParser('%prog [options] VM-NAME COMMAND-TO-RUN')
+parser.add_option('--server', help='Koji hub URL')
+parser.add_option('--cert', help='Client certificate')
+parser.add_option('--ca', help='Client CA certificate')
+parser.add_option('--server-ca', help='Server CA certificate')
+parser.add_option('--cpus', help='Number of virtual CPUs to allocate to the VM (optional)',
+                  type='int')
+parser.add_option('--mem', help='Amount of memory (in megabytes) to allocate to the VM (optional)',
+                  type='int')
+parser.add_option('--channel', help='Channel to create the task in', default='vm')
+
+opts, args = parser.parse_args()
+
+if len(args) < 2:
+    parser.error('You must specify a VM name and a command to run')
+
+vm_name = args[0]
+cmd = ' '.join(args[1:])
+
+session = koji.ClientSession(opts.server)
+session.ssl_login(opts.cert, opts.ca, opts.server_ca)
+
+task_opts = {}
+if opts.cpus:
+    task_opts['cpus'] = opts.cpus
+if opts.mem:
+    task_opts['mem'] = opts.mem
+
+params = [vm_name, [cmd], task_opts]
+
+task_id = session.makeTask('vmExec', params, channel=opts.channel)
+
+print 'Created task %s' % task_id
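+
+# Example invocation (hub URL and certificate paths are illustrative; the VM
+# name comes from the CLI example above, and the command's output lands in
+# /tmp/output inside the VM, which is uploaded when the task finishes):
+#   run-vm-task --server http://hub.example.com/kojihub \
+#       --cert ~/.koji/client.crt --ca ~/.koji/clientca.crt \
+#       --server-ca ~/.koji/serverca.crt --cpus 2 --mem 2048 \
+#       Win2k8-x86-vstudio-devel 'echo build successful'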