debian-koji/koji/policy.py
2024-04-24 15:00:45 +02:00

462 lines
13 KiB
Python

# Copyright (c) 2008-2014 Red Hat, Inc.
#
# Koji is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation;
# version 2.1 of the License.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this software; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
#
# Authors:
# Mike McLean <mikem@redhat.com>
from __future__ import absolute_import
import fnmatch
import logging
import six
import koji
from koji.util import to_list, multi_fnmatch
class BaseSimpleTest(object):
"""Abstract base class for simple tests"""
# Provide the name of the test
name = None
def __init__(self, str):
"""Read the test parameters from string"""
self.str = str
def run(self, data):
"""Run the test against data provided"""
raise NotImplementedError
def __str__(self):
return self.str
# The following tests are generic enough that we can place them here
class TrueTest(BaseSimpleTest):
name = 'true'
def run(self, data):
return True
class FalseTest(BaseSimpleTest):
name = 'false'
def run(self, data):
return False
class AllTest(TrueTest):
name = 'all'
# alias for true
class NoneTest(FalseTest):
name = 'none'
# alias for false
class HasTest(BaseSimpleTest):
"""Test if policy data contains a field"""
name = "has"
def __init__(self, str):
super(HasTest, self).__init__(str)
try:
self.field = str.split()[1]
except IndexError:
raise koji.GenericError("Invalid or missing field in policy test")
def run(self, data):
return self.field in data
class BoolTest(BaseSimpleTest):
"""Test a field in the data as a boolean value
This test can be used as-is, or it can be subclassed to
test a specific field
Syntax:
name [field]
"""
name = 'bool'
field = None
def run(self, data):
args = self.str.split()[1:]
if self.field is None:
field = args[0]
else:
# expected when we are subclassed
field = self.field
if field not in data:
return False
return bool(data[field])
class MatchTest(BaseSimpleTest):
"""Matches a field in the data against glob patterns
True if any of the expressions match, else False
This test can be used as-is, or it can be subclassed to
test a specific field
Syntax:
name [field] pattern1 [pattern2 ...]
"""
name = 'match'
field = None
def run(self, data):
args = self.str.split()[1:]
if self.field is None:
field = args[0]
args = args[1:]
else:
# expected when we are subclassed
field = self.field
if field not in data:
return False
value = data[field]
if value is None:
# None does not match any pattern
return False
for pattern in args:
if fnmatch.fnmatch(value, pattern):
return True
return False
class MatchAnyTest(BaseSimpleTest):
"""Matches any item of a list/tuple/set value in the data against glob patterns
True if any of the expressions matches any item in the list/tuple/set, else False.
If the field doesn't exist or isn't a list/tuple/set, the test returns False
Syntax:
find field pattern1 [pattern2 ...]
"""
name = 'match_any'
field = None
def run(self, data):
args = self.str.split()[1:]
self.field = args[0]
args = args[1:]
tgt = data.get(self.field)
if tgt and isinstance(tgt, (list, tuple, set)):
for i in tgt:
if i is not None and multi_fnmatch(str(i), args):
return True
return False
class MatchAllTest(BaseSimpleTest):
"""Matches all items of a list/tuple/set value in the data against glob patterns
True if any of the expressions matches all items in the list/tuple/set, else False.
If the field doesn't exist or isn't a list/tuple/set, the test returns False
Syntax:
match_all field pattern1 [pattern2 ...]
"""
name = 'match_all'
field = None
def run(self, data):
args = self.str.split()[1:]
self.field = args[0]
args = args[1:]
tgt = data.get(self.field)
if tgt and isinstance(tgt, (list, tuple, set)):
for i in tgt:
if i is None or not multi_fnmatch(str(i), args):
return False
return True
return False
class TargetTest(MatchTest):
"""Matches target in the data against glob patterns
True if any of the expressions match, else False
Syntax:
target pattern1 [pattern2 ...]
"""
name = 'target'
field = 'target'
class CompareTest(BaseSimpleTest):
"""Simple numeric field comparison
Supports basic numeric comparisons. The right operand must be a valid number
This test can be used as-is, or it can be subclassed to
test a specific field
Syntax:
name [field] OP number
"""
name = 'compare'
field = None
allow_float = True
operators = {
'<': lambda a, b: a < b,
'>': lambda a, b: a > b,
'<=': lambda a, b: a <= b,
'>=': lambda a, b: a >= b,
'=': lambda a, b: a == b,
'!=': lambda a, b: a != b,
}
def __init__(self, str):
"""Read the test parameters from string"""
super(CompareTest, self).__init__(str)
if self.field is None:
# field OP number
self.field, cmp, value = str.split(None, 3)[1:]
else:
# OP number
cmp, value = str.split(None, 2)[1:]
self.func = self.operators.get(cmp, None)
if self.func is None:
raise koji.GenericError("Invalid comparison in test.")
try:
self.value = int(value)
except ValueError:
if not self.allow_float:
raise
self.value = float(value)
def run(self, data):
if self.field not in data:
return False
return self.func(data[self.field], self.value)
class SimpleRuleSet(object):
def __init__(self, rules, tests):
self.tests = tests
self.rules = self.parse_rules(rules)
self.lastrule = None
self.lastaction = None
self.logger = logging.getLogger('koji.policy')
def parse_rules(self, lines):
"""Parse rules into a ruleset data structure
At the top level, the structure is a set of rules
[rule1, rule2, ...]
Each rule is a pair
[tests, negate, action ]
Tests is a list of test handlers:
[handler1, handler2, ...]
Action can either be a string or a chained ruleset
"action"
or
[subrule1, subrule2, ...]
Putting it all together, you get something like this:
[[[test1, test2], negate, "action"],
[[test], negate,
[[[test1, test2], negate, "action"],
[[test1, test2, test3], negate
[[[test1, test2], negate, "action"]]]]]]
"""
cursor = []
self.ruleset = cursor
stack = []
for line in lines:
rule = self.parse_line(line)
if rule is None:
# blank/etc
continue
tests, negate, action = rule
if action == '{':
# nested rules
child = []
cursor.append([tests, negate, child])
stack.append(cursor)
cursor = child
elif action == '}':
if not stack:
raise koji.GenericError("nesting error in rule set")
cursor = stack.pop()
else:
cursor.append(rule)
if stack:
# unclosed {
raise koji.GenericError("nesting error in rule set")
def parse_line(self, line):
"""Parse line as a rule
Expected format is:
test [params] [&& test [params] ...] :: action-if-true
test [params] [&& test [params] ...] !! action-if-false
(syntax is !! instead of ||, because otherwise folks might think
they can mix && and ||, which is /not/ supported)
For complex rules:
test [params [&& ...]] :: {
test [params [&& ...]] :: action
test [params [&& ...]] :: {
...
}
}
Each closing brace must be on a line by itself
"""
line = line.split('#', 1)[0].strip()
if not line:
# blank or all comment
return None
if line == '}':
return None, False, '}'
# ?? allow }} ??
negate = False
pos = line.rfind('::')
if pos == -1:
pos = line.rfind('!!')
if pos == -1:
raise Exception("bad policy line: %s" % line)
negate = True
tests = line[:pos]
action = line[pos + 2:]
tests = [self.get_test_handler(x) for x in tests.split('&&')]
action = action.strip()
# just return action = { for nested rules
return tests, negate, action
def get_test_handler(self, str):
name = str.split(None, 1)[0]
try:
return self.tests[name](str)
except KeyError:
raise koji.GenericError("missing test handler: %s" % name)
def all_actions(self):
"""report a list of all actions in the ruleset
(only the first word of the action is considered)
"""
def _recurse(rules, index):
for tests, negate, action in rules:
if isinstance(action, list):
_recurse(action, index)
else:
name = action.split(None, 1)[0]
index[name] = 1
index = {}
_recurse(self.ruleset, index)
return to_list(index.keys())
def _apply(self, rules, data, top=False):
for tests, negate, action in rules:
if top:
self.lastrule = []
value = False
for test in tests:
check = test.run(data)
self.logger.debug("%s -> %s", test, check)
if not check:
break
else:
# all tests in current rule passed
value = True
if negate:
value = not value
if value:
self.lastrule.append([tests, negate])
if isinstance(action, list):
self.logger.debug("matched: entering subrule")
# action is a list of subrules
ret = self._apply(action, data)
if ret is not None:
return ret
# if ret is None, then none of the subrules matched,
# so we keep going
else:
self.logger.debug("matched: action=%s", action)
return action
return None
def apply(self, data):
self.logger.debug("policy start")
self.lastrule = []
self.lastaction = self._apply(self.ruleset, data, top=True)
self.logger.debug("policy done")
return self.lastaction
def last_rule(self):
if self.lastrule is None:
return None
ret = []
for (tests, negate) in self.lastrule:
line = '&&'.join([str(t) for t in tests])
if negate:
line += '!! '
else:
line += ':: '
ret.append(line)
ret = '... '.join(ret)
if self.lastaction is None:
ret += "(no match)"
else:
ret += self.lastaction
return ret
def findSimpleTests(namespace):
"""Search namespace for subclasses of BaseSimpleTest
This is a convenience function for initializing a SimpleRuleSet instance
namespace can be a dict (e.g. globals()), or a list of dicts
returns a dictionary of the found subclasses, indexed by name
"""
if not isinstance(namespace, (list, tuple)):
namespace = (namespace,)
ret = {}
for ns in namespace:
for key, value in six.iteritems(ns):
if value is BaseSimpleTest:
# skip this abstract base class if we encounter it
# this module contains generic tests, so it is valid to include it
# in the namespace list
continue
if isinstance(value, type(BaseSimpleTest)) and issubclass(value, BaseSimpleTest):
name = getattr(value, 'name', None)
if not name:
# use the class name
name = key
# but trim 'Test' from the end
if name.endswith('Test') and len(name) > 4:
name = name[:-4]
ret.setdefault(name, value)
# ...so first test wins in case of name overlap
return ret