Web auth

Example

web-auth.py
import asab.web.rest
#!/usr/bin/env python3
import asab.web.auth
import typing

# Set up a web container listening at port 8080
asab.Config["web"] = {"listen": "8080"}

# Disables or enables all authentication and authorization, or switches it into MOCK mode.
# When disabled, the `resources` and `userinfo` handler arguments are set to `None`.
asab.Config["auth"]["enabled"] = "mock"  # Mock authorization, useful for debugging.
# asab.Config["auth"]["enabled"] = "yes"   # Authorization is enabled.
# asab.Config["auth"]["enabled"] = "no" # Authorization is disabled.

# Activating the mock mode disables communication with the authorization server.
# The requests' Authorization headers are ignored and AuthService provides mock authorization with mock user info.
# You can provide custom user info by specifying the path pointing to your JSON file.
asab.Config["auth"]["mock_user_info_path"] = "./mock-userinfo.json"

# URL of the authorization server's JWK public keys, used for ID token verification.
# This option is ignored in mock mode or when authorization is disabled.
asab.Config["auth"]["public_keys_url"] = "http://localhost:3081/.well-known/jwks.json"


class MyApplication(asab.Application):

    def __init__(self):
        super().__init__()

        # Initialize web container
        self.add_module(asab.web.Module)
        self.WebService = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(self.WebService, "web")

        self.WebContainer.WebApp.middlewares.append(asab.web.rest.JsonExceptionMiddleware)

        from asab.api import ApiService
        self.ApiService = ApiService(self)
        self.ApiService.initialize_web(self.WebContainer)

        # Initialize authorization
        self.AuthService = asab.web.auth.AuthService(self)
        self.AuthService.install(self.WebContainer)

        # Add routes
        self.WebContainer.WebApp.router.add_get("/no_auth", self.no_auth)
        self.WebContainer.WebApp.router.add_get("/auth", self.auth)
        self.WebContainer.WebApp.router.add_get("/auth/resource_check", self.auth_resource)
        self.WebContainer.WebApp.router.add_put("/auth/resource_check", self.auth_resource_put)
        self.WebContainer.WebApp.router.add_get("/{tenant}/required_tenant", self.tenant_in_path)
        self.WebContainer.WebApp.router.add_get("/{tenant}/required_tenant/resource_check", self.tenant_in_path_resources)
        self.WebContainer.WebApp.router.add_get("/configurable_tenant", self.tenant_in_query)
        self.WebContainer.WebApp.router.add_get("/configurable_tenant/resource_check", self.tenant_in_query_resources)


    @asab.web.auth.noauth
    async def no_auth(self, request):
        """
        NO AUTH
        - authentication skipped

        - `tenant`, `user_info`, `resources` params not allowed
        """
        data = {
            "tenant": "NOT AVAILABLE",
            "resources": "NOT AVAILABLE",
            "user_info": "NOT AVAILABLE",
        }
        return asab.web.rest.json_response(request, data)


    async def auth(self, request, *, user_info: typing.Optional[dict], resources: typing.Optional[frozenset]):
        """
        TENANT-AGNOSTIC
        - returns 401 if authentication not successful

        - `user_info`, `resources` params allowed
        - `tenant` param not allowed
        - `resources` contain only globally granted resources
        """
        data = {
            "tenant": "NOT AVAILABLE",
            "resources": list(resources) if resources else None,
            "user_info": user_info,
        }
        return asab.web.rest.json_response(request, data)


    @asab.web.auth.require("something:access", "something:edit")
    async def auth_resource(self, request, *, user_info: typing.Optional[dict], resources: typing.Optional[frozenset]):
        """
        TENANT-AGNOSTIC + RESOURCE CHECK
        - returns 401 if authentication not successful
        - globally granted resources checked
        - returns 403 if resource access not granted

        - `user_info`, `resources` params allowed
        - `tenant` param not allowed
        - `resources` contain only globally granted resources
        """
        data = {
            "tenant": "NOT AVAILABLE",
            "resources": list(resources) if resources else None,
            "user_info": user_info,
        }
        return asab.web.rest.json_response(request, data)


    @asab.web.rest.json_schema_handler({
        "type": "object"
    })
    @asab.web.auth.require("something:access", "something:edit")
    async def auth_resource_put(
        self, request, *,
        user_info: typing.Optional[dict],
        resources: typing.Optional[frozenset],
        json_data: dict
    ):
        """
        Decorator asab.web.auth.require can be used together with other decorators.
        """
        data = {
            "tenant": "NOT AVAILABLE",
            "resources": list(resources) if resources else None,
            "user_info": user_info,
            "json_data": json_data,
        }
        return asab.web.rest.json_response(request, data)


    async def tenant_in_path(
        self, request, *,
        tenant: typing.Optional[str],
        user_info: typing.Optional[dict],
        resources: typing.Optional[frozenset]
    ):
        """
        TENANT-AWARE
        - returns 401 if authentication not successful
        - `tenant` access checked
        - returns 403 if tenant not accessible

        - `user_info`, `resources` params allowed
        - `tenant` param required in path, cannot be None
        - `resources` contain tenant-granted resources
        """
        data = {
            "tenant": tenant,
            "resources": list(resources) if resources else None,
            "user_info": user_info,
        }
        return asab.web.rest.json_response(request, data)


    async def tenant_in_query(
        self, request, *,
        tenant: typing.Optional[str],
        user_info: typing.Optional[dict],
        resources: typing.Optional[frozenset]
    ):
        """
        CONFIGURABLY TENANT-AWARE
        - returns 401 if authentication not successful
        - `tenant` expected in query string
        - tenant access checked
        - returns 403 if tenant not accessible
        - `tenant` is set to `None` if `tenant` not in query

        - `user_info`, `resources` params allowed
        - `resources` contain tenant-granted resources if tenant is not None,
            otherwise only globally-granted resources
        """
        data = {
            "tenant": tenant,
            "resources": list(resources) if resources else None,
            "user_info": user_info,
        }
        return asab.web.rest.json_response(request, data)


    @asab.web.auth.require("something:access", "something:edit")
    async def tenant_in_path_resources(
        self, request, *,
        tenant: typing.Optional[str],
        user_info: typing.Optional[dict],
        resources: typing.Optional[frozenset]
    ):
        """
        TENANT-AWARE + RESOURCE CHECK
        - returns 401 if authentication not successful
        - `tenant` access checked
        - returns 403 if tenant not accessible
        - tenant-accessible resources checked
        - returns 403 if resource access not granted

        - `user_info`, `resources` params allowed
        - `tenant` param required, cannot be None
        - `resources` contain only resources granted within tenant
        """
        data = {
            "tenant": tenant,
            "resources": list(resources) if resources else None,
            "user_info": user_info,
        }
        return asab.web.rest.json_response(request, data)


    @asab.web.auth.require("something:access", "something:edit")
    async def tenant_in_query_resources(
        self, request, *,
        tenant: typing.Optional[str],
        user_info: typing.Optional[dict],
        resources: typing.Optional[frozenset]
    ):
        """
        CONFIGURABLY TENANT-AWARE + RESOURCE CHECK
        - returns 401 if authentication not successful
        - `tenant` expected in query string
        - tenant access checked
        - returns 403 if tenant not accessible
        - returns 403 if resource access not granted within tenant
        - `tenant` is set to `None` if `tenant` not in query
        - returns 403 if tenant is None resource access is not granted globally

        - `user_info`, `resources` params allowed
        - `resources` contain tenant-granted resources if tenant is not None,
            otherwise only globally-granted resources
        """
        data = {
            "tenant": tenant,
            "resources": list(resources) if resources else None,
            "user_info": user_info,
        }
        return asab.web.rest.json_response(request, data)


if __name__ == "__main__":
    app = MyApplication()
    app.run()