authentik/passbook/sources/saml/processors/response.py

198 lines
7.9 KiB
Python

"""passbook saml source processor"""
from typing import TYPE_CHECKING, Dict
from defusedxml import ElementTree
from django.core.cache import cache
from django.core.exceptions import SuspiciousOperation
from django.http import HttpRequest, HttpResponse
from signxml import XMLVerifier
from structlog import get_logger
from passbook.core.models import User
from passbook.flows.models import Flow
from passbook.flows.planner import (
PLAN_CONTEXT_PENDING_USER,
PLAN_CONTEXT_SSO,
FlowPlanner,
)
from passbook.flows.views import SESSION_KEY_PLAN
from passbook.lib.utils.urls import redirect_with_qs
from passbook.policies.utils import delete_none_keys
from passbook.providers.saml.utils.encoding import decode_base64_and_inflate
from passbook.sources.saml.exceptions import (
MismatchedRequestID,
MissingSAMLResponse,
UnsupportedNameIDFormat,
)
from passbook.sources.saml.models import SAMLSource
from passbook.sources.saml.processors.constants import (
SAML_NAME_ID_FORMAT_EMAIL,
SAML_NAME_ID_FORMAT_PERSISTENT,
SAML_NAME_ID_FORMAT_TRANSIENT,
SAML_NAME_ID_FORMAT_WINDOWS,
SAML_NAME_ID_FORMAT_X509,
)
from passbook.sources.saml.processors.request import SESSION_REQUEST_ID
from passbook.stages.password.stage import PLAN_CONTEXT_AUTHENTICATION_BACKEND
from passbook.stages.prompt.stage import PLAN_CONTEXT_PROMPT
# Module-level structlog logger used by the response processor.
LOGGER = get_logger()
if TYPE_CHECKING:
    # Only needed for type annotations; actual parsing goes through defusedxml.
    from xml.etree.ElementTree import Element  # nosec

