importasab.web.rest#!/usr/bin/env python3importasab.web.authimporttyping# Set up a web container listening at port 8080asab.Config["web"]={"listen":"8080"}# Disables or enables all authentication and authorization, or switches it into MOCK mode.# When disabled, the `resources` and `userinfo` handler arguments are set to `None`.asab.Config["auth"]["enabled"]="mock"# Mock authorization, useful for debugging.# asab.Config["auth"]["enabled"] = "yes" # Authorization is enabled.# asab.Config["auth"]["enabled"] = "no" # Authorization is disabled.# Activating the mock mode disables communication with the authorization server.# The requests' Authorization headers are ignored and AuthService provides mock authorization with mock user info.# You can provide custom user info by specifying the path pointing to your JSON file.asab.Config["auth"]["mock_user_info_path"]="./mock-userinfo.json"# URL of the authorization server's JWK public keys, used for ID token verification.# This option is ignored in mock mode or when authorization is disabled.asab.Config["auth"]["public_keys_url"]="http://localhost:3081/.well-known/jwks.json"classMyApplication(asab.Application):def__init__(self):super().__init__()# Initialize web containerself.add_module(asab.web.Module)self.WebService=self.get_service("asab.WebService")self.WebContainer=asab.web.WebContainer(self.WebService,"web")self.WebContainer.WebApp.middlewares.append(asab.web.rest.JsonExceptionMiddleware)fromasab.apiimportApiServiceself.ApiService=ApiService(self)self.ApiService.initialize_web(self.WebContainer)# Initialize authorizationself.AuthService=asab.web.auth.AuthService(self)self.AuthService.install(self.WebContainer)# Add routesself.WebContainer.WebApp.router.add_get("/no_auth",self.no_auth)self.WebContainer.WebApp.router.add_get("/auth",self.auth)self.WebContainer.WebApp.router.add_get("/auth/resource_check",self.auth_resource)self.WebContainer.WebApp.router.add_put("/auth/resource_check",self.auth_resource_put)self.WebContainer.WebApp.router.add_get("/{tenant}/required_tenant",self.tenant_in_path)self.WebContainer.WebApp.router.add_get("/{tenant}/required_tenant/resource_check",self.tenant_in_path_resources)self.WebContainer.WebApp.router.add_get("/configurable_tenant",self.tenant_in_query)self.WebContainer.WebApp.router.add_get("/configurable_tenant/resource_check",self.tenant_in_query_resources)@asab.web.auth.noauthasyncdefno_auth(self,request):""" NO AUTH - authentication skipped - `tenant`, `user_info`, `resources` params not allowed """data={"tenant":"NOT AVAILABLE","resources":"NOT AVAILABLE","user_info":"NOT AVAILABLE",}returnasab.web.rest.json_response(request,data)asyncdefauth(self,request,*,user_info:typing.Optional[dict],resources:typing.Optional[frozenset]):""" TENANT-AGNOSTIC - returns 401 if authentication not successful - `user_info`, `resources` params allowed - `tenant` param not allowed - `resources` contain only globally granted resources """data={"tenant":"NOT AVAILABLE","resources":list(resources)ifresourceselseNone,"user_info":user_info,}returnasab.web.rest.json_response(request,data)@asab.web.auth.require("something:access","something:edit")asyncdefauth_resource(self,request,*,user_info:typing.Optional[dict],resources:typing.Optional[frozenset]):""" TENANT-AGNOSTIC + RESOURCE CHECK - returns 401 if authentication not successful - globally granted resources checked - returns 403 if resource access not granted - `user_info`, `resources` params allowed - `tenant` param not allowed - `resources` contain only globally granted resources """data={"tenant":"NOT AVAILABLE","resources":list(resources)ifresourceselseNone,"user_info":user_info,}returnasab.web.rest.json_response(request,data)@asab.web.rest.json_schema_handler({"type":"object"})@asab.web.auth.require("something:access","something:edit")asyncdefauth_resource_put(self,request,*,user_info:typing.Optional[dict],resources:typing.Optional[frozenset],json_data:dict):""" Decorator asab.web.auth.require can be used together with other decorators. """data={"tenant":"NOT AVAILABLE","resources":list(resources)ifresourceselseNone,"user_info":user_info,"json_data":json_data,}returnasab.web.rest.json_response(request,data)asyncdeftenant_in_path(self,request,*,tenant:typing.Optional[str],user_info:typing.Optional[dict],resources:typing.Optional[frozenset]):""" TENANT-AWARE - returns 401 if authentication not successful - `tenant` access checked - returns 403 if tenant not accessible - `user_info`, `resources` params allowed - `tenant` param required in path, cannot be None - `resources` contain tenant-granted resources """data={"tenant":tenant,"resources":list(resources)ifresourceselseNone,"user_info":user_info,}returnasab.web.rest.json_response(request,data)asyncdeftenant_in_query(self,request,*,tenant:typing.Optional[str],user_info:typing.Optional[dict],resources:typing.Optional[frozenset]):""" CONFIGURABLY TENANT-AWARE - returns 401 if authentication not successful - `tenant` expected in query string - tenant access checked - returns 403 if tenant not accessible - `tenant` is set to `None` if `tenant` not in query - `user_info`, `resources` params allowed - `resources` contain tenant-granted resources if tenant is not None, otherwise only globally-granted resources """data={"tenant":tenant,"resources":list(resources)ifresourceselseNone,"user_info":user_info,}returnasab.web.rest.json_response(request,data)@asab.web.auth.require("something:access","something:edit")asyncdeftenant_in_path_resources(self,request,*,tenant:typing.Optional[str],user_info:typing.Optional[dict],resources:typing.Optional[frozenset]):""" TENANT-AWARE + RESOURCE CHECK - returns 401 if authentication not successful - `tenant` access checked - returns 403 if tenant not accessible - tenant-accessible resources checked - returns 403 if resource access not granted - `user_info`, `resources` params allowed - `tenant` param required, cannot be None - `resources` contain only resources granted within tenant """data={"tenant":tenant,"resources":list(resources)ifresourceselseNone,"user_info":user_info,}returnasab.web.rest.json_response(request,data)@asab.web.auth.require("something:access","something:edit")asyncdeftenant_in_query_resources(self,request,*,tenant:typing.Optional[str],user_info:typing.Optional[dict],resources:typing.Optional[frozenset]):""" CONFIGURABLY TENANT-AWARE + RESOURCE CHECK - returns 401 if authentication not successful - `tenant` expected in query string - tenant access checked - returns 403 if tenant not accessible - returns 403 if resource access not granted within tenant - `tenant` is set to `None` if `tenant` not in query - returns 403 if tenant is None resource access is not granted globally - `user_info`, `resources` params allowed - `resources` contain tenant-granted resources if tenant is not None, otherwise only globally-granted resources """data={"tenant":tenant,"resources":list(resources)ifresourceselseNone,"user_info":user_info,}returnasab.web.rest.json_response(request,data)if__name__=="__main__":app=MyApplication()app.run()