authentik/passbook/core/rules.py

48 lines
1.4 KiB
Python
Raw Normal View History

"""passbook core rule engine"""
2018-11-30 13:33:33 +00:00
from logging import getLogger
from celery import group
from passbook.core.celery import CELERY_APP
from passbook.core.models import Rule, User
2018-11-30 13:33:33 +00:00
LOGGER = getLogger(__name__)
@CELERY_APP.task()
2019-02-10 19:09:47 +00:00
def _rule_engine_task(user_pk, rule_pk, **kwargs):
2018-11-30 13:33:33 +00:00
"""Task wrapper to run rule checking"""
rule_obj = Rule.objects.filter(pk=rule_pk).select_subclasses().first()
user_obj = User.objects.get(pk=user_pk)
2019-02-10 19:09:47 +00:00
for key, value in kwargs.items():
setattr(user_obj, key, value)
2018-11-30 13:33:33 +00:00
LOGGER.debug("Running rule `%s`#%s for user %s...", rule_obj.name, rule_obj.pk.hex, user_obj)
return rule_obj.passes(user_obj)
class RuleEngine:
"""Orchestrate rule checking, launch tasks and return result"""
2019-02-10 19:09:47 +00:00
rules = None
_group = None
2019-02-10 19:09:47 +00:00
def __init__(self, rules):
self.rules = rules
def for_user(self, user):
"""Check rules for user"""
signatures = []
2019-02-10 19:09:47 +00:00
kwargs = {
'__password__': getattr(user, '__password__')
}
for rule in self.rules:
signatures.append(_rule_engine_task.s(user.pk, rule.pk.hex, **kwargs))
2018-11-30 13:33:33 +00:00
self._group = group(signatures)()
return self
@property
def result(self):
"""Get rule-checking result"""
2018-11-30 13:33:33 +00:00
for rule_result in self._group.get():
if rule_result is False:
return False
return True