major refactoring of koji daemons
- move TaskManager, SCM, and a couple helper functions to koji.daemon - move generic TaskHandler classes to koji.tasks - remove use of globals in kojid and kojira - fix a few issues revealed by pylint
This commit is contained in:
parent
8d6ba17952
commit
419a1dafe4
9 changed files with 1815 additions and 2061 deletions
|
|
@ -1,15 +1,12 @@
|
|||
|
||||
SUBDIRS = lib
|
||||
BINFILES = kojid
|
||||
LIBEXECFILES = mergerepos
|
||||
PYFILES = $(wildcard *.py)
|
||||
|
||||
_default:
|
||||
@echo "nothing to make. try make install"
|
||||
|
||||
clean:
|
||||
rm -f *.o *.so *.pyc *~
|
||||
for d in $(SUBDIRS); do make -s -C $$d clean; done
|
||||
|
||||
|
||||
install:
|
||||
|
|
@ -35,6 +32,4 @@ install:
|
|||
mkdir -p $(DESTDIR)/etc/kojid
|
||||
install -p -m 644 kojid.conf $(DESTDIR)/etc/kojid/kojid.conf
|
||||
|
||||
for d in $(SUBDIRS); do make DESTDIR=`cd $(DESTDIR); pwd` \
|
||||
-C $$d install; [ $$? = 0 ] || exit 1; done
|
||||
|
||||
|
|
|
|||
1969
builder/kojid
1969
builder/kojid
File diff suppressed because it is too large
Load diff
|
|
@ -1,20 +0,0 @@
|
|||
|
||||
PYTHON=python
|
||||
SHAREDIR = $(DESTDIR)/usr/share/koji-builder
|
||||
MODDIR = $(SHAREDIR)/lib
|
||||
PYFILES = $(wildcard *.py)
|
||||
PYVER := $(shell $(PYTHON) -c 'import sys; print "%.3s" %(sys.version)')
|
||||
|
||||
_default:
|
||||
@echo "nothing to make. try make install"
|
||||
|
||||
clean:
|
||||
rm -f *.o *.so *.pyc *~
|
||||
|
||||
install:
|
||||
mkdir -p $(MODDIR)
|
||||
for p in $(PYFILES) ; do \
|
||||
install -p -m 644 $$p $(MODDIR)/$$p; \
|
||||
done
|
||||
$(PYTHON) -c "import compileall; compileall.compile_dir('$(MODDIR)', 1, '$(PYDIR)', 1)"
|
||||
|
||||
|
|
@ -1,224 +0,0 @@
|
|||
# Python module
|
||||
# tasks handlers for the koji build daemon
|
||||
|
||||
# Copyright (c) 2008 Red Hat
|
||||
#
|
||||
# Koji is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
# License as published by the Free Software Foundation;
|
||||
# version 2.1 of the License.
|
||||
#
|
||||
# This software is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
# Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public
|
||||
# License along with this software; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
#
|
||||
# Authors:
|
||||
# Mike McLean <mikem@redhat.com>
|
||||
|
||||
import koji
|
||||
import logging
|
||||
import os
|
||||
import signal
|
||||
import urllib2
|
||||
|
||||
|
||||
class BaseTaskHandler(object):
|
||||
"""The base class for task handlers
|
||||
|
||||
Each task handler is a class, a new instance of which is created
|
||||
to handle each task.
|
||||
"""
|
||||
|
||||
# list of methods the class can handle
|
||||
Methods = []
|
||||
|
||||
# Options:
|
||||
Foreground = False
|
||||
|
||||
def __init__(self, id, method, params, session, options, workdir=None):
|
||||
self.id = id #task id
|
||||
if method not in self.Methods:
|
||||
raise koji.GenericError, 'method "%s" is not supported' % method
|
||||
self.method = method
|
||||
# handle named parameters
|
||||
self.params,self.opts = koji.decode_args(*params)
|
||||
self.session = session
|
||||
self.options = options
|
||||
if workdir is None:
|
||||
workdir = "%s/%s" % (options.workdir, koji.pathinfo.taskrelpath(id))
|
||||
self.workdir = workdir
|
||||
self.logger = logging.getLogger("koji.build.BaseTaskHandler")
|
||||
|
||||
def setManager(self,manager):
|
||||
"""Set the manager attribute
|
||||
|
||||
This is only used for foreground tasks to give them access
|
||||
to their task manager.
|
||||
"""
|
||||
if not self.Foreground:
|
||||
return
|
||||
self.manager = manager
|
||||
|
||||
def handler(self):
|
||||
"""(abstract) the handler for the task."""
|
||||
raise NotImplementedError
|
||||
|
||||
def run(self):
|
||||
"""Execute the task"""
|
||||
self.createWorkdir()
|
||||
try:
|
||||
return self.handler(*self.params,**self.opts)
|
||||
finally:
|
||||
self.removeWorkdir()
|
||||
|
||||
_taskWeight = 1.0
|
||||
|
||||
def weight(self):
|
||||
"""Return the weight of the task.
|
||||
|
||||
This is run by the taskmanager before the task is run to determine
|
||||
the weight of the task. The weight is an abstract measure of the
|
||||
total load the task places on the system while running.
|
||||
|
||||
A task may set _taskWeight for a constant weight different from 1, or
|
||||
override this function for more complicated situations.
|
||||
|
||||
Note that task weight is partially ignored while the task is sleeping.
|
||||
"""
|
||||
return getattr(self,'_taskWeight',1.0)
|
||||
|
||||
def createWorkdir(self):
|
||||
if self.workdir is None:
|
||||
return
|
||||
self.removeWorkdir()
|
||||
os.makedirs(self.workdir)
|
||||
|
||||
def removeWorkdir(self):
|
||||
if self.workdir is None:
|
||||
return
|
||||
safe_rmtree(self.workdir, unmount=False, strict=True)
|
||||
#os.spawnvp(os.P_WAIT, 'rm', ['rm', '-rf', self.workdir])
|
||||
|
||||
def wait(self, subtasks=None, all=False, failany=False):
|
||||
"""Wait on subtasks
|
||||
|
||||
subtasks is a list of integers (or an integer). If more than one subtask
|
||||
is specified, then the default behavior is to return when any of those
|
||||
tasks complete. However, if all is set to True, then it waits for all of
|
||||
them to complete. If all and failany are both set to True, then each
|
||||
finished task will be checked for failure, and a failure will cause all
|
||||
of the unfinished tasks to be cancelled.
|
||||
|
||||
special values:
|
||||
subtasks = None specify all subtasks
|
||||
|
||||
Implementation notes:
|
||||
The build daemon forks all tasks as separate processes. This function
|
||||
uses signal.pause to sleep. The main process watches subtasks in
|
||||
the database and will send the subprocess corresponding to the
|
||||
subtask a SIGUSR2 to wake it up when subtasks complete.
|
||||
"""
|
||||
if isinstance(subtasks,int):
|
||||
# allow single integer w/o enclosing list
|
||||
subtasks = [subtasks]
|
||||
self.session.host.taskSetWait(self.id,subtasks)
|
||||
self.logger.debug("Waiting on %r" % subtasks)
|
||||
while True:
|
||||
finished, unfinished = self.session.host.taskWait(self.id)
|
||||
if len(unfinished) == 0:
|
||||
#all done
|
||||
break
|
||||
elif len(finished) > 0:
|
||||
if all:
|
||||
if failany:
|
||||
failed = False
|
||||
for task in finished:
|
||||
try:
|
||||
result = self.session.getTaskResult(task)
|
||||
except (koji.GenericError, Fault), task_error:
|
||||
self.logger.info("task %s failed or was canceled" % task)
|
||||
failed = True
|
||||
break
|
||||
if failed:
|
||||
self.logger.info("at least one task failed or was canceled, cancelling unfinished tasks")
|
||||
self.session.cancelTaskChildren(self.id)
|
||||
# reraise the original error now, rather than waiting for
|
||||
# an error in taskWaitResults()
|
||||
raise task_error
|
||||
else:
|
||||
# at least one done
|
||||
break
|
||||
# signal handler set by TaskManager.forkTask
|
||||
self.logger.debug("Pausing...")
|
||||
signal.pause()
|
||||
# main process will wake us up with SIGUSR2
|
||||
self.logger.debug("...waking up")
|
||||
self.logger.debug("Finished waiting")
|
||||
return dict(self.session.host.taskWaitResults(self.id,subtasks))
|
||||
|
||||
def getUploadDir(self):
|
||||
return koji.pathinfo.taskrelpath(self.id)
|
||||
|
||||
def uploadFile(self, filename, remoteName=None):
|
||||
"""Upload the file with the given name to the task output directory
|
||||
on the hub."""
|
||||
# Only upload files with content
|
||||
if os.path.isfile(filename) and os.stat(filename).st_size > 0:
|
||||
self.session.uploadWrapper(filename, self.getUploadDir(), remoteName)
|
||||
|
||||
def localPath(self, relpath):
|
||||
"""Return a local path to a remote file.
|
||||
|
||||
If the file is on an nfs mount, use that, otherwise download a copy"""
|
||||
if self.options.topurl:
|
||||
self.logger.debug("Downloading %s", relpath)
|
||||
url = "%s/%s" % (self.options.topurl, relpath)
|
||||
fsrc = urllib2.urlopen(url)
|
||||
fn = "%s/local/%s" % (self.workdir, relpath)
|
||||
os.makedirs(os.path.dirname(fn))
|
||||
fdst = file(fn, 'w')
|
||||
shutil.copyfileobj(fsrc, fdst)
|
||||
fsrc.close()
|
||||
fdst.close()
|
||||
else:
|
||||
fn = "%s/%s" % (self.options.topdir, relpath)
|
||||
return fn
|
||||
|
||||
|
||||
#XXX - not the right place for this
|
||||
#XXX - not as safe as we want
|
||||
def safe_rmtree(path, unmount=False, strict=True):
|
||||
logger = logging.getLogger("koji.build")
|
||||
#safe remove: with -xdev the find cmd will not cross filesystems
|
||||
# (though it will cross bind mounts from the same filesystem)
|
||||
if not os.path.exists(path):
|
||||
logger.debug("No such path: %s" % path)
|
||||
return
|
||||
if unmount:
|
||||
umount_all(path)
|
||||
#first rm -f non-directories
|
||||
logger.debug('Scrubbing files in %s' % path)
|
||||
rv = os.system("find '%s' -xdev \\! -type d -print0 |xargs -0 rm -f" % path)
|
||||
msg = 'file removal failed (code %r) for %s' % (rv,path)
|
||||
if rv != 0:
|
||||
logger.warn(msg)
|
||||
if strict:
|
||||
raise koji.GenericError, msg
|
||||
else:
|
||||
return rv
|
||||
#them rmdir directories
|
||||
#with -depth, we start at the bottom and work up
|
||||
logger.debug('Scrubbing directories in %s' % path)
|
||||
rv = os.system("find '%s' -xdev -depth -type d -print0 |xargs -0 rmdir" % path)
|
||||
msg = 'dir removal failed (code %r) for %s' % (rv,path)
|
||||
if rv != 0:
|
||||
logger.warn(msg)
|
||||
if strict:
|
||||
raise koji.GenericError, msg
|
||||
return rv
|
||||
|
||||
|
|
@ -170,7 +170,6 @@ rm -rf $RPM_BUILD_ROOT
|
|||
%config(noreplace) %{_sysconfdir}/sysconfig/kojid
|
||||
%dir %{_sysconfdir}/kojid
|
||||
%config(noreplace) %{_sysconfdir}/kojid/kojid.conf
|
||||
%{_datadir}/koji-builder
|
||||
%attr(-,kojibuilder,kojibuilder) %{_sysconfdir}/mock/koji
|
||||
|
||||
%pre builder
|
||||
|
|
|
|||
1104
koji/daemon.py
Normal file
1104
koji/daemon.py
Normal file
File diff suppressed because it is too large
Load diff
426
koji/tasks.py
Normal file
426
koji/tasks.py
Normal file
|
|
@ -0,0 +1,426 @@
|
|||
# Task definitions used by various Koji daemons
|
||||
|
||||
# Copyright (c) 2010 Red Hat, Inc.
|
||||
#
|
||||
# Koji is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
# License as published by the Free Software Foundation;
|
||||
# version 2.1 of the License.
|
||||
#
|
||||
# This software is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
# Lesser General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU Lesser General Public
|
||||
# License along with this software; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
|
||||
# Authors:
|
||||
# Mike McLean <mikem@redhat.com>
|
||||
|
||||
import koji
|
||||
import os
|
||||
import logging
|
||||
import xmlrpclib
|
||||
import signal
|
||||
import urllib2
|
||||
import shutil
|
||||
import random
|
||||
import time
|
||||
import pprint
|
||||
|
||||
def scan_mounts(topdir):
|
||||
"""Search path for mountpoints"""
|
||||
mplist = []
|
||||
topdir = os.path.normpath(topdir)
|
||||
fo = file('/proc/mounts','r')
|
||||
for line in fo.readlines():
|
||||
path = line.split()[1]
|
||||
if path.startswith(topdir):
|
||||
mplist.append(path)
|
||||
fo.close()
|
||||
#reverse sort so deeper dirs come first
|
||||
mplist.sort()
|
||||
mplist.reverse()
|
||||
return mplist
|
||||
|
||||
def umount_all(topdir):
|
||||
"Unmount every mount under topdir"
|
||||
logger = logging.getLogger("koji.build")
|
||||
for path in scan_mounts(topdir):
|
||||
logger.debug('Unmounting %s' % path)
|
||||
cmd = ['umount', '-l', path]
|
||||
rv = os.spawnvp(os.P_WAIT,cmd[0],cmd)
|
||||
if rv != 0:
|
||||
raise koji.GenericError, 'umount failed (exit code %r) for %s' % (rv,path)
|
||||
#check mounts again
|
||||
remain = scan_mounts(topdir)
|
||||
if remain:
|
||||
raise koji.GenericError, "Unmounting incomplete: %r" % remain
|
||||
|
||||
def safe_rmtree(path, unmount=False, strict=True):
|
||||
logger = logging.getLogger("koji.build")
|
||||
#safe remove: with -xdev the find cmd will not cross filesystems
|
||||
# (though it will cross bind mounts from the same filesystem)
|
||||
if not os.path.exists(path):
|
||||
logger.debug("No such path: %s" % path)
|
||||
return
|
||||
if unmount:
|
||||
umount_all(path)
|
||||
#first rm -f non-directories
|
||||
logger.debug('Scrubbing files in %s' % path)
|
||||
rv = os.system("find '%s' -xdev \\! -type d -print0 |xargs -0 rm -f" % path)
|
||||
msg = 'file removal failed (code %r) for %s' % (rv,path)
|
||||
if rv != 0:
|
||||
logger.warn(msg)
|
||||
if strict:
|
||||
raise koji.GenericError, msg
|
||||
else:
|
||||
return rv
|
||||
#them rmdir directories
|
||||
#with -depth, we start at the bottom and work up
|
||||
logger.debug('Scrubbing directories in %s' % path)
|
||||
rv = os.system("find '%s' -xdev -depth -type d -print0 |xargs -0 rmdir" % path)
|
||||
msg = 'dir removal failed (code %r) for %s' % (rv,path)
|
||||
if rv != 0:
|
||||
logger.warn(msg)
|
||||
if strict:
|
||||
raise koji.GenericError, msg
|
||||
return rv
|
||||
|
||||
class ServerExit(Exception):
|
||||
"""Raised to shutdown the server"""
|
||||
pass
|
||||
|
||||
class BaseTaskHandler(object):
|
||||
"""The base class for task handlers
|
||||
|
||||
Each task handler is a class, a new instance of which is created
|
||||
to handle each task.
|
||||
"""
|
||||
|
||||
# list of methods the class can handle
|
||||
Methods = []
|
||||
|
||||
# Options:
|
||||
Foreground = False
|
||||
|
||||
def __init__(self, id, method, params, session, options, workdir=None):
|
||||
self.id = id #task id
|
||||
if method not in self.Methods:
|
||||
raise koji.GenericError, 'method "%s" is not supported' % method
|
||||
self.method = method
|
||||
# handle named parameters
|
||||
self.params,self.opts = koji.decode_args(*params)
|
||||
self.session = session
|
||||
self.options = options
|
||||
if workdir is None:
|
||||
workdir = "%s/%s" % (self.options.workdir, koji.pathinfo.taskrelpath(id))
|
||||
self.workdir = workdir
|
||||
self.logger = logging.getLogger("koji.build.BaseTaskHandler")
|
||||
self.manager = None
|
||||
|
||||
def setManager(self,manager):
|
||||
"""Set the manager attribute
|
||||
|
||||
This is only used for foreground tasks to give them access
|
||||
to their task manager.
|
||||
"""
|
||||
if not self.Foreground:
|
||||
return
|
||||
self.manager = manager
|
||||
|
||||
def handler(self):
|
||||
"""(abstract) the handler for the task."""
|
||||
raise NotImplementedError
|
||||
|
||||
def run(self):
|
||||
"""Execute the task"""
|
||||
self.createWorkdir()
|
||||
try:
|
||||
return self.handler(*self.params,**self.opts)
|
||||
finally:
|
||||
self.removeWorkdir()
|
||||
|
||||
_taskWeight = 1.0
|
||||
|
||||
def weight(self):
|
||||
"""Return the weight of the task.
|
||||
|
||||
This is run by the taskmanager before the task is run to determine
|
||||
the weight of the task. The weight is an abstract measure of the
|
||||
total load the task places on the system while running.
|
||||
|
||||
A task may set _taskWeight for a constant weight different from 1, or
|
||||
override this function for more complicated situations.
|
||||
|
||||
Note that task weight is partially ignored while the task is sleeping.
|
||||
"""
|
||||
return getattr(self,'_taskWeight',1.0)
|
||||
|
||||
def createWorkdir(self):
|
||||
if self.workdir is None:
|
||||
return
|
||||
self.removeWorkdir()
|
||||
os.makedirs(self.workdir)
|
||||
|
||||
def removeWorkdir(self):
|
||||
if self.workdir is None:
|
||||
return
|
||||
safe_rmtree(self.workdir, unmount=False, strict=True)
|
||||
#os.spawnvp(os.P_WAIT, 'rm', ['rm', '-rf', self.workdir])
|
||||
|
||||
def wait(self, subtasks=None, all=False, failany=False):
|
||||
"""Wait on subtasks
|
||||
|
||||
subtasks is a list of integers (or an integer). If more than one subtask
|
||||
is specified, then the default behavior is to return when any of those
|
||||
tasks complete. However, if all is set to True, then it waits for all of
|
||||
them to complete. If all and failany are both set to True, then each
|
||||
finished task will be checked for failure, and a failure will cause all
|
||||
of the unfinished tasks to be cancelled.
|
||||
|
||||
special values:
|
||||
subtasks = None specify all subtasks
|
||||
|
||||
Implementation notes:
|
||||
The build daemon forks all tasks as separate processes. This function
|
||||
uses signal.pause to sleep. The main process watches subtasks in
|
||||
the database and will send the subprocess corresponding to the
|
||||
subtask a SIGUSR2 to wake it up when subtasks complete.
|
||||
"""
|
||||
if isinstance(subtasks,int):
|
||||
# allow single integer w/o enclosing list
|
||||
subtasks = [subtasks]
|
||||
self.session.host.taskSetWait(self.id,subtasks)
|
||||
self.logger.debug("Waiting on %r" % subtasks)
|
||||
while True:
|
||||
finished, unfinished = self.session.host.taskWait(self.id)
|
||||
if len(unfinished) == 0:
|
||||
#all done
|
||||
break
|
||||
elif len(finished) > 0:
|
||||
if all:
|
||||
if failany:
|
||||
failed = False
|
||||
for task in finished:
|
||||
try:
|
||||
result = self.session.getTaskResult(task)
|
||||
except (koji.GenericError, xmlrpclib.Fault), task_error:
|
||||
self.logger.info("task %s failed or was canceled" % task)
|
||||
failed = True
|
||||
break
|
||||
if failed:
|
||||
self.logger.info("at least one task failed or was canceled, cancelling unfinished tasks")
|
||||
self.session.cancelTaskChildren(self.id)
|
||||
# reraise the original error now, rather than waiting for
|
||||
# an error in taskWaitResults()
|
||||
raise task_error
|
||||
else:
|
||||
# at least one done
|
||||
break
|
||||
# signal handler set by TaskManager.forkTask
|
||||
self.logger.debug("Pausing...")
|
||||
signal.pause()
|
||||
# main process will wake us up with SIGUSR2
|
||||
self.logger.debug("...waking up")
|
||||
self.logger.debug("Finished waiting")
|
||||
return dict(self.session.host.taskWaitResults(self.id,subtasks))
|
||||
|
||||
def getUploadDir(self):
|
||||
return koji.pathinfo.taskrelpath(self.id)
|
||||
|
||||
def uploadFile(self, filename, relPath=None, remoteName=None):
|
||||
"""Upload the file with the given name to the task output directory
|
||||
on the hub."""
|
||||
uploadPath = self.getUploadDir()
|
||||
if relPath:
|
||||
relPath = relPath.strip('/')
|
||||
uploadPath += '/' + relPath
|
||||
# Only upload files with content
|
||||
if os.path.isfile(filename) and os.stat(filename).st_size > 0:
|
||||
self.session.uploadWrapper(filename, uploadPath, remoteName)
|
||||
|
||||
def uploadTree(self, dirpath, flatten=False):
|
||||
"""Upload the directory tree at dirpath to the task directory on the
|
||||
hub, preserving the directory structure"""
|
||||
dirpath = dirpath.rstrip('/')
|
||||
for path, dirs, files in os.walk(dirpath):
|
||||
if flatten:
|
||||
relpath = None
|
||||
else:
|
||||
relpath = path[len(dirpath) + 1:]
|
||||
for filename in files:
|
||||
self.uploadFile(os.path.join(path, filename), relpath)
|
||||
|
||||
def localPath(self, relpath):
|
||||
"""Return a local path to a remote file.
|
||||
|
||||
If the file is on an nfs mount, use that, otherwise download a copy"""
|
||||
if self.options.topurl:
|
||||
fn = "%s/local/%s" % (self.workdir, relpath)
|
||||
if os.path.exists(fn):
|
||||
# We've already downloaded this file,
|
||||
# just return the existing local path
|
||||
return fn
|
||||
self.logger.debug("Downloading %s", relpath)
|
||||
url = "%s/%s" % (self.options.topurl, relpath)
|
||||
fsrc = urllib2.urlopen(url)
|
||||
if not os.path.exists(os.path.dirname(fn)):
|
||||
os.makedirs(os.path.dirname(fn))
|
||||
fdst = file(fn, 'w')
|
||||
shutil.copyfileobj(fsrc, fdst)
|
||||
fsrc.close()
|
||||
fdst.close()
|
||||
else:
|
||||
fn = "%s/%s" % (self.options.topdir, relpath)
|
||||
return fn
|
||||
|
||||
def subtask(self, method, arglist, **opts):
|
||||
return self.session.host.subtask(method, arglist, self.id, **opts)
|
||||
|
||||
def subtask2(self, __taskopts, __method, *args, **kwargs):
|
||||
return self.session.host.subtask2(self.id, __taskopts, __method, *args, **kwargs)
|
||||
|
||||
def find_arch(self, arch, host, tag):
|
||||
"""
|
||||
For noarch tasks, find a canonical arch that is supported by both the host and tag.
|
||||
If the arch is anything other than noarch, return it unmodified.
|
||||
"""
|
||||
if arch != "noarch":
|
||||
return arch
|
||||
|
||||
# We need a concrete arch. Pick one that:
|
||||
# a) this host can handle
|
||||
# b) the build tag can support
|
||||
# c) is canonical
|
||||
host_arches = host['arches']
|
||||
if not host_arches:
|
||||
raise koji.BuildError, "No arch list for this host: %s" % host['name']
|
||||
tag_arches = tag['arches']
|
||||
if not tag_arches:
|
||||
raise koji.BuildError, "No arch list for tag: %s" % tag['name']
|
||||
# index canonical host arches
|
||||
host_arches = set([koji.canonArch(a) for a in host_arches.split()])
|
||||
# index canonical tag arches
|
||||
tag_arches = set([koji.canonArch(a) for a in tag_arches.split()])
|
||||
# find the intersection of host and tag arches
|
||||
common_arches = list(host_arches & tag_arches)
|
||||
if common_arches:
|
||||
# pick one of the common arches randomly
|
||||
# need to re-seed the prng or we'll get the same arch every time,
|
||||
# because we just forked from a common parent
|
||||
random.seed()
|
||||
arch = random.choice(common_arches)
|
||||
self.logger.info('Valid arches: %s, using: %s' % (' '.join(common_arches), arch))
|
||||
return arch
|
||||
else:
|
||||
# no overlap
|
||||
raise koji.BuildError, "host %s (%s) does not support any arches of tag %s (%s)" % \
|
||||
(host['name'], ', '.join(host_arches), tag['name'], ', '.join(tag_arches))
|
||||
|
||||
class FakeTask(BaseTaskHandler):
|
||||
Methods = ['someMethod']
|
||||
Foreground = True
|
||||
def handler(self, *args):
|
||||
self.logger.info("This is a fake task. Args: " + str(args))
|
||||
return 42
|
||||
|
||||
|
||||
class SleepTask(BaseTaskHandler):
|
||||
Methods = ['sleep']
|
||||
_taskWeight = 0.25
|
||||
def handler(self, n):
|
||||
self.logger.info("Sleeping for %s seconds" % n)
|
||||
time.sleep(n)
|
||||
self.logger.info("Finished sleeping")
|
||||
|
||||
class ForkTask(BaseTaskHandler):
|
||||
Methods = ['fork']
|
||||
def handler(self, n=5, m=37):
|
||||
for i in xrange(n):
|
||||
os.spawnvp(os.P_NOWAIT, 'sleep', ['sleep',str(m)])
|
||||
|
||||
class WaitTestTask(BaseTaskHandler):
|
||||
Methods = ['waittest']
|
||||
_taskWeight = 0.1
|
||||
def handler(self,count,seconds=10):
|
||||
tasks = []
|
||||
for i in xrange(count):
|
||||
task_id = self.session.host.subtask(method='sleep',
|
||||
arglist=[seconds],
|
||||
label=str(i),
|
||||
parent=self.id)
|
||||
tasks.append(task_id)
|
||||
results = self.wait(all=True)
|
||||
self.logger.info(pprint.pformat(results))
|
||||
|
||||
|
||||
class SubtaskTask(BaseTaskHandler):
|
||||
Methods = ['subtask']
|
||||
_taskWeight = 0.1
|
||||
def handler(self,n=4):
|
||||
if n > 0:
|
||||
task_id = self.session.host.subtask(method='subtask',
|
||||
arglist=[n-1],
|
||||
label='foo',
|
||||
parent=self.id)
|
||||
self.wait(task_id)
|
||||
else:
|
||||
task_id = self.session.host.subtask(method='sleep',
|
||||
arglist=[15],
|
||||
label='bar',
|
||||
parent=self.id)
|
||||
self.wait(task_id)
|
||||
|
||||
|
||||
class DefaultTask(BaseTaskHandler):
|
||||
"""Used when no matching method is found"""
|
||||
Methods = ['default']
|
||||
_taskWeight = 0.1
|
||||
def handler(self,*args,**opts):
|
||||
raise koji.GenericError, "Invalid method: %s" % self.method
|
||||
|
||||
|
||||
class ShutdownTask(BaseTaskHandler):
|
||||
Methods = ['shutdown']
|
||||
_taskWeight = 0.0
|
||||
Foreground = True
|
||||
def handler(self):
|
||||
#note: this is a foreground task
|
||||
raise ServerExit
|
||||
|
||||
|
||||
class DependantTask(BaseTaskHandler):
|
||||
|
||||
Methods = ['dependantTask']
|
||||
#mostly just waiting on other tasks
|
||||
_taskWeight = 0.2
|
||||
|
||||
def handler(self, wait_list, task_list):
|
||||
for task in wait_list:
|
||||
if not isinstance(task, int) or not self.session.getTaskInfo(task):
|
||||
self.logger.debug("invalid task id %s, removing from wait_list" % task)
|
||||
wait_list.remove(task)
|
||||
|
||||
# note, tasks in wait_list are not children of this task so we can't
|
||||
# just use self.wait()
|
||||
while wait_list:
|
||||
for task in wait_list[:]:
|
||||
if self.session.taskFinished(task):
|
||||
info = self.session.getTaskInfo(task)
|
||||
if info and koji.TASK_STATES[info['state']] in ['CANCELED','FAILED']:
|
||||
raise koji.GenericError, "Dependency %s failed to complete." % info['id']
|
||||
wait_list.remove(task)
|
||||
# let the system rest before polling again
|
||||
time.sleep(1)
|
||||
|
||||
subtasks = []
|
||||
for task in task_list:
|
||||
# **((len(task)>2 and task[2]) or {}) expands task[2] into opts if it exists, allows for things like 'priority=15'
|
||||
task_id = self.session.host.subtask(method=task[0], arglist=task[1], parent=self.id, **((len(task)>2 and task[2]) or {}))
|
||||
if task_id:
|
||||
subtasks.append(task_id)
|
||||
if subtasks:
|
||||
self.wait(subtasks, all=True)
|
||||
21
koji/util.py
21
koji/util.py
|
|
@ -1,4 +1,4 @@
|
|||
# Copyright (c) 2005-2007 Red Hat
|
||||
# Copyright (c) 2005-2010 Red Hat
|
||||
#
|
||||
# Koji is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
|
|
@ -18,6 +18,7 @@ import calendar
|
|||
import re
|
||||
import time
|
||||
import koji
|
||||
import os
|
||||
|
||||
try:
|
||||
from hashlib import md5 as md5_constructor
|
||||
|
|
@ -147,3 +148,21 @@ def filedigestAlgo(hdr):
|
|||
digest_algo_id = None
|
||||
digest_algo = koji.RPM_FILEDIGESTALGO_IDS.get(digest_algo_id, 'unknown')
|
||||
return digest_algo.lower()
|
||||
|
||||
def parseStatus(rv, prefix):
|
||||
if isinstance(prefix, list) or isinstance(prefix, tuple):
|
||||
prefix = ' '.join(prefix)
|
||||
if os.WIFSIGNALED(rv):
|
||||
return '%s was killed by signal %i' % (prefix, os.WTERMSIG(rv))
|
||||
elif os.WIFEXITED(rv):
|
||||
return '%s exited with status %i' % (prefix, os.WEXITSTATUS(rv))
|
||||
else:
|
||||
return '%s terminated for unknown reasons' % prefix
|
||||
|
||||
def isSuccess(rv):
|
||||
"""Return True if rv indicates successful completion
|
||||
(exited with status 0), False otherwise."""
|
||||
if os.WIFEXITED(rv) and os.WEXITSTATUS(rv) == 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
|
|
|||
106
util/kojira
106
util/kojira
|
|
@ -1,7 +1,7 @@
|
|||
#!/usr/bin/python
|
||||
|
||||
# Koji Repository Administrator (kojira)
|
||||
# Copyright (c) 2005-2007 Red Hat
|
||||
# Copyright (c) 2005-2010 Red Hat
|
||||
#
|
||||
# Koji is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU Lesser General Public
|
||||
|
|
@ -27,6 +27,7 @@ except ImportError:
|
|||
import sys
|
||||
import os
|
||||
import koji
|
||||
from koji.tasks import safe_rmtree
|
||||
from optparse import OptionParser
|
||||
from ConfigParser import ConfigParser
|
||||
import fnmatch
|
||||
|
|
@ -38,38 +39,11 @@ import time
|
|||
import traceback
|
||||
|
||||
|
||||
|
||||
def safe_rmtree(path, strict=True):
|
||||
logger = logging.getLogger("koji.repo")
|
||||
#safe remove: with -xdev the find cmd will not cross filesystems
|
||||
# (though it will cross bind mounts from the same filesystem)
|
||||
if not os.path.exists(path):
|
||||
logger.debug("No such path: %s" % path)
|
||||
return
|
||||
#first rm -f non-directories
|
||||
logger.debug('Removing files under %s' % path)
|
||||
rv = os.system("find '%s' -xdev \\! -type d -print0 |xargs -0 rm -f" % path)
|
||||
msg = 'file removal failed (code %r) for %s' % (rv,path)
|
||||
if rv != 0:
|
||||
logger.warn(msg)
|
||||
if strict:
|
||||
raise koji.GenericError, msg
|
||||
else:
|
||||
return rv
|
||||
#them rmdir directories
|
||||
#with -depth, we start at the bottom and work up
|
||||
logger.debug('Removing directories under %s' % path)
|
||||
rv = os.system("find '%s' -xdev -depth -type d -print0 |xargs -0 rmdir" % path)
|
||||
msg = 'dir removal failed (code %r) for %s' % (rv,path)
|
||||
if rv != 0:
|
||||
logger.warn(msg)
|
||||
if strict:
|
||||
raise koji.GenericError, msg
|
||||
return rv
|
||||
|
||||
class ManagedRepo(object):
|
||||
|
||||
def __init__(self, data):
|
||||
def __init__(self, session, options, data):
|
||||
self.session = session
|
||||
self.options = options
|
||||
self.logger = logging.getLogger("koji.repo")
|
||||
self.current = True
|
||||
self.repo_id = data['id']
|
||||
|
|
@ -78,7 +52,7 @@ class ManagedRepo(object):
|
|||
self.tag_id = data['tag_id']
|
||||
self.state = data['state']
|
||||
self.first_seen = time.time()
|
||||
order = session.getFullInheritance(self.tag_id, event=self.event_id)
|
||||
order = self.session.getFullInheritance(self.tag_id, event=self.event_id)
|
||||
#order may contain same tag more than once
|
||||
tags = {self.tag_id : 1}
|
||||
for x in order:
|
||||
|
|
@ -90,9 +64,9 @@ class ManagedRepo(object):
|
|||
if self.state == koji.REPO_EXPIRED:
|
||||
return
|
||||
elif self.state == koji.REPO_DELETED:
|
||||
raise GenericError, "Repo already deleted"
|
||||
raise koji.GenericError, "Repo already deleted"
|
||||
self.logger.info("Expiring repo %s.." % self.repo_id)
|
||||
session.repoExpire(self.repo_id)
|
||||
self.session.repoExpire(self.repo_id)
|
||||
self.state = koji.REPO_EXPIRED
|
||||
|
||||
def expired(self):
|
||||
|
|
@ -124,7 +98,7 @@ class ManagedRepo(object):
|
|||
|
||||
def tryDelete(self):
|
||||
"""Remove the repo from disk, if possible"""
|
||||
tag_info = session.getTag(self.tag_id)
|
||||
tag_info = self.session.getTag(self.tag_id)
|
||||
if not tag_info:
|
||||
self.logger.warn('Could not get info for tag %i, skipping delete of repo %i' %
|
||||
(self.tag_id, self.repo_id))
|
||||
|
|
@ -146,13 +120,13 @@ class ManagedRepo(object):
|
|||
return False
|
||||
else:
|
||||
age = time.time() - max(self.event_ts, mtime)
|
||||
if age < options.deleted_repo_lifetime:
|
||||
if age < self.options.deleted_repo_lifetime:
|
||||
#XXX should really be called expired_repo_lifetime
|
||||
return False
|
||||
self.logger.debug("Attempting to delete repo %s.." % self.repo_id)
|
||||
if self.state != koji.REPO_EXPIRED:
|
||||
raise GenericError, "Repo not expired"
|
||||
if session.repoDelete(self.repo_id) > 0:
|
||||
raise koji.GenericError, "Repo not expired"
|
||||
if self.session.repoDelete(self.repo_id) > 0:
|
||||
#cannot delete, we are referenced by a buildroot
|
||||
self.logger.debug("Cannot delete repo %s, still referenced" % self.repo_id)
|
||||
return False
|
||||
|
|
@ -178,7 +152,7 @@ class ManagedRepo(object):
|
|||
#also no point in checking
|
||||
return False
|
||||
self.logger.debug("Checking for changes: %r" % self.taglist)
|
||||
if session.tagChangedSinceEvent(self.event_id,self.taglist):
|
||||
if self.session.tagChangedSinceEvent(self.event_id,self.taglist):
|
||||
self.logger.debug("Tag data has changed since event %r" % self.event_id)
|
||||
self.current = False
|
||||
else:
|
||||
|
|
@ -188,7 +162,9 @@ class ManagedRepo(object):
|
|||
|
||||
class RepoManager(object):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, options, session):
|
||||
self.options = options
|
||||
self.session = session
|
||||
self.repos = {}
|
||||
self.tasks = {}
|
||||
self.tag_use_stats = {}
|
||||
|
|
@ -203,7 +179,7 @@ class RepoManager(object):
|
|||
|
||||
def readCurrentRepos(self):
|
||||
self.logger.debug("Reading current repo data")
|
||||
repodata = session.getActiveRepos()
|
||||
repodata = self.session.getActiveRepos()
|
||||
self.logger.debug("Repo data: %r" % repodata)
|
||||
for data in repodata:
|
||||
repo_id = data['id']
|
||||
|
|
@ -217,7 +193,7 @@ class RepoManager(object):
|
|||
else:
|
||||
self.logger.info('Found repo %s, state=%s'
|
||||
%(repo_id, koji.REPO_STATES[data['state']]))
|
||||
self.repos[repo_id] = ManagedRepo(data)
|
||||
self.repos[repo_id] = ManagedRepo(self.session, self.options, data)
|
||||
|
||||
def pruneLocalRepos(self):
|
||||
"""Scan filesystem for repos and remove any deleted ones
|
||||
|
|
@ -230,12 +206,12 @@ class RepoManager(object):
|
|||
tagdir = "%s/%s" % (topdir, tag)
|
||||
if not os.path.isdir(tagdir):
|
||||
continue
|
||||
taginfo = session.getTag(tag)
|
||||
taginfo = self.session.getTag(tag)
|
||||
if taginfo is None:
|
||||
self.logger.warn("Unexpected directory (no such tag): %s" % tagdir)
|
||||
continue
|
||||
for repo_id in os.listdir(tagdir):
|
||||
if count >= options.prune_batch_size:
|
||||
if count >= self.options.prune_batch_size:
|
||||
#this keeps us from spending too much time on this at one time
|
||||
return
|
||||
repodir = "%s/%s" % (tagdir, repo_id)
|
||||
|
|
@ -253,11 +229,11 @@ class RepoManager(object):
|
|||
except OSError:
|
||||
#just in case something deletes the repo out from under us
|
||||
continue
|
||||
rinfo = session.repoInfo(repo_id)
|
||||
rinfo = self.session.repoInfo(repo_id)
|
||||
if rinfo is None:
|
||||
if not options.ignore_stray_repos:
|
||||
if not self.options.ignore_stray_repos:
|
||||
age = time.time() - dir_ts
|
||||
if age > options.deleted_repo_lifetime:
|
||||
if age > self.options.deleted_repo_lifetime:
|
||||
count += 1
|
||||
self.logger.info("Removing unexpected directory (no such repo): %s" % repodir)
|
||||
safe_rmtree(repodir, strict=False)
|
||||
|
|
@ -267,7 +243,7 @@ class RepoManager(object):
|
|||
continue
|
||||
if rinfo['state'] in (koji.REPO_DELETED, koji.REPO_PROBLEM):
|
||||
age = time.time() - max(rinfo['create_ts'], dir_ts)
|
||||
if age > options.deleted_repo_lifetime:
|
||||
if age > self.options.deleted_repo_lifetime:
|
||||
#XXX should really be called expired_repo_lifetime
|
||||
count += 1
|
||||
logger.info("Removing stray repo (state=%s): %s" % (koji.REPO_STATES[rinfo['state']], repodir))
|
||||
|
|
@ -280,8 +256,8 @@ class RepoManager(object):
|
|||
if stats and now - stats['ts'] < 3600:
|
||||
#use the cache
|
||||
return stats
|
||||
data = session.listBuildroots(tagID=tag_id,
|
||||
queryOpts={'order': '-create_event_id', 'limit' : 100})
|
||||
data = self.session.listBuildroots(tagID=tag_id,
|
||||
queryOpts={'order': '-create_event_id', 'limit' : 100})
|
||||
#XXX magic number (limit)
|
||||
if data:
|
||||
tag_name = data[0]['tag_name']
|
||||
|
|
@ -350,7 +326,7 @@ class RepoManager(object):
|
|||
def updateRepos(self):
|
||||
#check on tasks
|
||||
for tag_id, task_id in self.tasks.items():
|
||||
tinfo = session.getTaskInfo(task_id)
|
||||
tinfo = self.session.getTaskInfo(task_id)
|
||||
tstate = koji.TASK_STATES[tinfo['state']]
|
||||
if tstate == 'CLOSED':
|
||||
self.logger.info("Finished: newRepo task %s for tag %s" % (task_id, tag_id))
|
||||
|
|
@ -368,7 +344,7 @@ class RepoManager(object):
|
|||
repo.expire()
|
||||
#find out which tags require repos
|
||||
tags = {}
|
||||
for target in session.getBuildTargets():
|
||||
for target in self.session.getBuildTargets():
|
||||
tag_id = target['build_tag']
|
||||
tags[tag_id] = target['build_tag_name']
|
||||
#index repos by tag
|
||||
|
|
@ -410,20 +386,20 @@ class RepoManager(object):
|
|||
self.logger.debug("order: %s", regen)
|
||||
# i.e. tags with oldest (or no) repos get precedence
|
||||
for ts, tag_id in regen:
|
||||
if len(self.tasks) >= options.max_repo_tasks:
|
||||
if len(self.tasks) >= self.options.max_repo_tasks:
|
||||
self.logger.info("Maximum number of repo tasks reached.")
|
||||
break
|
||||
tagname = tags[tag_id]
|
||||
taskopts = {}
|
||||
for pat in options.debuginfo_tags.split():
|
||||
for pat in self.options.debuginfo_tags.split():
|
||||
if fnmatch.fnmatch(tagname, pat):
|
||||
taskopts['debuginfo'] = True
|
||||
break
|
||||
for pat in options.source_tags.split():
|
||||
for pat in self.options.source_tags.split():
|
||||
if fnmatch.fnmatch(tagname, pat):
|
||||
taskopts['src'] = True
|
||||
break
|
||||
task_id = session.newRepo(tagname, **taskopts)
|
||||
task_id = self.session.newRepo(tagname, **taskopts)
|
||||
self.logger.info("Created newRepo task %s for tag %s (%s)" % (task_id, tag_id, tags[tag_id]))
|
||||
self.tasks[tag_id] = task_id
|
||||
#some cleanup
|
||||
|
|
@ -435,7 +411,7 @@ class RepoManager(object):
|
|||
if repo.ready():
|
||||
repo.expire()
|
||||
for repo in repolist:
|
||||
if n_deletes >= options.delete_batch_size:
|
||||
if n_deletes >= self.options.delete_batch_size:
|
||||
break
|
||||
if repo.expired():
|
||||
#try to delete
|
||||
|
|
@ -443,10 +419,13 @@ class RepoManager(object):
|
|||
n_deletes += 1
|
||||
|
||||
|
||||
def main():
|
||||
repomgr = RepoManager()
|
||||
def main(options, session):
|
||||
repomgr = RepoManager(options, session)
|
||||
repomgr.readCurrentRepos()
|
||||
repomgr.pruneLocalRepos()
|
||||
def shutdown(*args):
|
||||
raise SystemExit
|
||||
signal.signal(signal.SIGTERM,shutdown)
|
||||
logger.info("Entering main loop")
|
||||
while True:
|
||||
try:
|
||||
|
|
@ -475,11 +454,6 @@ def main():
|
|||
finally:
|
||||
sys.exit()
|
||||
|
||||
def _exit_signal_handler(signum, frame):
|
||||
logger.error('Exiting on signal')
|
||||
session.logout()
|
||||
sys.exit(1)
|
||||
|
||||
def get_options():
|
||||
"""process options from command line and config file"""
|
||||
# parse command line args
|
||||
|
|
@ -634,6 +608,4 @@ if __name__ == "__main__":
|
|||
koji.daemonize()
|
||||
else:
|
||||
koji.add_stderr_logger("koji")
|
||||
main()
|
||||
|
||||
|
||||
main(options, session)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue