198 lines
6.5 KiB
Python
198 lines
6.5 KiB
Python
# python library
|
|
|
|
# db utilities for koji
|
|
# Copyright (c) 2005-2014 Red Hat, Inc.
|
|
#
|
|
# Koji is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation;
|
|
# version 2.1 of the License.
|
|
#
|
|
# This software is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this software; if not, write to the Free Software
|
|
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
|
|
#
|
|
# Authors:
|
|
# Mike McLean <mikem@redhat.com>
|
|
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import logging
|
|
# import psycopg2.extensions
|
|
# # don't convert timestamp fields to DateTime objects
|
|
# del psycopg2.extensions.string_types[1114]
|
|
# del psycopg2.extensions.string_types[1184]
|
|
# del psycopg2.extensions.string_types[1082]
|
|
# del psycopg2.extensions.string_types[1083]
|
|
# del psycopg2.extensions.string_types[1266]
|
|
import re
|
|
import sys
|
|
import time
|
|
import traceback
|
|
|
|
import psycopg2
|
|
|
|
from . import context
|
|
|
|
# Matches a positional placeholder: a percent sign followed by a single
# lowercase letter (e.g. %d or %s).
POSITIONAL_RE = re.compile(r'%[a-z]')
# Matches a named placeholder such as %(name)d; group 1 captures the name
# between the parentheses.
NAMED_RE = re.compile(r'%\(([^\)]+)\)[a-z]')
|
|
|
|
## Globals ##
# Keyword options handed to psycopg2.connect() by connect(); stays None until
# provideDBopts() or setDBopts() stores a value.
_DBopts = None
# A persistent connection to the database.
# A new connection will be created whenever Apache forks a new worker, and
# that connection will be used to service all requests handled by that worker.
# This probably doesn't need to be a ThreadLocal since Apache is not using
# threading, but play it safe anyway.
_DBconn = context.ThreadLocal()
|
|
|
|
|
|
class DBWrapper:
    """Thin proxy around a database connection that can be invalidated.

    Attribute access is forwarded to the wrapped connection. After close()
    the wrapper holds no connection and every forwarded access raises
    ``Exception('connection is closed')``.
    """

    def __init__(self, cnx):
        """Wrap the connection object *cnx*."""
        self.cnx = cnx

    def __getattr__(self, key):
        """Forward unknown attributes to the wrapped connection.

        Raises AttributeError if the wrapper has no ``cnx`` attribute at all,
        and Exception if the connection has been closed.
        """
        # __getattr__ only runs when normal lookup fails. If 'cnx' itself is
        # missing (instance created without __init__, e.g. while copy or
        # pickle probe for special methods), reading self.cnx below would
        # re-enter this method until RecursionError.
        if key == 'cnx':
            raise AttributeError(key)
        if not self.cnx:
            raise Exception('connection is closed')
        return getattr(self.cnx, key)

    def cursor(self, *args, **kw):
        """Return a CursorWrapper around a new cursor of the connection.

        Arguments are passed through to the connection's cursor() method.
        Raises Exception if the connection has been closed.
        """
        if not self.cnx:
            raise Exception('connection is closed')
        return CursorWrapper(self.cnx.cursor(*args, **kw))

    def close(self):
        """Roll back uncommitted changes and make this wrapper unusable.

        Raises Exception if the wrapper was already closed. Any error raised
        by the ROLLBACK itself is propagated, but the wrapper is invalidated
        regardless.
        """
        if not self.cnx:
            raise Exception('connection is closed')
        try:
            # We do this rather than cnx.rollback to avoid opening a new
            # transaction. If our connection gets recycled cnx.rollback will
            # be called then.
            self.cnx.cursor().execute('ROLLBACK')
        finally:
            # Clear the reference even when the rollback fails, so a broken
            # connection cannot keep being used through this wrapper.
            self.cnx = None
