- add unit test for koji/util: (coverage 58%->100%)
- minor fix: check if input string time is valid
This commit is contained in:
parent
c496bf994c
commit
efe68e54df
2 changed files with 462 additions and 2 deletions
|
|
@ -2,9 +2,13 @@ from __future__ import absolute_import
|
|||
import mock
|
||||
import unittest
|
||||
from mock import call, patch
|
||||
from datetime import datetime
|
||||
|
||||
import os
|
||||
import time
|
||||
import resource
|
||||
import optparse
|
||||
import calendar
|
||||
import six.moves.configparser
|
||||
import koji
|
||||
import koji.util
|
||||
|
|
@ -488,6 +492,462 @@ class MavenUtilTestCase(unittest.TestCase):
|
|||
config.readfp(conf_file)
|
||||
return config
|
||||
|
||||
def test_formatChangelog(self):
|
||||
"""Test formatChangelog function"""
|
||||
entries = {'date': datetime(2017, 10, 10, 12, 34, 56),
|
||||
'author': 'koji <kojiadmin@localhost.local>',
|
||||
'text': 'This is a test release'}
|
||||
sample = "* Tue Oct 10 2017 {0}\n{1}\n\n".format(
|
||||
entries['author'].encode('utf-8'), entries['text'].encode('utf-8'))
|
||||
self.assertEqual(sample, koji.util.formatChangelog((entries,)))
|
||||
|
||||
def test_parseTime(self):
|
||||
"""Test parseTime function"""
|
||||
now = datetime.now()
|
||||
now_ts = int(calendar.timegm(now.timetuple()))
|
||||
self.assertEqual(1507593600, koji.util.parseTime('2017-10-10'))
|
||||
self.assertEqual(1507638896, koji.util.parseTime('2017-10-10 12:34:56'))
|
||||
self.assertEqual(0, koji.util.parseTime('1970-01-01 00:00:00'))
|
||||
self.assertNotEqual(now_ts, koji.util.parseTime(now.strftime("%Y-%m-%d")))
|
||||
self.assertEqual(now_ts, koji.util.parseTime(now.strftime("%Y-%m-%d %H:%M:%S")))
|
||||
|
||||
# non time format string
|
||||
self.assertEqual(None, koji.util.parseTime('not-a-time-format'))
|
||||
|
||||
time_tests = {
|
||||
# invalid month
|
||||
'2000-13-32': 'month must be in 1..12',
|
||||
# invalid day
|
||||
'2000-12-32': 'day is out of range for month',
|
||||
# invalid hour
|
||||
'2000-12-31 24:61:61': 'hour must be in 0..23',
|
||||
# invalid minute
|
||||
'2000-12-31 23:61:61': 'minute must be in 0..59',
|
||||
# invalid second
|
||||
'2000-12-31 23:59:61': 'second must be in 0..59',
|
||||
# corner case, leap day
|
||||
'1969-2-29': 'day is out of range for month'
|
||||
}
|
||||
|
||||
# invalid date test
|
||||
for args, err in time_tests.items():
|
||||
six.assertRaisesRegex(
|
||||
self, ValueError, err, koji.util.parseTime, args)
|
||||
|
||||
def test_duration(self):
|
||||
"""Test duration function"""
|
||||
start = time.time()
|
||||
self.assertEqual('0:00', koji.util.duration(start))
|
||||
|
||||
# wait for 2 seconds
|
||||
time.sleep(2)
|
||||
self.assertEqual('0:02', koji.util.duration(start))
|
||||
|
||||
def test_printList(self):
|
||||
"""Test printList function"""
|
||||
distro = ['fedora', 'rhel', 'centos', 'opensuse']
|
||||
self.assertEqual('', koji.util.printList([]))
|
||||
self.assertEqual('fedora', koji.util.printList(distro[0:1]))
|
||||
self.assertEqual('fedora and rhel', koji.util.printList(distro[0:2]))
|
||||
self.assertEqual('fedora, rhel, and centos', koji.util.printList(distro[0:3]))
|
||||
|
||||
def test_multi_fnmatch(self):
|
||||
"""Test multi_fnmatch function"""
|
||||
patterns = "example.py example*.py [0-9]*.py [0-9]_*_exmple.py"
|
||||
self.assertTrue(koji.util.multi_fnmatch('example.py', patterns))
|
||||
self.assertTrue(koji.util.multi_fnmatch('example.py', patterns.split()))
|
||||
self.assertTrue(koji.util.multi_fnmatch('01.py', patterns.split()))
|
||||
self.assertTrue(koji.util.multi_fnmatch('01_koji:util_example.py', patterns.split()))
|
||||
self.assertTrue(koji.util.multi_fnmatch('example_01.py', patterns.split()))
|
||||
self.assertFalse(koji.util.multi_fnmatch('sample.py', patterns.split()))
|
||||
|
||||
def test_filedigestAlgo(self):
|
||||
"""Test filedigestAlgo function"""
|
||||
hdr = {koji.RPM_TAG_FILEDIGESTALGO: None}
|
||||
self.assertEqual('md5', koji.util.filedigestAlgo(hdr))
|
||||
|
||||
hdr = {koji.RPM_TAG_FILEDIGESTALGO: 2}
|
||||
self.assertEqual('sha1', koji.util.filedigestAlgo(hdr))
|
||||
|
||||
hdr = {koji.RPM_TAG_FILEDIGESTALGO: 4}
|
||||
self.assertEqual('unknown', koji.util.filedigestAlgo(hdr))
|
||||
|
||||
@mock.patch('os.WEXITSTATUS', return_value=255)
|
||||
@mock.patch('os.WTERMSIG', return_value=19)
|
||||
@mock.patch('os.WIFEXITED')
|
||||
@mock.patch('os.WIFSIGNALED')
|
||||
def test_parseStatus(self, m_signaled, m_exited, m_termsig, m_exit):
|
||||
"""Test parseStatus function"""
|
||||
self.assertEqual('%s was killed by signal %i' % ('test-proc', 19),
|
||||
koji.util.parseStatus(0, 'test-proc'))
|
||||
|
||||
m_signaled.return_value = False
|
||||
self.assertEqual('%s exited with status %i' % ('test-proc', 255),
|
||||
koji.util.parseStatus(0, 'test-proc'))
|
||||
|
||||
m_exited.return_value = False
|
||||
self.assertEqual('%s terminated for unknown reasons' % ('test-proc'),
|
||||
koji.util.parseStatus(0, 'test-proc'))
|
||||
|
||||
for prefix in [['test', 'proc'], ('test', 'proc')]:
|
||||
self.assertEqual(
|
||||
'%s terminated for unknown reasons' % (' '.join(prefix)),
|
||||
koji.util.parseStatus(0, prefix))
|
||||
|
||||
def test_isSuccess(self):
|
||||
"""Test isSuccess function"""
|
||||
with mock.patch('os.WIFEXITED') as m_exit, \
|
||||
mock.patch('os.WEXITSTATUS') as m_exitst:
|
||||
# True case
|
||||
m_exit.return_value, m_exitst.return_value = True, 0
|
||||
self.assertTrue(koji.util.isSuccess(0))
|
||||
|
||||
# False cases
|
||||
m_exit.return_value, m_exitst.return_value = True, 1
|
||||
self.assertFalse(koji.util.isSuccess(0))
|
||||
m_exit.return_value, m_exitst.return_value = False, 255
|
||||
self.assertFalse(koji.util.isSuccess(0))
|
||||
|
||||
def test_call_with_argcheck(self):
|
||||
"""Test call_wit_argcheck function"""
|
||||
func = lambda *args, **kargs: True
|
||||
self.assertTrue(
|
||||
koji.util.call_with_argcheck(
|
||||
func, [1, 2, 3], {'para1': 1, 'para2': 2}))
|
||||
|
||||
# exception tests
|
||||
func = lambda *args, **kargs: \
|
||||
(_ for _ in ()).throw(TypeError('fake-type-error'))
|
||||
six.assertRaisesRegex(self, TypeError, 'fake-type-error',
|
||||
koji.util.call_with_argcheck,
|
||||
func, [1, 2, 3], {'para1': 1, 'para2': 2})
|
||||
|
||||
with mock.patch('sys.exc_info') as m_info:
|
||||
m_info.side_effect = lambda: \
|
||||
[None, None, mock.MagicMock(tb_next=None)]
|
||||
six.assertRaisesRegex(self, koji.ParameterError, 'fake-type-error',
|
||||
koji.util.call_with_argcheck,
|
||||
func, [1, 2, 3])
|
||||
|
||||
def test_dslice(self):
|
||||
"""Test dslice function"""
|
||||
distro = {'fedora': 1, 'rhel': 2, 'centos': 3}
|
||||
self.assertEqual({'fedora': 1}, koji.util.dslice(distro, ['fedora']))
|
||||
|
||||
# slice with non exist key,
|
||||
# if strict bit is not set, empty dict should be returned.
|
||||
self.assertEqual({}, koji.util.dslice(distro, ['debian'], False))
|
||||
# if strict bit is set, KeyError should be raised
|
||||
self.assertRaises(KeyError, koji.util.dslice, distro, ['debian'])
|
||||
|
||||
def test_dslice_ex(self):
|
||||
"""Test dslice_ex function"""
|
||||
distro = {'fedora': 1, 'rhel': 2, 'centos': 3}
|
||||
self.assertEqual({'rhel': 2, 'centos': 3},
|
||||
koji.util.dslice_ex(distro, ['fedora']))
|
||||
|
||||
# slice with non exist key,
|
||||
# if strict bit is not set, original dict should be returned
|
||||
self.assertEqual(distro, koji.util.dslice_ex(distro, ['debian'], False))
|
||||
# if strict bit is set, KeyError should be raised
|
||||
self.assertRaises(KeyError, koji.util.dslice_ex, distro, ['debian'])
|
||||
|
||||
def test_checkForBuilds(self):
|
||||
"""Test checkForBuilds function"""
|
||||
builds = [koji.parse_NVR("pkg-1-r1"),
|
||||
koji.parse_NVR("pkg-1-r2"),
|
||||
koji.parse_NVR("pkg-1.1-r1")]
|
||||
latest_builds = [koji.parse_NVR("pkg-1.1-r1")]
|
||||
|
||||
session = mock.MagicMock()
|
||||
session.getLatestBuilds = mock.Mock(return_value=latest_builds)
|
||||
session.listTagged = mock.Mock(return_value=builds)
|
||||
event = mock.MagicMock()
|
||||
|
||||
# latest bit check
|
||||
self.assertTrue(koji.util.checkForBuilds(
|
||||
session, 'fedora', (koji.parse_NVR('pkg-1.1-r1'),),
|
||||
event, latest=True))
|
||||
self.assertFalse(koji.util.checkForBuilds(
|
||||
session, 'fedora', (koji.parse_NVR('pkg-1.0-r2'),),
|
||||
event, latest=True))
|
||||
|
||||
# all elemnts in builds should exist.
|
||||
for b in builds:
|
||||
self.assertTrue(
|
||||
koji.util.checkForBuilds(session, "pkg-build", (b,), event))
|
||||
|
||||
# non exist build test.
|
||||
self.assertEqual(False, koji.util.checkForBuilds(
|
||||
session, "pkg-build",
|
||||
(koji.parse_NVR("pkg-1.0-r1"),), event))
|
||||
|
||||
def test_LazyValue(self):
|
||||
"""Test LazyValue object"""
|
||||
init, base, incr = 0, 1, 0
|
||||
lv = koji.util.LazyValue(
|
||||
lambda x, offset=0: base + x + offset,
|
||||
(init,),
|
||||
{'offset': incr})
|
||||
self.assertEqual(init + base + incr, lv.get())
|
||||
|
||||
base = 2
|
||||
self.assertEqual(init + base + incr, lv.get())
|
||||
|
||||
# cache bit test
|
||||
init, base, incr = 1, 2, 3
|
||||
lv = koji.util.LazyValue(
|
||||
lambda x, offset=0: base + x + offset,
|
||||
(init,),
|
||||
{'offset': incr},
|
||||
cache=True)
|
||||
self.assertEqual(init + base + incr, lv.get())
|
||||
|
||||
base = 3
|
||||
|
||||
# lv.get should return cached value: 6
|
||||
self.assertNotEqual(init + base + incr, lv.get())
|
||||
|
||||
def test_LazyString(self):
|
||||
"""Test LazyString object"""
|
||||
fmt = '[{timestamp}] {greeting} {0}'
|
||||
timestamp = int(time.time())
|
||||
|
||||
lstr = koji.util.LazyString(
|
||||
lambda fmt, *args, **kwargs:
|
||||
fmt.format(*args, timestamp=timestamp, **kwargs),
|
||||
(fmt, 'koji'),
|
||||
{'greeting': 'hello'})
|
||||
|
||||
self.assertEqual(
|
||||
fmt.format('koji', timestamp=timestamp, greeting='hello'),
|
||||
str(lstr))
|
||||
|
||||
# non cached string should be different
|
||||
prev_str = str(lstr)
|
||||
timestamp += 100
|
||||
self.assertNotEqual(prev_str, str(lstr))
|
||||
|
||||
# enable caching
|
||||
lstr = koji.util.LazyString(
|
||||
lambda fmt, *args, **kwargs:
|
||||
fmt.format(*args, timestamp=timestamp, **kwargs),
|
||||
(fmt, 'koji'),
|
||||
{'greeting': 'hello'},
|
||||
cache=True)
|
||||
|
||||
prev_str = str(lstr)
|
||||
timestamp += 10
|
||||
self.assertEqual(prev_str, str(lstr))
|
||||
|
||||
def test_LazyDict(self):
|
||||
"""Test LazyDict object"""
|
||||
name = None
|
||||
release = None
|
||||
date = None
|
||||
|
||||
# Testing on cache bit enabled.
|
||||
ldict = koji.util.LazyDict({})
|
||||
ldict.lazyset('name', lambda: name, (), cache=True)
|
||||
|
||||
name = 'fedora'
|
||||
self.assertEqual(name, ldict['name'])
|
||||
|
||||
# cached, ldict['name'] should not be changed
|
||||
name = 'rhel'
|
||||
self.assertNotEqual(name, ldict.get('name'))
|
||||
|
||||
# Testing on cahce bit disabled.
|
||||
ldict['name'] = koji.util.LazyValue(lambda: name, ())
|
||||
ldict['release'] = koji.util.LazyValue(lambda: release, ())
|
||||
ldict['date'] = koji.util.LazyValue(lambda: date, ())
|
||||
|
||||
name, release, date = 'fedora', 26, datetime.now().strftime('%Y%m%d')
|
||||
data = {'name': name, 'release': release, 'date': date}
|
||||
six.assertCountEqual(self, data.items(), ldict.items())
|
||||
six.assertCountEqual(self, data.items(), [v for v in ldict.iteritems()])
|
||||
|
||||
name, release, date = 'rhel', 7, '20171012'
|
||||
six.assertCountEqual(self, [name, release, date], ldict.values())
|
||||
six.assertCountEqual(self, [name, release, date], [v for v in ldict.itervalues()])
|
||||
|
||||
data = {'name': name, 'release': release, 'date': date}
|
||||
self.assertEqual(name, ldict.pop('name'))
|
||||
data.pop('name')
|
||||
six.assertCountEqual(self, data.items(), ldict.items())
|
||||
|
||||
(key, value) = ldict.popitem()
|
||||
data.pop(key)
|
||||
six.assertCountEqual(self, data.items(), ldict.items())
|
||||
|
||||
ldict_copy = ldict.copy()
|
||||
six.assertCountEqual(self, data.items(), ldict_copy.items())
|
||||
|
||||
def test_LazyRecord(self):
|
||||
"""Test LazyRecord object"""
|
||||
# create a list object with lazy attribute
|
||||
lobj = koji.util.LazyRecord(list)
|
||||
six.assertRaisesRegex(
|
||||
self, TypeError, 'object does not support lazy attributes',
|
||||
koji.util.lazysetattr, self, 'value', lambda x: x, (100,))
|
||||
|
||||
base, init, inc = 10, 1, 0
|
||||
koji.util.lazysetattr(
|
||||
lobj, 'lz_value',
|
||||
lambda x, offset=0: base + x + inc,
|
||||
(init, ),
|
||||
{'offset': inc},
|
||||
cache=True)
|
||||
|
||||
self.assertEqual(base + init + inc, lobj.lz_value)
|
||||
|
||||
# try to access non exist attribute data, AttributeError should raise
|
||||
self.assertRaises(AttributeError, getattr, lobj, 'data')
|
||||
|
||||
def test_HiddenValue(self):
|
||||
"""Test Hidd object"""
|
||||
hv = koji.util.HiddenValue('the plain text message')
|
||||
self.assertEqual('[value hidden]', str(hv))
|
||||
self.assertEqual('HiddenValue()', repr(hv))
|
||||
|
||||
hv2 = koji.util.HiddenValue(hv)
|
||||
self.assertEqual(hv2.value, hv.value)
|
||||
self.assertEqual('[value hidden]', str(hv2))
|
||||
self.assertEqual('HiddenValue()', repr(hv2))
|
||||
|
||||
def test_relpath(self):
|
||||
"""Test _relpath function"""
|
||||
self.assertRaises(ValueError, koji.util._relpath, None)
|
||||
self.assertRaises(ValueError, koji.util._relpath, "")
|
||||
|
||||
# _relpath is a backport of os.path.relpath
|
||||
# their behaviors and outputs should be the same.
|
||||
for p in ["/", ".", "..", "/bin", "\0", "\n", "\t/tmp"]:
|
||||
for s in [os.curdir, '/tmp']:
|
||||
self.assertEqual(os.path.relpath(p, s), koji.util._relpath(p, s))
|
||||
|
||||
def test_eventFromOpts(self):
|
||||
"""Test eventFromOpts function"""
|
||||
timestamp = datetime.now().strftime('%s')
|
||||
session = mock.MagicMock()
|
||||
event = mock.MagicMock(event=20171010, ts=timestamp, repo=1)
|
||||
|
||||
repo_info = {'create_event': 20171010,
|
||||
'create_ts': timestamp}
|
||||
|
||||
session.getEvent = lambda *args, **kwargs: event if args[0] == 20171010 else None
|
||||
session.getLastEvent = lambda *args, **kwargs: event
|
||||
session.repoInfo = lambda *args, **kwargs: repo_info if args[0] == 1 else None
|
||||
|
||||
# opts.event = 20171010
|
||||
opts = mock.MagicMock(event=20171010)
|
||||
self.assertEqual(event, koji.util.eventFromOpts(session, opts))
|
||||
|
||||
# opts.event = 12345678, non exist event
|
||||
opts = mock.MagicMock(event=12345678)
|
||||
self.assertEqual(None, koji.util.eventFromOpts(session, opts))
|
||||
|
||||
# opts.ts = timestamp
|
||||
opts = mock.MagicMock(event='', ts=timestamp)
|
||||
self.assertEqual(event, koji.util.eventFromOpts(session, opts))
|
||||
|
||||
# opts.repo = '1'
|
||||
opts = mock.MagicMock(event='', ts='', repo=1)
|
||||
expect = {'id': repo_info['create_event'],
|
||||
'ts': repo_info['create_ts']}
|
||||
|
||||
actual = koji.util.eventFromOpts(session, opts)
|
||||
self.assertNotEqual(None, actual)
|
||||
six.assertCountEqual(self, expect.items(), actual.items())
|
||||
|
||||
# no event is matched case
|
||||
opts = mock.MagicMock(event=0, ts=0, repo=0)
|
||||
self.assertEqual(None, koji.util.eventFromOpts(session, opts))
|
||||
|
||||
def test_setup_rlimits(self):
|
||||
"""Test test_setup_rlimits function"""
|
||||
logger = mock.MagicMock()
|
||||
options = {
|
||||
'RLIMIT_AS': '',
|
||||
'RLIMIT_CORE': '0',
|
||||
'RLIMIT_CPU': '',
|
||||
'RLIMIT_DATA': '4194304',
|
||||
'RLIMIT_FSIZE': '0',
|
||||
'RLIMIT_MEMLOCK': '',
|
||||
'RLIMIT_NOFILE': '768',
|
||||
'RLIMIT_NPROC': '3',
|
||||
'RLIMIT_OFILE': '',
|
||||
'RLIMIT_RSS': '',
|
||||
'RLIMIT_STACK': '4194304'
|
||||
}
|
||||
|
||||
# create a resource token <--> id lookup table
|
||||
rlimit_lookup = {getattr(resource, k): k for k in options}
|
||||
|
||||
def _getrlimit(res):
|
||||
return (options.get(rlimit_lookup[res], None), 0)
|
||||
|
||||
def _setrlimit(res, limits):
|
||||
results[rlimit_lookup[res]] = str(limits[0])
|
||||
|
||||
results = {k: '' for k, v in options.items()}
|
||||
with mock.patch('resource.setrlimit') as m_set, \
|
||||
mock.patch('resource.getrlimit') as m_get:
|
||||
m_get.side_effect = ValueError('resource.getrlimit-value-error')
|
||||
six.assertRaisesRegex(self, ValueError, 'resource.getrlimit-value-error',
|
||||
koji.util.setup_rlimits, options, logger)
|
||||
|
||||
m_get.side_effect = _getrlimit
|
||||
|
||||
# logger.error test
|
||||
koji.util.setup_rlimits({'RLIMIT_AS': 'abcde'}, logger)
|
||||
logger.error.assert_called_with('Invalid resource limit: %s=%s',
|
||||
'RLIMIT_AS',
|
||||
'abcde')
|
||||
|
||||
koji.util.setup_rlimits({'RLIMIT_AS': '1 2 3 4 5'}, logger)
|
||||
logger.error.assert_called_with('Invalid resource limit: %s=%s',
|
||||
'RLIMIT_AS',
|
||||
'1 2 3 4 5')
|
||||
|
||||
# exception and logger.error test
|
||||
m_set.side_effect = ValueError('resource.setrlimit-value-error')
|
||||
koji.util.setup_rlimits({'RLIMIT_AS': '0'}, logger)
|
||||
logger.error.assert_called_with('Unable to set %s: %s',
|
||||
'RLIMIT_AS',
|
||||
m_set.side_effect)
|
||||
|
||||
# run setrlimit test, the results should be equal to options
|
||||
m_set.side_effect = _setrlimit
|
||||
|
||||
# make some noise in options
|
||||
test_opt = dict(options)
|
||||
test_opt.update({
|
||||
'RLIMIT_CUSTOM': 'fake_rlimit_key',
|
||||
'DBName': 'koji',
|
||||
'DBUser': 'koji',
|
||||
'KojiDir': '/mnt/koji',
|
||||
'KojiDebug': True})
|
||||
|
||||
koji.util.setup_rlimits(test_opt, logger)
|
||||
six.assertCountEqual(self, results, options)
|
||||
|
||||
def test_adler32_constructor(self):
|
||||
"""Test adler32_constructor function"""
|
||||
chksum = koji.util.adler32_constructor('Wikipedia') # checksum is 300286872
|
||||
self.assertEqual(300286872, chksum.digest())
|
||||
self.assertEqual('%08x' % (300286872), chksum.hexdigest())
|
||||
|
||||
copy = chksum.copy()
|
||||
self.assertEqual(copy.digest(), chksum.digest())
|
||||
self.assertNotEqual(copy, chksum)
|
||||
|
||||
chksum.update('test') # checksum is equal to adler32(b'test', 300286872)
|
||||
self.assertNotEqual(300286872, chksum.digest())
|
||||
self.assertNotEqual(copy.digest(), chksum.digest())
|
||||
self.assertEqual(614401368, chksum.digest())
|
||||
|
||||
|
||||
class TestRmtree(unittest.TestCase):
|
||||
@patch('koji.util._rmtree')
|
||||
@patch('os.rmdir')
|
||||
|
|
@ -593,7 +1053,6 @@ class TestRmtree(unittest.TestCase):
|
|||
rmdir.assert_has_calls([call('b'), call('a')])
|
||||
chdir.assert_has_calls([call('b'), call('..'), call('a'), call('..')])
|
||||
|
||||
|
||||
@patch('os.listdir')
|
||||
@patch('os.lstat')
|
||||
@patch('stat.S_ISDIR')
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue