authentik/tests/e2e/test_provider_ldap.py

287 lines
11 KiB
Python
Raw Normal View History

"""LDAP and Outpost e2e tests"""
from sys import platform
from time import sleep
from unittest.case import skipUnless
from docker.client import DockerClient, from_env
from docker.models.containers import Container
from guardian.shortcuts import get_anonymous_user
from ldap3 import ALL, ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES, SUBTREE, Connection, Server
from ldap3.core.exceptions import LDAPInvalidCredentialsResult
from authentik.core.models import Application, Group, User
from authentik.events.models import Event, EventAction
from authentik.flows.models import Flow
from authentik.outposts.managed import MANAGED_OUTPOST
from authentik.outposts.models import Outpost, OutpostType
from authentik.providers.ldap.models import LDAPProvider
outposts/proxyv2 (#1365) * outposts/proxyv2: initial commit Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add rs256 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> more stuff Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add forward auth an sign_out Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> match cookie name Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> re-add support for rs256 for backwards compat Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add error handler Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> ensure unique user-agent is used Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> set cookie duration based on id_token expiry Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> build proxy v2 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add ssl Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add basic auth and custom header support Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add application cert loading Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> implement whitelist Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add redis Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> migrate embedded outpost to v2 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> remove old proxy Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> providers/proxy: make token expiration configurable Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add metrics Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> fix tests Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * providers/proxy: only allow one redirect URI Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * fix docker build for proxy Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * remove default port offset Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * add AUTHENTIK_HOST_BROWSER Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * tests: fix e2e/integration tests not using proper tags Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * remove references of old port Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * fix user_attributes not being loaded correctly Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * cleanup dependencies Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * cleanup Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-09-08 19:04:56 +01:00
from tests.e2e.utils import (
USER,
SeleniumTestCase,
apply_migration,
get_docker_tag,
object_manager,
retry,
)
@skipUnless(platform.startswith("linux"), "requires local docker")
class TestProviderLDAP(SeleniumTestCase):
"""LDAP and Outpost e2e tests"""
ldap_container: Container
def tearDown(self) -> None:
super().tearDown()
self.output_container_logs(self.ldap_container)
self.ldap_container.kill()
def start_ldap(self, outpost: Outpost) -> Container:
"""Start ldap container based on outpost created"""
client: DockerClient = from_env()
container = client.containers.run(
outposts/proxyv2 (#1365) * outposts/proxyv2: initial commit Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add rs256 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> more stuff Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add forward auth an sign_out Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> match cookie name Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> re-add support for rs256 for backwards compat Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add error handler Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> ensure unique user-agent is used Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> set cookie duration based on id_token expiry Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> build proxy v2 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add ssl Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add basic auth and custom header support Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add application cert loading Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> implement whitelist Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add redis Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> migrate embedded outpost to v2 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> remove old proxy Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> providers/proxy: make token expiration configurable Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add metrics Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> fix tests Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * providers/proxy: only allow one redirect URI Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * fix docker build for proxy Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * remove default port offset Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * add AUTHENTIK_HOST_BROWSER Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * tests: fix e2e/integration tests not using proper tags Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * remove references of old port Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * fix user_attributes not being loaded correctly Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * cleanup dependencies Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * cleanup Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-09-08 19:04:56 +01:00
image=f"beryju.org/authentik/outpost-ldap:{get_docker_tag()}",
detach=True,
network_mode="host",
auto_remove=True,
environment={
"AUTHENTIK_HOST": self.live_server_url,
"AUTHENTIK_TOKEN": outpost.token.key,
},
)
return container
def _prepare(self) -> User:
"""prepare user, provider, app and container"""
# set additionalHeaders to test later
user = USER()
user.attributes["extraAttribute"] = "bar"
user.save()
ldap: LDAPProvider = LDAPProvider.objects.create(
name="ldap_provider",
authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
search_group=Group.objects.first(),
)
# we need to create an application to actually access the ldap
Application.objects.create(name="ldap", slug="ldap", provider=ldap)
outpost: Outpost = Outpost.objects.create(
name="ldap_outpost",
type=OutpostType.LDAP,
)
outpost.providers.add(ldap)
outpost.save()
user = outpost.user
self.ldap_container = self.start_ldap(outpost)
# Wait until outpost healthcheck succeeds
healthcheck_retries = 0
while healthcheck_retries < 50:
if len(outpost.state) > 0:
state = outpost.state[0]
if state.last_seen:
break
healthcheck_retries += 1
sleep(0.5)
return user
@retry()
@apply_migration("authentik_core", "0003_default_user")
@apply_migration("authentik_flows", "0008_default_flows")
@object_manager
def test_ldap_bind_success(self):
"""Test simple bind"""
self._prepare()
server = Server("ldap://localhost:3389", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io",
password=USER().username,
)
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN,
user={
"pk": USER().pk,
"email": USER().email,
"username": USER().username,
},
)
)
@retry()
@apply_migration("authentik_core", "0003_default_user")
@apply_migration("authentik_flows", "0008_default_flows")
@object_manager
def test_ldap_bind_success_ssl(self):
"""Test simple bind with ssl"""
self._prepare()
server = Server("ldaps://localhost:6636", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io",
password=USER().username,
)
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN,
user={
"pk": USER().pk,
"email": USER().email,
"username": USER().username,
},
)
)
@retry()
@apply_migration("authentik_core", "0003_default_user")
@apply_migration("authentik_flows", "0008_default_flows")
@object_manager
def test_ldap_bind_fail(self):
"""Test simple bind (failed)"""
self._prepare()
server = Server("ldap://localhost:3389", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io",
password=USER().username + "fqwerwqer",
)
with self.assertRaises(LDAPInvalidCredentialsResult):
_connection.bind()
anon = get_anonymous_user()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN_FAILED,
user={"pk": anon.pk, "email": anon.email, "username": anon.username},
)
)
@retry()
@apply_migration("authentik_core", "0003_default_user")
@apply_migration("authentik_core", "0009_group_is_superuser")
@apply_migration("authentik_flows", "0008_default_flows")
@object_manager
def test_ldap_bind_search(self):
"""Test simple bind + search"""
outpost_user = self._prepare()
server = Server("ldap://localhost:3389", get_info=ALL)
_connection = Connection(
server,
raise_exceptions=True,
user=f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io",
password=USER().username,
)
_connection.bind()
self.assertTrue(
Event.objects.filter(
action=EventAction.LOGIN,
user={
"pk": USER().pk,
"email": USER().email,
"username": USER().username,
},
)
)
embedded_account = Outpost.objects.filter(managed=MANAGED_OUTPOST).first().user
_connection.search(
"ou=users,dc=ldap,dc=goauthentik,dc=io",
"(objectClass=user)",
search_scope=SUBTREE,
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES],
)
response = _connection.response
# Remove raw_attributes to make checking easier
for obj in response:
del obj["raw_attributes"]
del obj["raw_dn"]
self.assertCountEqual(
response,
[
{
"dn": f"cn={outpost_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": [outpost_user.username],
"sAMAccountName": [outpost_user.username],
"uid": [outpost_user.uid],
"name": [""],
"displayName": [""],
"mail": [""],
"objectClass": [
"user",
"organizationalPerson",
"goauthentik.io/ldap/user",
],
"uidNumber": [str(2000 + outpost_user.pk)],
"gidNumber": [str(2000 + outpost_user.pk)],
"memberOf": [],
"accountStatus": ["true"],
"superuser": ["false"],
"goauthentik.io/ldap/active": ["true"],
"goauthentik.io/ldap/superuser": ["false"],
"goauthentik.io/user/override-ips": ["true"],
"goauthentik.io/user/service-account": ["true"],
},
"type": "searchResEntry",
},
{
"dn": f"cn={embedded_account.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": [embedded_account.username],
"sAMAccountName": [embedded_account.username],
"uid": [embedded_account.uid],
"name": [""],
"displayName": [""],
"mail": [""],
"objectClass": [
"user",
"organizationalPerson",
"goauthentik.io/ldap/user",
],
"uidNumber": [str(2000 + embedded_account.pk)],
"gidNumber": [str(2000 + embedded_account.pk)],
"memberOf": [],
"accountStatus": ["true"],
"superuser": ["false"],
"goauthentik.io/ldap/active": ["true"],
"goauthentik.io/ldap/superuser": ["false"],
"goauthentik.io/user/override-ips": ["true"],
"goauthentik.io/user/service-account": ["true"],
},
"type": "searchResEntry",
},
{
"dn": f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io",
"attributes": {
"cn": [USER().username],
"sAMAccountName": [USER().username],
"uid": [USER().uid],
"name": [USER().name],
"displayName": [USER().name],
"mail": [USER().email],
"objectClass": [
"user",
"organizationalPerson",
"goauthentik.io/ldap/user",
],
"uidNumber": [str(2000 + USER().pk)],
"gidNumber": [str(2000 + USER().pk)],
"memberOf": ["cn=authentik Admins,ou=groups,dc=ldap,dc=goauthentik,dc=io"],
"accountStatus": ["true"],
"superuser": ["true"],
"goauthentik.io/ldap/active": ["true"],
"goauthentik.io/ldap/superuser": ["true"],
"extraAttribute": ["bar"],
},
"type": "searchResEntry",
},
],
)