2020-09-29 09:32:41 +01:00
|
|
|
"""Test Controllers"""
|
|
|
|
import yaml
|
|
|
|
from django.test import TestCase
|
|
|
|
|
2020-12-05 21:08:42 +00:00
|
|
|
from authentik.flows.models import Flow
|
|
|
|
from authentik.outposts.apps import AuthentikOutpostConfig
|
|
|
|
from authentik.outposts.models import KubernetesServiceConnection, Outpost, OutpostType
|
|
|
|
from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
|
|
|
|
from authentik.providers.proxy.models import ProxyProvider
|
2020-09-29 09:32:41 +01:00
|
|
|
|
|
|
|
|
2021-03-05 18:09:13 +00:00
|
|
|
class TestProxyKubernetes(TestCase):
    """Test the Kubernetes controller used for proxy outposts."""

    def setUp(self):
        """Initialise the local service connections before every test."""
        # Ensure that local connection have been created
        AuthentikOutpostConfig.init_local_connection()

    def _create_outpost(self):
        """Create the proxy provider and outpost fixture shared by the tests.

        Returns:
            A ``(outpost, service_connection)`` pair: a saved proxy
            ``Outpost`` with one ``ProxyProvider`` attached, and the
            ``KubernetesServiceConnection`` it was created with.
        """
        provider: ProxyProvider = ProxyProvider.objects.create(
            name="test",
            internal_host="http://localhost",
            external_host="http://localhost",
            # NOTE(review): presumably at least one Flow already exists in the
            # test database; otherwise this passes None - confirm.
            authorization_flow=Flow.objects.first(),
        )
        # NOTE(review): presumably the connection created by
        # init_local_connection() in setUp - confirm.
        service_connection = KubernetesServiceConnection.objects.first()
        outpost: Outpost = Outpost.objects.create(
            name="test",
            type=OutpostType.PROXY,
            service_connection=service_connection,
        )
        outpost.providers.add(provider)
        outpost.save()
        return outpost, service_connection

    def test_kubernetes_controller_static(self):
        """Test Kubernetes Controller: static manifest generation"""
        outpost, service_connection = self._create_outpost()

        controller = ProxyKubernetesController(outpost, service_connection)
        manifest = controller.get_static_deployment()
        # The manifest is a multi-document YAML stream; exactly four
        # documents are expected.
        documents = list(yaml.load_all(manifest, Loader=yaml.SafeLoader))
        self.assertEqual(len(documents), 4)

    def test_kubernetes_controller_deploy(self):
        """Test Kubernetes Controller: up() followed by down()"""
        outpost, service_connection = self._create_outpost()

        controller = ProxyKubernetesController(outpost, service_connection)
        # No explicit assertions: the test passes when neither call raises.
        controller.up()
        controller.down()
|