authentik/e2e/test_provider_oauth.py

235 lines
9.3 KiB
Python

"""test OAuth Provider flow"""
from time import sleep
from oauth2_provider.generators import generate_client_id, generate_client_secret
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from structlog import get_logger
from docker import DockerClient, from_env
from docker.models.containers import Container
from docker.types import Healthcheck
from e2e.utils import USER, SeleniumTestCase
from passbook.core.models import Application
from passbook.flows.models import Flow
from passbook.policies.expression.models import ExpressionPolicy
from passbook.policies.models import PolicyBinding
from passbook.providers.oauth.models import OAuth2Provider
LOGGER = get_logger()
class TestProviderOAuth(SeleniumTestCase):
"""test OAuth Provider flow"""
def setUp(self):
self.client_id = generate_client_id()
self.client_secret = generate_client_secret()
self.container = self.setup_client()
super().setUp()
def setup_client(self) -> Container:
"""Setup client grafana container which we test OAuth against"""
client: DockerClient = from_env()
container = client.containers.run(
image="grafana/grafana:7.1.0",
detach=True,
network_mode="host",
auto_remove=True,
healthcheck=Healthcheck(
test=["CMD", "wget", "--spider", "http://localhost:3000"],
interval=5 * 100 * 1000000,
start_period=1 * 100 * 1000000,
),
environment={
"GF_AUTH_GITHUB_ENABLED": "true",
"GF_AUTH_GITHUB_allow_sign_up": "true",
"GF_AUTH_GITHUB_CLIENT_ID": self.client_id,
"GF_AUTH_GITHUB_CLIENT_SECRET": self.client_secret,
"GF_AUTH_GITHUB_SCOPES": "user:email,read:org",
"GF_AUTH_GITHUB_AUTH_URL": self.url(
"passbook_providers_oauth:github-authorize"
),
"GF_AUTH_GITHUB_TOKEN_URL": self.url(
"passbook_providers_oauth:github-access-token"
),
"GF_AUTH_GITHUB_API_URL": self.url(
"passbook_providers_oauth:github-user"
),
"GF_LOG_LEVEL": "debug",
},
)
while True:
container.reload()
status = container.attrs.get("State", {}).get("Health", {}).get("Status")
if status == "healthy":
return container
LOGGER.info("Container failed healthcheck")
sleep(1)
def tearDown(self):
self.container.kill()
super().tearDown()
def test_authorization_consent_implied(self):
"""test OAuth Provider flow (default authorization flow with implied consent)"""
sleep(1)
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-implicit-consent"
)
provider = OAuth2Provider.objects.create(
name="grafana",
client_type=OAuth2Provider.CLIENT_CONFIDENTIAL,
authorization_grant_type=OAuth2Provider.GRANT_AUTHORIZATION_CODE,
client_id=self.client_id,
client_secret=self.client_secret,
redirect_uris="http://localhost:3000/login/github",
skip_authorization=True,
authorization_flow=authorization_flow,
)
Application.objects.create(
name="Grafana", slug="grafana", provider=provider,
)
self.driver.get("http://localhost:3000")
self.driver.find_element(By.CLASS_NAME, "btn-service--github").click()
self.driver.find_element(By.ID, "id_uid_field").click()
self.driver.find_element(By.ID, "id_uid_field").send_keys(USER().username)
self.driver.find_element(By.ID, "id_uid_field").send_keys(Keys.ENTER)
self.driver.find_element(By.ID, "id_password").send_keys(USER().username)
self.driver.find_element(By.ID, "id_password").send_keys(Keys.ENTER)
self.wait_for_url("http://localhost:3000/?orgId=1")
self.driver.find_element(By.XPATH, "//a[contains(@href, '/profile')]").click()
self.assertEqual(
self.driver.find_element(By.CLASS_NAME, "page-header__title").text,
USER().username,
)
self.assertEqual(
self.driver.find_element(By.CSS_SELECTOR, "input[name=name]").get_attribute(
"value"
),
USER().username,
)
self.assertEqual(
self.driver.find_element(
By.CSS_SELECTOR, "input[name=email]"
).get_attribute("value"),
USER().email,
)
self.assertEqual(
self.driver.find_element(
By.CSS_SELECTOR, "input[name=login]"
).get_attribute("value"),
USER().username,
)
def test_authorization_consent_explicit(self):
"""test OAuth Provider flow (default authorization flow with explicit consent)"""
sleep(1)
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-explicit-consent"
)
provider = OAuth2Provider.objects.create(
name="grafana",
client_type=OAuth2Provider.CLIENT_CONFIDENTIAL,
authorization_grant_type=OAuth2Provider.GRANT_AUTHORIZATION_CODE,
client_id=self.client_id,
client_secret=self.client_secret,
redirect_uris="http://localhost:3000/login/github",
skip_authorization=True,
authorization_flow=authorization_flow,
)
app = Application.objects.create(
name="Grafana", slug="grafana", provider=provider,
)
self.driver.get("http://localhost:3000")
self.driver.find_element(By.CLASS_NAME, "btn-service--github").click()
self.driver.find_element(By.ID, "id_uid_field").click()
self.driver.find_element(By.ID, "id_uid_field").send_keys(USER().username)
self.driver.find_element(By.ID, "id_uid_field").send_keys(Keys.ENTER)
self.driver.find_element(By.ID, "id_password").send_keys(USER().username)
self.driver.find_element(By.ID, "id_password").send_keys(Keys.ENTER)
self.assertIn(
app.name,
self.driver.find_element(
By.XPATH, "/html/body/div[2]/div/main/div/form/div[2]/p[1]"
).text,
)
self.assertEqual(
"GitHub Compatibility: User Email",
self.driver.find_element(
By.XPATH, "/html/body/div[2]/div/main/div/form/div[2]/ul/li[1]"
).text,
)
sleep(1)
self.driver.find_element(By.CSS_SELECTOR, "[type=submit]").click()
self.wait_for_url("http://localhost:3000/?orgId=1")
self.driver.find_element(By.XPATH, "//a[contains(@href, '/profile')]").click()
self.assertEqual(
self.driver.find_element(By.CLASS_NAME, "page-header__title").text,
USER().username,
)
self.assertEqual(
self.driver.find_element(By.CSS_SELECTOR, "input[name=name]").get_attribute(
"value"
),
USER().username,
)
self.assertEqual(
self.driver.find_element(
By.CSS_SELECTOR, "input[name=email]"
).get_attribute("value"),
USER().email,
)
self.assertEqual(
self.driver.find_element(
By.CSS_SELECTOR, "input[name=login]"
).get_attribute("value"),
USER().username,
)
def test_denied(self):
"""test OAuth Provider flow (default authorization flow, denied)"""
sleep(1)
# Bootstrap all needed objects
authorization_flow = Flow.objects.get(
slug="default-provider-authorization-explicit-consent"
)
provider = OAuth2Provider.objects.create(
name="grafana",
client_type=OAuth2Provider.CLIENT_CONFIDENTIAL,
authorization_grant_type=OAuth2Provider.GRANT_AUTHORIZATION_CODE,
client_id=self.client_id,
client_secret=self.client_secret,
redirect_uris="http://localhost:3000/login/github",
skip_authorization=True,
authorization_flow=authorization_flow,
)
app = Application.objects.create(
name="Grafana", slug="grafana", provider=provider,
)
negative_policy = ExpressionPolicy.objects.create(
name="negative-static", expression="return False"
)
PolicyBinding.objects.create(target=app, policy=negative_policy, order=0)
self.driver.get("http://localhost:3000")
self.driver.find_element(By.CLASS_NAME, "btn-service--github").click()
self.driver.find_element(By.ID, "id_uid_field").click()
self.driver.find_element(By.ID, "id_uid_field").send_keys(USER().username)
self.driver.find_element(By.ID, "id_uid_field").send_keys(Keys.ENTER)
self.driver.find_element(By.ID, "id_password").send_keys(USER().username)
self.driver.find_element(By.ID, "id_password").send_keys(Keys.ENTER)
self.wait_for_url(self.url("passbook_flows:denied"))
self.assertEqual(
self.driver.find_element(By.CSS_SELECTOR, "header > h1").text,
"Permission denied",
)