Some files in the tree had bits of code that you could run if you executed the files directly as scripts. Now that we have unit tests and the "fakehub" tool, we do not need this code.
190 lines
6.4 KiB
Python
190 lines
6.4 KiB
Python
# python library
|
|
|
|
# db utilities for koji
|
|
# Copyright (c) 2005-2014 Red Hat, Inc.
|
|
#
|
|
# Koji is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation;
|
|
# version 2.1 of the License.
|
|
#
|
|
# This software is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this software; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
#
|
|
# Authors:
|
|
# Mike McLean <mikem@redhat.com>
|
|
|
|
|
|
from __future__ import absolute_import
|
|
import logging
|
|
import psycopg2
|
|
# import psycopg2.extensions
|
|
# # don't convert timestamp fields to DateTime objects
|
|
# del psycopg2.extensions.string_types[1114]
|
|
# del psycopg2.extensions.string_types[1184]
|
|
# del psycopg2.extensions.string_types[1082]
|
|
# del psycopg2.extensions.string_types[1083]
|
|
# del psycopg2.extensions.string_types[1266]
|
|
import re
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from . import context
|
|
|
|
POSITIONAL_RE = re.compile(r'%[a-z]')
|
|
NAMED_RE = re.compile(r'%\(([^\)]+)\)[a-z]')
|
|
|
|
## Globals ##
|
|
_DBopts = None
|
|
# A persistent connection to the database.
|
|
# A new connection will be created whenever
|
|
# Apache forks a new worker, and that connection
|
|
# will be used to service all requests handled
|
|
# by that worker.
|
|
# This probably doesn't need to be a ThreadLocal
|
|
# since Apache is not using threading,
|
|
# but play it safe anyway.
|
|
_DBconn = context.ThreadLocal()
|
|
|
|
class DBWrapper:
|
|
def __init__(self, cnx):
|
|
self.cnx = cnx
|
|
|
|
def __getattr__(self, key):
|
|
if not self.cnx:
|
|
raise Exception('connection is closed')
|
|
return getattr(self.cnx, key)
|
|
|
|
def cursor(self, *args, **kw):
|
|
if not self.cnx:
|
|
raise Exception('connection is closed')
|
|
return CursorWrapper(self.cnx.cursor(*args, **kw))
|
|
|
|
def close(self):
|
|
# Rollback any uncommitted changes and clear the connection so
|
|
# this DBWrapper is no longer usable after close()
|
|
if not self.cnx:
|
|
raise Exception('connection is closed')
|
|
self.cnx.cursor().execute('ROLLBACK')
|
|
#We do this rather than cnx.rollback to avoid opening a new transaction
|
|
#If our connection gets recycled cnx.rollback will be called then.
|
|
self.cnx = None
|
|
|
|
|
|
class CursorWrapper:
|
|
def __init__(self, cursor):
|
|
self.cursor = cursor
|
|
self.logger = logging.getLogger('koji.db')
|
|
|
|
def __getattr__(self, key):
|
|
return getattr(self.cursor, key)
|
|
|
|
def _timed_call(self, method, args, kwargs):
|
|
start = time.time()
|
|
ret = getattr(self.cursor, method)(*args, **kwargs)
|
|
self.logger.debug("%s operation completed in %.4f seconds", method, time.time() - start)
|
|
return ret
|
|
|
|
def fetchone(self, *args, **kwargs):
|
|
return self._timed_call('fetchone', args, kwargs)
|
|
|
|
def fetchall(self, *args, **kwargs):
|
|
return self._timed_call('fetchall', args, kwargs)
|
|
|
|
def quote(self, operation, parameters):
|
|
if hasattr(self.cursor, "mogrify"):
|
|
quote = self.cursor.mogrify
|
|
else:
|
|
quote = lambda a, b: a % b
|
|
try:
|
|
return quote(operation, parameters)
|
|
except Exception:
|
|
self.logger.exception('Unable to quote query:\n%s\nParameters: %s', operation, parameters)
|
|
return "INVALID QUERY"
|
|
|
|
def preformat(self, sql, params):
|
|
"""psycopg2 requires all variable placeholders to use the string (%s) datatype,
|
|
regardless of the actual type of the data. Format the sql string to be compliant.
|
|
It also requires IN parameters to be in tuple rather than list format."""
|
|
sql = POSITIONAL_RE.sub(r'%s', sql)
|
|
sql = NAMED_RE.sub(r'%(\1)s', sql)
|
|
if isinstance(params, dict):
|
|
for name, value in params.items():
|
|
if isinstance(value, list):
|
|
params[name] = tuple(value)
|
|
else:
|
|
if isinstance(params, tuple):
|
|
params = list(params)
|
|
for i, item in enumerate(params):
|
|
if isinstance(item, list):
|
|
params[i] = tuple(item)
|
|
return sql, params
|
|
|
|
def execute(self, operation, parameters=()):
|
|
debug = self.logger.isEnabledFor(logging.DEBUG)
|
|
operation, parameters = self.preformat(operation, parameters)
|
|
if debug:
|
|
self.logger.debug(self.quote(operation, parameters))
|
|
start = time.time()
|
|
try:
|
|
ret = self.cursor.execute(operation, parameters)
|
|
except Exception:
|
|
self.logger.error('Query failed. Query was: %s', self.quote(operation, parameters))
|
|
raise
|
|
if debug:
|
|
self.logger.debug("Execute operation completed in %.4f seconds", time.time() - start)
|
|
return ret
|
|
|
|
|
|
## Functions ##
|
|
def provideDBopts(**opts):
|
|
global _DBopts
|
|
if _DBopts is None:
|
|
_DBopts = dict([i for i in opts.items() if i[1] is not None])
|
|
|
|
def setDBopts(**opts):
|
|
global _DBopts
|
|
_DBopts = opts
|
|
|
|
def getDBopts():
|
|
return _DBopts
|
|
|
|
def connect():
|
|
logger = logging.getLogger('koji.db')
|
|
global _DBconn
|
|
if hasattr(_DBconn, 'conn'):
|
|
# Make sure the previous transaction has been
|
|
# closed. This is safe to call multiple times.
|
|
conn = _DBconn.conn
|
|
try:
|
|
# Under normal circumstances, the last use of this connection
|
|
# will have issued a raw ROLLBACK to close the transaction. To
|
|
# avoid 'no transaction in progress' warnings (depending on postgres
|
|
# configuration) we open a new one here.
|
|
# Should there somehow be a transaction in progress, a second
|
|
# BEGIN will be a harmless no-op, though there may be a warning.
|
|
conn.cursor().execute('BEGIN')
|
|
conn.rollback()
|
|
return DBWrapper(conn)
|
|
except psycopg2.Error:
|
|
del _DBconn.conn
|
|
#create a fresh connection
|
|
opts = _DBopts
|
|
if opts is None:
|
|
opts = {}
|
|
try:
|
|
conn = psycopg2.connect(**opts)
|
|
except Exception:
|
|
logger.error(''.join(traceback.format_exception(*sys.exc_info())))
|
|
raise
|
|
# XXX test
|
|
# return conn
|
|
_DBconn.conn = conn
|
|
|
|
return DBWrapper(conn)
|