2021-07-01 10:51:00 +01:00
|
|
|
"""LDAP and Outpost e2e tests"""
|
|
|
|
from sys import platform
|
|
|
|
from time import sleep
|
|
|
|
from unittest.case import skipUnless
|
|
|
|
|
|
|
|
from docker.client import DockerClient, from_env
|
|
|
|
from docker.models.containers import Container
|
|
|
|
from guardian.shortcuts import get_anonymous_user
|
|
|
|
from ldap3 import (
|
|
|
|
ALL,
|
|
|
|
ALL_ATTRIBUTES,
|
|
|
|
ALL_OPERATIONAL_ATTRIBUTES,
|
|
|
|
SUBTREE,
|
|
|
|
Connection,
|
|
|
|
Server,
|
|
|
|
)
|
|
|
|
from ldap3.core.exceptions import LDAPInsufficientAccessRightsResult
|
|
|
|
|
|
|
|
from authentik.core.models import Application, Group, User
|
|
|
|
from authentik.events.models import Event, EventAction
|
|
|
|
from authentik.flows.models import Flow
|
|
|
|
from authentik.outposts.models import Outpost, OutpostType
|
|
|
|
from authentik.providers.ldap.models import LDAPProvider
|
|
|
|
from tests.e2e.utils import (
|
|
|
|
USER,
|
|
|
|
SeleniumTestCase,
|
|
|
|
apply_migration,
|
|
|
|
object_manager,
|
|
|
|
retry,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
@skipUnless(platform.startswith("linux"), "requires local docker")
|
|
|
|
class TestProviderLDAP(SeleniumTestCase):
|
|
|
|
"""LDAP and Outpost e2e tests"""
|
|
|
|
|
|
|
|
ldap_container: Container
|
|
|
|
|
|
|
|
def tearDown(self) -> None:
|
|
|
|
super().tearDown()
|
|
|
|
self.output_container_logs(self.ldap_container)
|
|
|
|
self.ldap_container.kill()
|
|
|
|
|
|
|
|
def start_ldap(self, outpost: Outpost) -> Container:
|
|
|
|
"""Start ldap container based on outpost created"""
|
|
|
|
client: DockerClient = from_env()
|
|
|
|
container = client.containers.run(
|
|
|
|
image="beryju.org/authentik/outpost-ldap:gh-master",
|
|
|
|
detach=True,
|
|
|
|
network_mode="host",
|
|
|
|
auto_remove=True,
|
|
|
|
environment={
|
|
|
|
"AUTHENTIK_HOST": self.live_server_url,
|
|
|
|
"AUTHENTIK_TOKEN": outpost.token.key,
|
|
|
|
},
|
|
|
|
)
|
|
|
|
return container
|
|
|
|
|
|
|
|
def _prepare(self) -> User:
|
|
|
|
"""prepare user, provider, app and container"""
|
|
|
|
# set additionalHeaders to test later
|
|
|
|
user = USER()
|
|
|
|
user.attributes["extraAttribute"] = "bar"
|
|
|
|
user.save()
|
|
|
|
|
|
|
|
ldap: LDAPProvider = LDAPProvider.objects.create(
|
|
|
|
name="ldap_provider",
|
|
|
|
authorization_flow=Flow.objects.get(slug="default-authentication-flow"),
|
|
|
|
search_group=Group.objects.first(),
|
|
|
|
)
|
|
|
|
# we need to create an application to actually access the ldap
|
|
|
|
Application.objects.create(name="ldap", slug="ldap", provider=ldap)
|
|
|
|
outpost: Outpost = Outpost.objects.create(
|
|
|
|
name="ldap_outpost",
|
|
|
|
type=OutpostType.LDAP,
|
|
|
|
)
|
|
|
|
outpost.providers.add(ldap)
|
|
|
|
outpost.save()
|
|
|
|
user = outpost.user
|
|
|
|
|
|
|
|
self.ldap_container = self.start_ldap(outpost)
|
|
|
|
|
|
|
|
# Wait until outpost healthcheck succeeds
|
|
|
|
healthcheck_retries = 0
|
|
|
|
while healthcheck_retries < 50:
|
|
|
|
if len(outpost.state) > 0:
|
|
|
|
state = outpost.state[0]
|
|
|
|
if state.last_seen:
|
|
|
|
break
|
|
|
|
healthcheck_retries += 1
|
|
|
|
sleep(0.5)
|
|
|
|
return user
|
|
|
|
|
|
|
|
@retry()
|
|
|
|
@apply_migration("authentik_core", "0003_default_user")
|
|
|
|
@apply_migration("authentik_flows", "0008_default_flows")
|
|
|
|
@object_manager
|
|
|
|
def test_ldap_bind_success(self):
|
|
|
|
"""Test simple bind"""
|
|
|
|
self._prepare()
|
|
|
|
server = Server("ldap://localhost:3389", get_info=ALL)
|
|
|
|
_connection = Connection(
|
|
|
|
server,
|
|
|
|
raise_exceptions=True,
|
|
|
|
user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io",
|
|
|
|
password=USER().username,
|
|
|
|
)
|
|
|
|
_connection.bind()
|
|
|
|
self.assertTrue(
|
|
|
|
Event.objects.filter(
|
|
|
|
action=EventAction.LOGIN,
|
|
|
|
user={
|
|
|
|
"pk": USER().pk,
|
|
|
|
"email": USER().email,
|
|
|
|
"username": USER().username,
|
|
|
|
},
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
@retry()
|
|
|
|
@apply_migration("authentik_core", "0003_default_user")
|
|
|
|
@apply_migration("authentik_flows", "0008_default_flows")
|
|
|
|
@object_manager
|
|
|
|
def test_ldap_bind_fail(self):
|
|
|
|
"""Test simple bind (failed)"""
|
|
|
|
self._prepare()
|
|
|
|
server = Server("ldap://localhost:3389", get_info=ALL)
|
|
|
|
_connection = Connection(
|
|
|
|
server,
|
|
|
|
raise_exceptions=True,
|
|
|
|
user=f"cn={USER().username},ou=users,DC=ldap,DC=goauthentik,DC=io",
|
|
|
|
password=USER().username + "fqwerwqer",
|
|
|
|
)
|
|
|
|
with self.assertRaises(LDAPInsufficientAccessRightsResult):
|
|
|
|
_connection.bind()
|
|
|
|
anon = get_anonymous_user()
|
|
|
|
self.assertTrue(
|
|
|
|
Event.objects.filter(
|
|
|
|
action=EventAction.LOGIN_FAILED,
|
|
|
|
user={"pk": anon.pk, "email": anon.email, "username": anon.username},
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
@retry()
|
|
|
|
@apply_migration("authentik_core", "0003_default_user")
|
2021-07-01 11:14:35 +01:00
|
|
|
@apply_migration("authentik_core", "0009_group_is_superuser")
|
2021-07-01 10:51:00 +01:00
|
|
|
@apply_migration("authentik_flows", "0008_default_flows")
|
|
|
|
@object_manager
|
|
|
|
def test_ldap_bind_search(self):
|
|
|
|
"""Test simple bind + search"""
|
|
|
|
outpost_user = self._prepare()
|
|
|
|
server = Server("ldap://localhost:3389", get_info=ALL)
|
|
|
|
_connection = Connection(
|
|
|
|
server,
|
|
|
|
raise_exceptions=True,
|
|
|
|
user=f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io",
|
|
|
|
password=USER().username,
|
|
|
|
)
|
|
|
|
_connection.bind()
|
|
|
|
self.assertTrue(
|
|
|
|
Event.objects.filter(
|
|
|
|
action=EventAction.LOGIN,
|
|
|
|
user={
|
|
|
|
"pk": USER().pk,
|
|
|
|
"email": USER().email,
|
|
|
|
"username": USER().username,
|
|
|
|
},
|
|
|
|
)
|
|
|
|
)
|
|
|
|
_connection.search(
|
|
|
|
"ou=users,dc=ldap,dc=goauthentik,dc=io",
|
|
|
|
"(objectClass=user)",
|
|
|
|
search_scope=SUBTREE,
|
|
|
|
attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES],
|
|
|
|
)
|
|
|
|
response = _connection.response
|
|
|
|
# Remove raw_attributes to make checking easier
|
|
|
|
for obj in response:
|
|
|
|
del obj["raw_attributes"]
|
|
|
|
del obj["raw_dn"]
|
|
|
|
self.assertCountEqual(
|
|
|
|
response,
|
|
|
|
[
|
|
|
|
{
|
|
|
|
"dn": f"cn={outpost_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io",
|
|
|
|
"attributes": {
|
|
|
|
"cn": [outpost_user.username],
|
|
|
|
"uid": [outpost_user.uid],
|
|
|
|
"name": [""],
|
|
|
|
"displayName": [""],
|
|
|
|
"mail": [""],
|
|
|
|
"objectClass": [
|
|
|
|
"user",
|
|
|
|
"organizationalPerson",
|
|
|
|
"goauthentik.io/ldap/user",
|
|
|
|
],
|
|
|
|
"memberOf": [],
|
|
|
|
"goauthentik.io/ldap/active": ["true"],
|
|
|
|
"goauthentik.io/ldap/superuser": ["false"],
|
|
|
|
"goauthentik.io/user/override-ips": ["true"],
|
|
|
|
"goauthentik.io/user/service-account": ["true"],
|
|
|
|
},
|
|
|
|
"type": "searchResEntry",
|
|
|
|
},
|
|
|
|
{
|
|
|
|
"dn": f"cn={USER().username},ou=users,dc=ldap,dc=goauthentik,dc=io",
|
|
|
|
"attributes": {
|
|
|
|
"cn": [USER().username],
|
|
|
|
"uid": [USER().uid],
|
|
|
|
"name": [USER().name],
|
|
|
|
"displayName": [USER().name],
|
|
|
|
"mail": [USER().email],
|
|
|
|
"objectClass": [
|
|
|
|
"user",
|
|
|
|
"organizationalPerson",
|
|
|
|
"goauthentik.io/ldap/user",
|
|
|
|
],
|
|
|
|
"memberOf": [
|
|
|
|
"cn=authentik Admins,ou=groups,dc=ldap,dc=goauthentik,dc=io"
|
|
|
|
],
|
|
|
|
"goauthentik.io/ldap/active": ["true"],
|
|
|
|
"goauthentik.io/ldap/superuser": ["true"],
|
|
|
|
"extraAttribute": ["bar"],
|
|
|
|
},
|
|
|
|
"type": "searchResEntry",
|
|
|
|
},
|
|
|
|
],
|
|
|
|
)
|