|
|
|
|
|
|
class CursorWrapper:
    """Proxy around a database cursor that normalizes, logs and times queries.

    Attributes not defined here are forwarded to the wrapped cursor. Log
    output goes to the 'koji.db' logger.
    """

    def __init__(self, cursor):
        """Wrap *cursor* and bind the 'koji.db' logger."""
        self.cursor = cursor
        self.logger = logging.getLogger('koji.db')

    def __getattr__(self, key):
        """Forward unknown attributes to the wrapped cursor."""
        # __getattr__ only runs when normal lookup fails. If 'cursor' itself
        # is missing (instance created without __init__, e.g. while copy or
        # pickle probe for special methods), reading self.cursor below would
        # re-enter this method until RecursionError.
        if key == 'cursor':
            raise AttributeError(key)
        return getattr(self.cursor, key)

    def _timed_call(self, method, args, kwargs):
        """Call the named cursor method and log how long it took (debug level)."""
        start = time.time()
        ret = getattr(self.cursor, method)(*args, **kwargs)
        self.logger.debug("%s operation completed in %.4f seconds", method, time.time() - start)
        return ret

    def fetchone(self, *args, **kwargs):
        """Timed pass-through to the cursor's fetchone()."""
        return self._timed_call('fetchone', args, kwargs)

    def fetchall(self, *args, **kwargs):
        """Timed pass-through to the cursor's fetchall()."""
        return self._timed_call('fetchall', args, kwargs)

    def quote(self, operation, parameters):
        """Return *operation* with *parameters* substituted, for logging.

        Uses the cursor's mogrify() when it has one, otherwise plain
        %-formatting. Never raises: on failure the error is logged and the
        string "INVALID QUERY" is returned.
        """
        if hasattr(self.cursor, "mogrify"):
            render = self.cursor.mogrify
        else:
            def render(a, b):
                return a % b
        try:
            return render(operation, parameters)
        except Exception:
            self.logger.exception('Unable to quote query:\n%s\nParameters: %s', operation, parameters)
            return "INVALID QUERY"

    def preformat(self, sql, params):
        """psycopg2 requires all variable placeholders to use the string (%s) datatype,
        regardless of the actual type of the data. Format the sql string to be compliant.
        It also requires IN parameters to be in tuple rather than list format.

        Returns a ``(sql, params)`` pair. The caller's *params* container is
        never modified: a dict yields a new dict, any other iterable yields a
        new list, with list values replaced by tuples.
        """
        sql = POSITIONAL_RE.sub(r'%s', sql)
        sql = NAMED_RE.sub(r'%(\1)s', sql)
        if isinstance(params, dict):
            # Build a copy so the caller's dict keeps its original list values.
            params = dict(
                (name, tuple(value) if isinstance(value, list) else value)
                for name, value in params.items()
            )
        else:
            params = [tuple(item) if isinstance(item, list) else item for item in params]
        return sql, params

    def execute(self, operation, parameters=()):
        """Normalize and run a query on the wrapped cursor.

        The query is logged (with parameters substituted) and timed when
        debug logging is enabled. On failure the query is logged at error
        level and the original exception is re-raised. Returns whatever the
        wrapped cursor's execute() returns.
        """
        debug = self.logger.isEnabledFor(logging.DEBUG)
        operation, parameters = self.preformat(operation, parameters)
        if debug:
            self.logger.debug(self.quote(operation, parameters))
            start = time.time()
        try:
            ret = self.cursor.execute(operation, parameters)
        except Exception:
            self.logger.error('Query failed. Query was: %s', self.quote(operation, parameters))
            raise
        if debug:
            self.logger.debug("Execute operation completed in %.4f seconds", time.time() - start)
        return ret
|
|
|
|
|
|
## Functions ##
|
|
def provideDBopts(**opts):
    """Store default connection options, unless options are already set.

    Keyword arguments whose value is None are left out. Does nothing when
    ``_DBopts`` already holds a value.
    """
    global _DBopts
    if _DBopts is not None:
        return
    _DBopts = {key: value for key, value in opts.items() if value is not None}
|
|
|
|
|
|
def setDBopts(**opts):
    """Replace the stored connection options with the given keyword arguments.

    Unlike provideDBopts(), this always overwrites and keeps None values.
    """
    global _DBopts
    _DBopts = opts
|
|
|
|
|
|
def getDBopts():
    """Return the stored connection options (None if none have been set)."""
    return _DBopts
|
|
|
|
|
|
def connect():
    """Return a DBWrapper around the database connection held in ``_DBconn``.

    A connection already stored on ``_DBconn`` is reset and reused. If the
    reset raises ``psycopg2.Error`` the stored connection is dropped and a
    new one is opened with the options in ``_DBopts`` (no options when that
    is None). Errors while opening a new connection are logged to 'koji.db'
    and re-raised.
    """
    global _DBconn
    log = logging.getLogger('koji.db')

    if hasattr(_DBconn, 'conn'):
        # Make sure the previous transaction has been closed. This is safe
        # to call multiple times.
        cached = _DBconn.conn
        try:
            # Under normal circumstances, the last use of this connection
            # will have issued a raw ROLLBACK to close the transaction. To
            # avoid 'no transaction in progress' warnings (depending on
            # postgres configuration) we open a new one here.
            # Should there somehow be a transaction in progress, a second
            # BEGIN will be a harmless no-op, though there may be a warning.
            cached.cursor().execute('BEGIN')
            cached.rollback()
        except psycopg2.Error:
            # The stored connection is unusable; forget it and fall through
            # to open a new one.
            del _DBconn.conn
        else:
            return DBWrapper(cached)

    # create a fresh connection
    settings = {} if _DBopts is None else _DBopts
    try:
        fresh = psycopg2.connect(**settings)
    except Exception:
        log.error(traceback.format_exc())
        raise
    _DBconn.conn = fresh

    return DBWrapper(fresh)
|