authentik/passbook/flows/planner.py

71 lines
2.4 KiB
Python
Raw Normal View History

"""Flows Planner"""
from dataclasses import dataclass, field
from time import time
from typing import Any, Dict, List, Tuple
from django.http import HttpRequest
from structlog import get_logger
from passbook.flows.exceptions import FlowNonApplicableError
from passbook.flows.models import Factor, Flow
from passbook.policies.engine import PolicyEngine
LOGGER = get_logger()
PLAN_CONTEXT_PENDING_USER = "pending_user"
PLAN_CONTEXT_SSO = "is_sso"
@dataclass
class FlowPlan:
"""This data-class is the output of a FlowPlanner. It holds a flat list
of all Factors that should be run."""
factors: List[Factor] = field(default_factory=list)
context: Dict[str, Any] = field(default_factory=dict)
def next(self) -> Factor:
"""Return next pending factor from the bottom of the list"""
factor_cls = self.factors.pop(0)
return factor_cls
class FlowPlanner:
"""Execute all policies to plan out a flat list of all Factors
that should be applied."""
flow: Flow
def __init__(self, flow: Flow):
self.flow = flow
def _check_flow_root_policies(self, request: HttpRequest) -> Tuple[bool, List[str]]:
engine = PolicyEngine(self.flow.policies.all(), request.user, request)
engine.build()
return engine.result
def plan(self, request: HttpRequest) -> FlowPlan:
"""Check each of the flows' policies, check policies for each factor with PolicyBinding
and return ordered list"""
LOGGER.debug("Starting planning process", flow=self.flow)
start_time = time()
plan = FlowPlan()
# First off, check the flow's direct policy bindings
# to make sure the user even has access to the flow
root_passing, root_passing_messages = self._check_flow_root_policies(request)
if not root_passing:
raise FlowNonApplicableError(root_passing_messages)
# Check Flow policies
for factor in self.flow.factors.order_by("order").select_subclasses():
engine = PolicyEngine(factor.policies.all(), request.user, request)
engine.build()
passing, _ = engine.result
if passing:
LOGGER.debug("Factor passing", factor=factor)
plan.factors.append(factor)
end_time = time()
LOGGER.debug(
"Finished planning", flow=self.flow, duration_s=end_time - start_time
)
return plan