authentik/tests/e2e/utils.py

263 lines
10 KiB
Python
Raw Normal View History

2020-12-05 21:08:42 +00:00
"""authentik e2e testing utilities"""
2020-11-23 13:24:42 +00:00
import json
outposts/proxyv2 (#1365) * outposts/proxyv2: initial commit Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add rs256 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> more stuff Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add forward auth an sign_out Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> match cookie name Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> re-add support for rs256 for backwards compat Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add error handler Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> ensure unique user-agent is used Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> set cookie duration based on id_token expiry Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> build proxy v2 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add ssl Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add basic auth and custom header support Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add application cert loading Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> implement whitelist Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add redis Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> migrate embedded outpost to v2 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> remove old proxy Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> providers/proxy: make token expiration configurable Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add metrics Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> fix tests Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * providers/proxy: only allow one redirect URI Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * fix docker build for proxy Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * remove default port offset Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * add AUTHENTIK_HOST_BROWSER Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * tests: fix e2e/integration tests not using proper tags Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * remove references of old port Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * fix user_attributes not being loaded correctly Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * cleanup dependencies Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * cleanup Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-09-08 19:04:56 +01:00
import os
from functools import lru_cache, wraps
2020-07-12 15:17:04 +01:00
from os import environ, makedirs
2020-09-11 22:21:11 +01:00
from time import sleep, time
from typing import Any, Callable, Optional
from channels.testing import ChannelsLiveServerTestCase
from django.apps import apps
2021-02-27 21:57:15 +00:00
from django.db import connection
2021-02-26 15:46:01 +00:00
from django.db.migrations.loader import MigrationLoader
from django.db.migrations.operations.special import RunPython
from django.test.testcases import TransactionTestCase
from django.urls import reverse
2020-09-11 22:21:11 +01:00
from docker import DockerClient, from_env
from docker.errors import DockerException
2020-09-11 22:21:11 +01:00
from docker.models.containers import Container
from selenium import webdriver
from selenium.common.exceptions import NoSuchElementException, TimeoutException, WebDriverException
2020-11-23 13:24:42 +00:00
from selenium.webdriver.common.by import By
2021-02-26 15:46:01 +00:00
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.remote.webdriver import WebDriver
2021-02-21 22:30:31 +00:00
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.ui import WebDriverWait
from structlog.stdlib import get_logger
2020-12-05 21:08:42 +00:00
from authentik.core.api.users import UserSerializer
from authentik.core.models import User
from authentik.core.tests.utils import create_test_admin_user
from authentik.managed.manager import ObjectManager
RETRIES = int(environ.get("RETRIES", "3"))
outposts/proxyv2 (#1365) * outposts/proxyv2: initial commit Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add rs256 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> more stuff Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add forward auth an sign_out Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> match cookie name Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> re-add support for rs256 for backwards compat Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add error handler Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> ensure unique user-agent is used Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> set cookie duration based on id_token expiry Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> build proxy v2 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add ssl Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add basic auth and custom header support Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add application cert loading Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> implement whitelist Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add redis Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> migrate embedded outpost to v2 Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> remove old proxy Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> providers/proxy: make token expiration configurable Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> add metrics Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> fix tests Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * providers/proxy: only allow one redirect URI Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * fix docker build for proxy Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * remove default port offset Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * add AUTHENTIK_HOST_BROWSER Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * tests: fix e2e/integration tests not using proper tags Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * remove references of old port Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * fix user_attributes not being loaded correctly Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * cleanup dependencies Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org> * cleanup Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
2021-09-08 19:04:56 +01:00
def get_docker_tag() -> str:
"""Get docker-tag based off of CI variables"""
env_pr_branch = "GITHUB_HEAD_REF"
default_branch = "GITHUB_REF"
branch_name = os.environ.get(default_branch, "master")
if os.environ.get(env_pr_branch, "") != "":
branch_name = os.environ[env_pr_branch]
branch_name = branch_name.replace("refs/heads/", "").replace("/", "-")
return f"gh-{branch_name}"
class SeleniumTestCase(ChannelsLiveServerTestCase):
"""StaticLiveServerTestCase which automatically creates a Webdriver instance"""
2020-09-11 22:21:11 +01:00
container: Optional[Container] = None
2021-02-21 22:30:31 +00:00
wait_timeout: int
user: User
2020-09-11 22:21:11 +01:00
def setUp(self):
super().setUp()
self.maxDiff = None
2021-02-21 22:30:31 +00:00
self.wait_timeout = 60
self.driver = self._get_driver()
2020-06-20 23:26:29 +01:00
self.driver.maximize_window()
2020-09-29 14:01:01 +01:00
self.driver.implicitly_wait(30)
2021-02-21 22:30:31 +00:00
self.wait = WebDriverWait(self.driver, self.wait_timeout)
self.logger = get_logger()
self.user = create_test_admin_user(set_password=True)
2020-09-11 22:21:11 +01:00
if specs := self.get_container_specs():
self.container = self._start_container(specs)
def get_container_image(self, base: str) -> str:
"""Try to pull docker image based on git branch, fallback to master if not found."""
client: DockerClient = from_env()
image = f"{base}:gh-master"
try:
branch_image = f"{base}:{get_docker_tag()}"
client.images.pull(branch_image)
return branch_image
except DockerException:
client.images.pull(image)
return image
def _start_container(self, specs: dict[str, Any]) -> Container:
2020-09-11 22:21:11 +01:00
client: DockerClient = from_env()
container = client.containers.run(**specs)
if "healthcheck" not in specs:
return container
2020-09-11 22:21:11 +01:00
while True:
container.reload()
status = container.attrs.get("State", {}).get("Health", {}).get("Status")
if status == "healthy":
return container
self.logger.info("Container failed healthcheck")
sleep(1)
def output_container_logs(self, container: Optional[Container] = None):
"""Output the container logs to our STDOUT"""
ct = container or self.container
self.logger.debug("--------container logs", container=ct.image.tags[0])
for log in ct.logs().decode().split("\n"):
self.logger.debug(log, container=ct.image.tags[0])
self.logger.debug("--------end container logs", container=ct.image.tags[0])
def get_container_specs(self) -> Optional[dict[str, Any]]:
2020-09-11 22:21:11 +01:00
"""Optionally get container specs which will launched on setup, wait for the container to
be healthy, and deleted again on tearDown"""
return None
def _get_driver(self) -> WebDriver:
count = 0
while count < RETRIES:
try:
return webdriver.Remote(
command_executor="http://localhost:4444/wd/hub",
options=webdriver.ChromeOptions(),
)
except WebDriverException:
count += 1
raise ValueError(f"Webdriver failed after {RETRIES}.")
def tearDown(self):
2020-07-23 19:03:35 +01:00
if "TF_BUILD" in environ:
makedirs("selenium_screenshots/", exist_ok=True)
screenshot_file = f"selenium_screenshots/{self.__class__.__name__}_{time()}.png"
2020-07-12 15:17:04 +01:00
self.driver.save_screenshot(screenshot_file)
self.logger.warning("Saved screenshot", file=screenshot_file)
self.logger.debug("--------browser logs")
for line in self.driver.get_log("browser"):
self.logger.debug(line["message"], source=line["source"], level=line["level"])
self.logger.debug("--------end browser logs")
2020-09-11 22:21:11 +01:00
if self.container:
self.output_container_logs()
2020-09-11 22:21:11 +01:00
self.container.kill()
self.driver.quit()
super().tearDown()
2020-06-26 15:21:59 +01:00
def wait_for_url(self, desired_url):
"""Wait until URL is `desired_url`."""
self.wait.until(
lambda driver: driver.current_url == desired_url,
f"URL {self.driver.current_url} doesn't match expected URL {desired_url}",
)
2020-06-26 15:21:59 +01:00
def url(self, view, **kwargs) -> str:
"""reverse `view` with `**kwargs` into full URL using live_server_url"""
return self.live_server_url + reverse(view, kwargs=kwargs)
2021-09-16 16:30:16 +01:00
def if_user_url(self, view) -> str:
2020-11-23 13:24:42 +00:00
"""same as self.url() but show URL in shell"""
2021-09-16 16:30:16 +01:00
return f"{self.live_server_url}/if/user/#{view}"
2020-11-23 13:24:42 +00:00
def get_shadow_root(self, selector: str, container: Optional[WebElement] = None) -> WebElement:
2021-02-21 22:30:31 +00:00
"""Get shadow root element's inner shadowRoot"""
if not container:
container = self.driver
shadow_root = container.find_element(By.CSS_SELECTOR, selector)
element = self.driver.execute_script("return arguments[0].shadowRoot", shadow_root)
2021-02-21 22:30:31 +00:00
return element
2021-02-26 15:46:01 +00:00
def login(self):
"""Do entire login flow and check user afterwards"""
flow_executor = self.get_shadow_root("ak-flow-executor")
identification_stage = self.get_shadow_root("ak-stage-identification", flow_executor)
2021-02-26 15:46:01 +00:00
identification_stage.find_element(By.CSS_SELECTOR, "input[name=uidField]").click()
identification_stage.find_element(By.CSS_SELECTOR, "input[name=uidField]").send_keys(
self.user.username
)
identification_stage.find_element(By.CSS_SELECTOR, "input[name=uidField]").send_keys(
Keys.ENTER
)
2021-02-26 15:46:01 +00:00
flow_executor = self.get_shadow_root("ak-flow-executor")
password_stage = self.get_shadow_root("ak-stage-password", flow_executor)
password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(
self.user.username
2021-02-26 15:46:01 +00:00
)
password_stage.find_element(By.CSS_SELECTOR, "input[name=password]").send_keys(Keys.ENTER)
sleep(1)
2021-02-26 15:46:01 +00:00
2020-11-23 13:24:42 +00:00
def assert_user(self, expected_user: User):
"""Check users/me API and assert it matches expected_user"""
2020-12-05 21:08:42 +00:00
self.driver.get(self.url("authentik_api:user-me") + "?format=json")
2020-11-23 13:24:42 +00:00
user_json = self.driver.find_element(By.CSS_SELECTOR, "pre").text
user = UserSerializer(data=json.loads(user_json)["user"])
2020-11-23 13:24:42 +00:00
user.is_valid()
self.assertEqual(user["username"].value, expected_user.username)
self.assertEqual(user["name"].value, expected_user.name)
self.assertEqual(user["email"].value, expected_user.email)
@lru_cache
def get_loader():
"""Thin wrapper to lazily get a Migration Loader, only when it's needed
and only once"""
return MigrationLoader(connection)
2021-02-26 15:46:01 +00:00
def apply_migration(app_name: str, migration_name: str):
"""Re-apply migrations that create objects using RunPython before test cases"""
def wrapper_outter(func: Callable):
"""Retry test multiple times"""
@wraps(func)
def wrapper(self: TransactionTestCase, *args, **kwargs):
migration = get_loader().get_migration(app_name, migration_name)
2021-02-26 15:46:01 +00:00
with connection.schema_editor() as schema_editor:
for operation in migration.operations:
if not isinstance(operation, RunPython):
continue
operation.code(apps, schema_editor)
return func(self, *args, **kwargs)
return wrapper
return wrapper_outter
def object_manager(func: Callable):
"""Run objectmanager before a test function"""
@wraps(func)
def wrapper(*args, **kwargs):
"""Run objectmanager before a test function"""
ObjectManager().run()
return func(*args, **kwargs)
return wrapper
def retry(max_retires=RETRIES, exceptions=None):
"""Retry test multiple times. Default to catching Selenium Timeout Exception"""
if not exceptions:
exceptions = [WebDriverException, TimeoutException, NoSuchElementException]
logger = get_logger()
def retry_actual(func: Callable):
"""Retry test multiple times"""
count = 1
@wraps(func)
def wrapper(self: TransactionTestCase, *args, **kwargs):
"""Run test again if we're below max_retries, including tearDown and
setUp. Otherwise raise the error"""
nonlocal count
try:
return func(self, *args, **kwargs)
# pylint: disable=catching-non-exception
except tuple(exceptions) as exc:
count += 1
if count > max_retires:
logger.debug("Exceeded retry count", exc=exc, test=self)
# pylint: disable=raising-non-exception
raise exc
logger.debug("Retrying on error", exc=exc, test=self)
self.tearDown()
self._post_teardown() # noqa
self.setUp()
return wrapper(self, *args, **kwargs)
return wrapper
return retry_actual