# Cache key template for the list of already-seen Response IDs; formatted with
# the primary key of the SAML source.
CACHE_SEEN_REQUEST_ID = "passbook_saml_seen_ids_%s"
# Authentication backend placed into the flow plan context for users resolved
# by this processor.
DEFAULT_BACKEND = "django.contrib.auth.backends.ModelBackend"
class ResponseProcessor:
    """SAML Response Processor

    Parses the `SAMLResponse` POSTed by the IdP, verifies its signature and
    request ID, and plans the flow for the subject named in the assertion.
    `parse` must be called before `prepare_flow`, as it populates the parsed
    response that the other methods read."""

    # Source this response is processed for
    _source: SAMLSource

    # Parsed root element of the response, set by `parse`
    _root: "Element"
    # Decoded response XML as returned by `decode_base64_and_inflate`, set by `parse`
    _root_xml: str

    def __init__(self, source: SAMLSource):
        self._source = source

    def parse(self, request: HttpRequest):
        """Check if `request` contains SAML Response data, parse and validate it.

        Raises `MissingSAMLResponse` if the POST data has no `SAMLResponse`.
        Errors from signature verification and request ID validation propagate
        to the caller."""
        # First off, check if we have any SAML Data at all.
        raw_response = request.POST.get("SAMLResponse", None)
        if not raw_response:
            raise MissingSAMLResponse("Request does not contain 'SAMLResponse'")
        # NOTE: RelayState is currently not read from the request.
        # Check if response is compressed, b64 decode it
        self._root_xml = decode_base64_and_inflate(raw_response)
        self._root = ElementTree.fromstring(self._root_xml)

        self._verify_signed()
        self._verify_request_id(request)

    def _verify_signed(self):
        """Verify SAML Response's Signature against the source's signing certificate.

        Any exception raised by the verifier propagates to the caller."""
        # NOTE(review): the result of `verify()` is discarded and all further
        # processing reads `self._root`, which was parsed independently. The
        # signxml documentation advises to only trust the data returned by
        # `verify()`; confirm whether this leaves room for signature wrapping.
        verifier = XMLVerifier()
        verifier.verify(
            self._root_xml, x509_cert=self._source.signing_kp.certificate_data
        )
        LOGGER.debug("Successfully verified signautre")

    def _verify_request_id(self, request: HttpRequest):
        """Validate that the response belongs to a request we made.

        If the source allows IdP-initiated logins, only the Response ID is
        checked against a per-source list of already-seen IDs, raising
        `SuspiciousOperation` on a repeat. Otherwise the response's
        `InResponseTo` must equal the request ID stored in the session, else
        `MismatchedRequestID` is raised."""
        if self._source.allow_idp_initiated:
            # If IdP-initiated SSO flows are enabled, we want to cache the Response ID
            # to somewhat mitigate replay attacks
            cache_key = CACHE_SEEN_REQUEST_ID % self._source.pk
            response_id = self._root.attrib["ID"]
            seen_ids = cache.get(cache_key, [])
            if response_id in seen_ids:
                raise SuspiciousOperation("Replay attack detected")
            seen_ids.append(response_id)
            # No explicit timeout is passed, so the cache's default timeout applies.
            cache.set(cache_key, seen_ids)
            return
        if (
            SESSION_REQUEST_ID not in request.session
            or "InResponseTo" not in self._root.attrib
        ):
            raise MismatchedRequestID(
                "Missing InResponseTo and IdP-initiated Logins are not allowed"
            )
        if request.session[SESSION_REQUEST_ID] != self._root.attrib["InResponseTo"]:
            raise MismatchedRequestID("Mismatched request ID")

    def _handle_name_id_transient(self, request: HttpRequest) -> HttpResponse:
        """Handle a NameID with the Format of Transient. This is a bit more complex than
        other formats, as we need to create a temporary User that is used in the session.
        The user gets a `saml` attribute that refers to our Source and is flagged with
        `delete_on_logout`, so that it can be cleaned up later."""
        # Create a temporary User
        name_id = self._get_name_id().text
        user: User = User.objects.create(
            username=name_id,
            attributes={
                "saml": {"source": self._source.pk.hex, "delete_on_logout": True}
            },
        )
        LOGGER.debug("Created temporary user for NameID Transient", username=name_id)
        # Temporary users must never be able to log in with a password
        user.set_unusable_password()
        user.save()
        return self._flow_response(
            request,
            self._source.authentication_flow,
            **{
                PLAN_CONTEXT_PENDING_USER: user,
                PLAN_CONTEXT_AUTHENTICATION_BACKEND: DEFAULT_BACKEND,
            },
        )

    def _get_name_id(self) -> "Element":
        """Get NameID Element from the Subject of the response's Assertion.

        Raises `ValueError` if the Assertion, its Subject or the NameID is missing."""
        assertion = self._root.find("{urn:oasis:names:tc:SAML:2.0:assertion}Assertion")
        if assertion is None:
            raise ValueError("Assertion Element not found!")
        subject = assertion.find("{urn:oasis:names:tc:SAML:2.0:assertion}Subject")
        if subject is None:
            raise ValueError("Subject Element not found!")
        name_id = subject.find("{urn:oasis:names:tc:SAML:2.0:assertion}NameID")
        if name_id is None:
            raise ValueError("NameID Element not found!")
        return name_id

    def _get_name_id_filter(self) -> Dict[str, str]:
        """Returns the subject's NameID as a Filter for the `User`

        Raises `UnsupportedNameIDFormat` if the NameID is empty, or its Format is
        missing or not one of the formats handled here."""
        name_id_el = self._get_name_id()
        name_id = name_id_el.text
        if not name_id:
            raise UnsupportedNameIDFormat("Subject's NameID is empty.")
        # Format is an optional attribute; a missing one ends up in the
        # unsupported-format error below instead of a bare KeyError.
        _format = name_id_el.attrib.get("Format")
        if _format == SAML_NAME_ID_FORMAT_EMAIL:
            return {"email": name_id}
        if _format == SAML_NAME_ID_FORMAT_PERSISTENT:
            return {"username": name_id}
        if _format == SAML_NAME_ID_FORMAT_X509:
            # This attribute is statically set by the LDAP source
            return {"attributes__distinguishedName": name_id}
        if _format == SAML_NAME_ID_FORMAT_WINDOWS:
            # Strip the part before the backslash, e.g. DOMAIN\user -> user
            if "\\" in name_id:
                name_id = name_id.split("\\")[1]
            return {"username": name_id}
        raise UnsupportedNameIDFormat(
            f"Assertion contains NameID with unsupported format {_format}."
        )

    def prepare_flow(self, request: HttpRequest) -> HttpResponse:
        """Prepare flow plan depending on whether or not the user exists

        Transient NameIDs get a temporary user and the authentication flow. For all
        other formats, an existing matching user is sent to the authentication flow,
        otherwise the enrollment flow is planned with the NameID as prompt data."""
        name_id = self._get_name_id()
        name_id_format = name_id.attrib.get("Format")
        # Sanity check, show a warning if NameIDPolicy doesn't match what we got
        if self._source.name_id_policy != name_id_format:
            LOGGER.warning(
                "NameID from IdP doesn't match our policy",
                expected=self._source.name_id_policy,
                got=name_id_format,
            )
        # transient NameIDs are handled separately as no existing user has to be
        # looked up for them.
        if name_id_format == SAML_NAME_ID_FORMAT_TRANSIENT:
            return self._handle_name_id_transient(request)

        name_id_filter = self._get_name_id_filter()
        # A single query is enough: `first()` returns None if nothing matches
        matching_user = User.objects.filter(**name_id_filter).first()
        if matching_user is not None:
            # User exists already, switch to authentication flow
            return self._flow_response(
                request,
                self._source.authentication_flow,
                **{
                    PLAN_CONTEXT_PENDING_USER: matching_user,
                    PLAN_CONTEXT_AUTHENTICATION_BACKEND: DEFAULT_BACKEND,
                },
            )
        return self._flow_response(
            request,
            self._source.enrollment_flow,
            **{PLAN_CONTEXT_PROMPT: delete_none_keys(name_id_filter)},
        )

    def _flow_response(
        self, request: HttpRequest, flow: Flow, **kwargs
    ) -> HttpResponse:
        """Plan `flow` with `kwargs` as context, store the plan in the session and
        redirect to the flow executor shell, keeping the request's query string."""
        kwargs[PLAN_CONTEXT_SSO] = True
        request.session[SESSION_KEY_PLAN] = FlowPlanner(flow).plan(request, kwargs)
        return redirect_with_qs(
            "passbook_flows:flow-executor-shell",
            request.GET,
            flow_slug=flow.slug,
        )