Discovery Service
Service discovery enables communication among multiple ASAB microservices in a server cluster. Each microservice can look up the address (URL) of the API of any other service within the cluster.
In service discovery, there are two main roles: the "server" and the "client."
The server advertises its "position" in the cluster and provides an API for communication. The client uses this API to interact with the server.
In the context of service discovery, all microservices in the cluster act as both servers and clients.
Prerequisites
The following requirements must be fulfilled:
- ZooKeeper connection and configuration must be the same for all services in the cluster.
- ZooKeeper container must be initialized in the service.
- asab.WebService and asab.WebContainer must be initialized.
- asab.APIService must be initialized.
- Environment variables NODE_ID, SERVICE_ID and INSTANCE_ID must be set.
- INSTANCE_ID (or hostname if INSTANCE_ID is missing) must be resolvable.
Server - Advertising into ZooKeeper
Even though a service can provide multiple communication interfaces, the major use case is the web API.
A "server" application has to provide the API and advertise its address into a consensus technology, ZooKeeper.
Application requirements
Services inside the application must be initialized in the right order.
import asab
import asab.api
import asab.web
import asab.zookeeper


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()

        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # Initialize ApiService (after the web and ZooKeeper containers exist)
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)
Configuration
ZooKeeper configuration must be the same for all services in the cluster. The port of the web API can be set in the configuration.
myapp.conf
[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
[web]
listen=0.0.0.0 8090
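To make the meaning of the two options concrete, the following sketch parses the same values with Python's standard configparser and splits them into host and port. The helpers parse_servers and parse_listen are hypothetical names used only for illustration; they are not part of ASAB, which reads the configuration on its own.

```python
import configparser

CONFIG_TEXT = """
[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

[web]
listen=0.0.0.0 8090
"""


def parse_servers(value):
    """Split 'host:port,host:port' into a list of (host, port) tuples."""
    servers = []
    for item in value.split(","):
        host, _, port = item.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers


def parse_listen(value):
    """Split 'address port' into an (address, port) tuple."""
    address, port = value.split()
    return address, int(port)


config = configparser.ConfigParser()
config.read_string(CONFIG_TEXT)

zookeeper_servers = parse_servers(config["zookeeper"]["servers"])
web_listen = parse_listen(config["web"]["listen"])
```

The web port parsed here (8090) is the one that later appears in the "web" section of the advertised data.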
Environment variables
Discovery is based on the provided "ids". Use either the set of predefined ids provided as environment variables, or create custom discovery ids within the application.
Predefined ids used for service discovery. Provide them as environment variables:
| Variable | Description | Example |
|---|---|---|
| SERVICE_ID | Identifier of the ASAB microservice. | my-app |
| INSTANCE_ID | Identifier of a specific instance of the ASAB microservice. | my-app-1, my-app-2, ... |
| NODE_ID | Identifier of a cluster node. | node-1, node-2, ... |
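A service can verify at startup that the variables from the table are present. The sketch below is only an illustration; the function name missing_discovery_variables and the example values are assumptions, not part of ASAB.

```python
import os

REQUIRED_VARIABLES = ("NODE_ID", "SERVICE_ID", "INSTANCE_ID")


def missing_discovery_variables(environ=None):
    """Return the names of the required variables that are unset or empty."""
    if environ is None:
        environ = os.environ
    return [name for name in REQUIRED_VARIABLES if not environ.get(name)]


# Example environment with NODE_ID left out on purpose
example_env = {"SERVICE_ID": "my-app", "INSTANCE_ID": "my-app-1"}
missing = missing_discovery_variables(example_env)
```

Calling the function without arguments checks the real process environment.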
Instance ID must be resolvable
The ASAB framework cannot set up networking. To enable service discovery, the INSTANCE_ID of a service must be resolvable from the client. If INSTANCE_ID is missing, the hostname is used instead, and the hostname must then be resolvable.
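Whether a name is resolvable can be checked from the client machine with the standard socket module. This is a minimal sketch; is_resolvable and discovery_hostname are hypothetical helper names, and the fallback to the machine hostname mirrors the rule described above rather than any ASAB function.

```python
import socket


def is_resolvable(name, port=0):
    """Return True if the name resolves to at least one address."""
    try:
        return len(socket.getaddrinfo(name, port)) > 0
    except socket.gaierror:
        return False


def discovery_hostname(environ):
    """Pick INSTANCE_ID if it is set, otherwise fall back to the machine hostname."""
    return environ.get("INSTANCE_ID") or socket.gethostname()


name = discovery_hostname({"INSTANCE_ID": "my-app-1"})
```

Running is_resolvable(name) on the client side shows whether the networking (DNS, container network, hosts file) is prepared for discovery.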
When all requirements of a server-side microservice are fulfilled, the service advertises unique information bound to its runtime into the consensus.
This information is stored in JSON format and describes the running application.
asab/run/ASABMyApplication.0000090098
{
"host": "server-name-1",
"appclass": "MyApp",
"node_id": "node-1",
"service_id": "my-app",
"instance_id": "my-app-1",
"launch_time": "2023-12-01T10:52:29.656397Z",
"process_id": 1,
"containerized": true,
"created_at": "2023-11-23T11:08:22.454248Z",
"version": "v23.47-alpha",
"CI_COMMIT_TAG": "v23.47-alpha",
"CI_COMMIT_REF_NAME": "v23.47-alpha",
"CI_COMMIT_SHA": "...",
"CI_COMMIT_TIMESTAMP": "2023-11-23T11:00:48+00:00",
"CI_JOB_ID": "...",
"CI_PIPELINE_CREATED_AT": "2023-11-23T11:06:40Z",
"CI_RUNNER_ID": "..",
"CI_RUNNER_EXECUTABLE_ARCH": "linux/amd64",
"web": [
[
"0.0.0.0",
8090
]
]
}
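The fields that matter for discovery are "host", the ids and "web". The following sketch turns one advertised record into (host, port, family) tuples in a way that follows the logic of the discovery source listed in the Reference section; the function name advertised_endpoints and the shortened record are assumptions made for illustration.

```python
import json
import socket

# Shortened copy of the advertised record shown above
ADVERTISED = json.loads("""
{
    "host": "server-name-1",
    "service_id": "my-app",
    "instance_id": "my-app-1",
    "web": [["0.0.0.0", 8090]]
}
""")


def advertised_endpoints(item_data):
    """Return a set of (host, port, family) tuples for one advertised record."""
    endpoints = set()
    host = item_data.get("host")
    if host is None:
        return endpoints
    for listen in item_data.get("web") or []:
        ip, port = listen[0], listen[1]
        if ip == "0.0.0.0":
            family = socket.AF_INET
        elif ip == "::":
            family = socket.AF_INET6
        else:
            continue  # other listen addresses are skipped
        endpoints.add((host, port, family))
    return endpoints


endpoints = advertised_endpoints(ADVERTISED)
```

Note that the advertised "host" is what clients connect to, not the listen address, which is why the host must be resolvable.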
Client - Using Service Discovery
All services act as "server" and "client" at the same time. This section describes how to discover (as a client) a service in the cluster.
Call API using DiscoveryService.session()
Once a service advertises itself into ZooKeeper, other services in the cluster can use its API.
DiscoveryService provides a method session() which returns an aiohttp.ClientSession with a custom hostname resolver. It can be used the same way as aiohttp.ClientSession. Instead of an explicit URL, use a URL with the asab domain.
The URL is constructed in the format http://<value>.<key>.asab/... where key is the name of the identifier (e.g. instance_id, service_id) and value is its value (e.g. my_app_1).
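The URL format can be expressed as a small helper. This is only a sketch; discovery_url is a hypothetical name and not an ASAB function.

```python
def discovery_url(key, value, endpoint="", protocol="http"):
    """Build '<protocol>://<value>.<key>.asab/<endpoint>'."""
    return "{}://{}.{}.asab/{}".format(protocol, value, key, endpoint.lstrip("/"))


url = discovery_url("instance_id", "my_app_1", "/asab/v1/config")
```

The resulting string can be passed to session.get() of a discovery session in place of an explicit address.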
Example
import logging

import asab
import asab.api
import asab.api.discovery
import asab.web
import asab.zookeeper

L = logging.getLogger(__name__)


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()

        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # The DiscoverySession is functional only with ApiService initialized.
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)
        self.DiscoveryService = self.get_service("asab.DiscoveryService")

    async def main(self):
        async with self.DiscoveryService.session() as session:
            try:
                # Use URL in the format <protocol>://<value>.<key>.asab/<endpoint>,
                # where key is "service_id" or "instance_id" and value is the respective service identifier
                async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
                    if resp.status == 200:
                        config = await resp.json()
            except asab.api.discovery.NotDiscoveredError as e:
                L.error(e)
Warning
asab.DiscoveryService is functional only with ApiService initialized. That means WebContainer and ZooKeeperContainer must also be present in the application.
Warning
Discovery Service searches for microservices in the same ZooKeeper path as defined in the configuration. Therefore, all services in the cluster should be configured with the same ZooKeeper path.
locate()
Returns a set of URLs based on an id of a service. Provide the filter as keyword arguments. The keys can be node_id, service_id, instance_id or custom ids.
Example
To look for all URLs of a service called very-nice-service, use inside the application:
await self.DiscoveryService.locate(service_id="very-nice-service")
It returns a set of URLs, e.g.:
{"http://very-nice-service-1:8888", "http://very-nice-service-2:8889"}
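How a filter is matched against the advertised services can be sketched with an in-memory cache. The cache content and the function name locate_urls are assumptions for illustration; the real lookup runs inside DiscoveryService against data read from ZooKeeper.

```python
import socket

# Hypothetical cache: identifier type -> identifier -> set of (host, port, family)
ADVERTISED_CACHE = {
    "instance_id": {
        "very-nice-service-1": {("very-nice-service-1", 8888, socket.AF_INET)},
        "very-nice-service-2": {("very-nice-service-2", 8889, socket.AF_INET)},
    },
    "service_id": {
        "very-nice-service": {
            ("very-nice-service-1", 8888, socket.AF_INET),
            ("very-nice-service-2", 8889, socket.AF_INET),
        },
    },
}


def locate_urls(cache, locate_params):
    """Return the set of URLs whose identifiers match the given filter."""
    found = set()
    for id_type, wanted in locate_params.items():
        found |= cache.get(id_type, {}).get(wanted, set())
    # The address family is dropped because it is redundant in the URL format
    return {"http://{}:{}".format(host, port) for host, port, _family in found}


urls = locate_urls(ADVERTISED_CACHE, {"service_id": "very-nice-service"})
```

An identifier that is not advertised yields an empty set rather than an error.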
discover()
Returns a dictionary with all known services, organized by their ids and information on how to resolve them.
Example
await self.DiscoveryService.discover()
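The returned dictionary is nested: identifier type, then identifier, then a set of (host, port, address family) tuples. The sketch below walks such a structure; the sample data and the helper name summarize are assumptions for illustration.

```python
import socket

# Hypothetical result in the shape returned by discover()
discovered = {
    "instance_id": {
        "lmio-receiver-1": {("node1", 1234, socket.AF_INET)},
    },
    "service_id": {
        "lmio-receiver": {("node1", 1234, socket.AF_INET), ("node2", 1234, socket.AF_INET)},
    },
}


def summarize(discovered):
    """Map every identifier type to {identifier: sorted list of 'host:port'}."""
    return {
        id_type: {
            identifier: sorted("{}:{}".format(host, port) for host, port, _family in targets)
            for identifier, targets in ids.items()
        }
        for id_type, ids in discovered.items()
    }


summary = summarize(discovered)
```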
Custom discovery ids
Custom identifiers can be set during runtime.
Example
Inside the application, when the API Service is already initialized:
self.ASABApiService.update_discovery({"custom_id": ["id1", "id2", "id3"]})
The argument of the method must be a dictionary where each key is a string and each value is a list.
The custom_id can be used in both the discovery session and the locate() method.
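The expected shape of the argument can be checked before it is handed over. This is a sketch only; validate_discovery_ids is a hypothetical helper and not part of ASAB.

```python
def validate_discovery_ids(discovery_ids):
    """Raise TypeError unless the argument is a dict mapping str to list."""
    if not isinstance(discovery_ids, dict):
        raise TypeError("discovery ids must be a dictionary")
    for key, value in discovery_ids.items():
        if not isinstance(key, str):
            raise TypeError("key {!r} must be a string".format(key))
        if not isinstance(value, list):
            raise TypeError("value for {!r} must be a list".format(key))
    return discovery_ids


custom_ids = validate_discovery_ids({"custom_id": ["id1", "id2", "id3"]})
```

With these ids advertised, a URL such as http://id1.custom_id.asab/... follows the same <value>.<key>.asab pattern as the predefined ids.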
Using service discovery during authorization
When an authorization server, e.g. SeaCat Auth, provides authorization for each API call, it can also be found in the cluster through service discovery. To connect asab.AuthService with the service discovery, several requirements must be met:
- API Service must be fully initialized BEFORE Auth Service.
- The authorization server (SeaCat Auth) must be present in the cluster, resolvable by its instance id, and advertising itself into ZooKeeper (being a server itself).
- The [auth] configuration section must contain a URL recognizable by the service discovery.
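Whether a configured URL is recognizable by the service discovery can be checked by looking at its hostname. The sketch below assumes an INI-style [auth] section with a public_keys_url option and uses a hypothetical helper uses_service_discovery; it does not call any ASAB API.

```python
import configparser
from urllib.parse import urlsplit

config = configparser.ConfigParser()
config.read_string("""
[auth]
public_keys_url=http://seacat-auth.service_id.asab/.well-known/jwks.json
""")


def uses_service_discovery(url):
    """Return True if the URL host has the '<value>.<key>.asab' shape."""
    host = urlsplit(url).hostname or ""
    parts = host.rsplit(".", 2)
    return len(parts) == 3 and parts[-1] == "asab"


public_keys_url = config["auth"]["public_keys_url"]
discoverable = uses_service_discovery(public_keys_url)
```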
Auth Service using service discovery
API Service must be fully initialized BEFORE Auth Service.
import asab
import asab.api
import asab.web
import asab.web.auth
import asab.zookeeper


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()

        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # The DiscoverySession is functional only with ApiService initialized.
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)
        self.DiscoveryService = self.get_service("asab.DiscoveryService")

        # Initialize authorization after ASABApiService.initialize_zookeeper()
        # to get DiscoveryService into the auth module
        self.AuthService = asab.web.auth.AuthService(self)
        self.AuthService.install(self.WebContainer)
The [auth] configuration section must contain a URL recognizable by the service discovery.
my_app.conf
[auth]
public_keys_url=http://seacat-auth.service_id.asab/.well-known/jwks.json
Reference
asab.api.discovery
DiscoveryResolver
Bases: DefaultResolver
Custom resolver for Discovery Session based on the default aiohttp resolver.
Source code in asab/api/discovery.py
class DiscoveryResolver(aiohttp.DefaultResolver):
    """
    Custom resolver for Discovery Session based on default `aiohttp` resolver.
    """

    def __init__(self, svc) -> None:
        super().__init__()
        self.DiscoveryService = svc

    async def resolve(self, hostname: str, port: int = 0, family: int = socket.AF_INET) -> typing.List[typing.Dict[str, typing.Any]]:
        """
        Resolve a hostname only with '.asab' domain. and return a list of dictionaries
        containing information about the resolved hosts further used by `aiohttp.TCPConnector`.
        The hostname to resolve must be in the format of "<value>.<key>.asab",
        where key is "service_id" or "instance_id" and value is the particular identificator of the service to be resolved.
        The "asab" domain is required for resolution, otherwise it is treated like normal URL.
        """
        url_split = hostname.rsplit(".", 2)
        # If there is no asab domain, it is normal URL and resolve it normally
        if url_split[-1] != "asab":
            return await super().resolve(hostname, port, family)
        # Make sure the format of the hostname is right
        if len(url_split) != 3:
            raise NotDiscoveredError("Invalid format of the hostname '{}'. Use e.g. `asab-config.service_id.asab` instead.".format(hostname))
        hosts = []
        located_instances = await self.DiscoveryService._locate({url_split[1]: url_split[0]})
        if located_instances is None or len(located_instances) == 0:
            raise NotDiscoveredError("Failed to discover '{}'.".format(hostname))
        for phostname, pport, pfamily in located_instances:
            try:
                resolved = await super().resolve(phostname, pport, pfamily)
            except socket.gaierror:
                # Skip unresolved hosts
                continue
            hosts.extend(resolved)
        if len(hosts) == 0:
            raise NotDiscoveredError("Failed to resolve any of the hosts for '{}'.".format(hostname))
        return hosts
resolve(hostname, port=0, family=socket.AF_INET)
async
Resolve a hostname with the '.asab' domain and return a list of dictionaries containing information about the resolved hosts, further used by aiohttp.TCPConnector.
The hostname to resolve must be in the format "<value>.<key>.asab", where key is "service_id" or "instance_id" and value is the particular identifier of the service to be resolved.
The "asab" domain is required for discovery resolution; any other hostname is resolved like a normal URL.
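The hostname handling described above can be sketched in isolation. The function name split_discovery_hostname is hypothetical, and ValueError stands in for the NotDiscoveredError raised by the resolver; the splitting itself uses the same rsplit(".", 2) approach.

```python
def split_discovery_hostname(hostname):
    """Return (key, value) for '<value>.<key>.asab', or None for other hostnames."""
    parts = hostname.rsplit(".", 2)
    if parts[-1] != "asab":
        return None  # not a discovery hostname, resolve it the normal way
    if len(parts) != 3:
        raise ValueError("Invalid format of the hostname '{}'".format(hostname))
    value, key, _domain = parts
    return key, value


parsed = split_discovery_hostname("asab-config.service_id.asab")
```

Because the split is done from the right and limited to two separators, a value that itself contains dots stays in one piece.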
DiscoveryService
Bases: Service
Service for discovering ASAB microservices in a server cluster. It is based on searching in the ZooKeeper /run path.
Source code in asab/api/discovery.py
class DiscoveryService(Service):
    """
    Service for discovering ASAB microservices in a server cluster. It is based on searching in ZooKeeper `/run` path.
    """

    def __init__(self, app, zkc, service_name="asab.DiscoveryService") -> None:
        super().__init__(app, service_name)
        self.ZooKeeperContainer = zkc
        self.ProactorService = zkc.ProactorService
        self.InternalAuthKeyPath = "/asab/auth/internal_auth_private.key"
        self.InternalAuthKey = None
        self.InternalAuthToken = None
        self.InternalAuthTokenExpiration: datetime.timedelta = datetime.timedelta(seconds=5 * 300)
        self._advertised_cache = dict()
        self._cache_lock = asyncio.Lock()
        self._ready_event = asyncio.Event()
        self.App.PubSub.subscribe("Application.tick/300!", self._on_tick)
        self.App.PubSub.subscribe("ZooKeeperContainer.state/CONNECTED!", self._on_zk_ready)

    def _on_tick(self, msg):
        self.App.TaskService.schedule(self._rescan_advertised_instances())
        if jwcrypto is not None:
            self._ensure_internal_auth_token()

    def _on_zk_ready(self, msg, zkc):
        if zkc == self.ZooKeeperContainer:
            self.App.TaskService.schedule(self._rescan_advertised_instances())
            if jwcrypto is not None:
                self.App.TaskService.schedule(self._ensure_internal_auth_key(zkc))

    async def _ensure_internal_auth_key(self, zkc=None):
        zkc = zkc or self.ZooKeeperContainer
        private_key_json = None
        # Attempt to create and write a new private key
        # while avoiding race condition with other ASAB services
        while not private_key_json:
            # Try to get the key
            try:
                private_key_json, _ = zkc.ZooKeeper.Client.get(self.InternalAuthKeyPath)
                break
            except kazoo.exceptions.NoNodeError:
                pass
            # Generate a new key
            private_key = jwcrypto.jwk.JWK.generate(kty="EC", crv="P-256")
            private_key_json = json.dumps(private_key.export(as_dict=True)).encode("utf-8")
            try:
                zkc.ZooKeeper.Client.create(self.InternalAuthKeyPath, private_key_json, makepath=True)
                L.info("Internal auth key created.", struct_data={
                    "kid": private_key.key_id, "path": self.InternalAuthKeyPath})
            except kazoo.exceptions.NodeExistsError:
                # Another ASAB service has probably created the key in the meantime
                pass
        private_key = jwcrypto.jwk.JWK.from_json(private_key_json)
        if private_key != self.InternalAuthKey:
            # Private key has changed
            self.InternalAuthKey = private_key
            self._ensure_internal_auth_token(force_new=True)

    def _ensure_internal_auth_token(self, force_new: bool = False):
        assert self.InternalAuthKey

        def _get_own_discovery_url():
            instance_id = os.getenv("INSTANCE_ID", None)
            if instance_id:
                return "http://{}.instance_id.asab".format(instance_id)
            service_id = os.getenv("SERVICE_ID", None)
            if service_id:
                return "http://{}.service_id.asab".format(service_id)
            return "http://{}".format(self.App.HostName)

        if self.InternalAuthToken and not force_new:
            claims = json.loads(self.InternalAuthToken.claims)
            if claims.get("exp") > (
                datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=300)
            ).timestamp():
                # Token is valid and does not expire soon
                return
        # Use this service's discovery URL as issuer ID and authorized party ID
        my_discovery_url = _get_own_discovery_url()
        expiration = datetime.datetime.now(datetime.timezone.utc) + self.InternalAuthTokenExpiration
        claims = {
            # Issuer (URL of the app that created the token)
            "iss": my_discovery_url,
            # Issued at
            "iat": int(datetime.datetime.now(datetime.timezone.utc).timestamp()),
            # Expires at
            "exp": int((expiration).timestamp()),
            # Authorized party
            "azp": my_discovery_url,
            # Audience (who is allowed to use this token)
            "aud": "http://{}".format(self.App.HostName),  # TODO: Something that signifies "anyone in this internal space"
            # Tenants and resources
            "resources": {
                "*": ["authz:superuser"],
            }
        }
        self.InternalAuthToken = jwcrypto.jwt.JWT(
            header={
                "alg": "ES256",
                "typ": "JWT",
                "kid": self.InternalAuthKey.key_id,
            },
            claims=json.dumps(claims)
        )
        self.InternalAuthToken.make_signed_token(self.InternalAuthKey)
        L.info("New internal auth token issued.", struct_data={"exp": expiration})

    async def locate(self, instance_id: str = None, **kwargs) -> set:
        """
        Return a list of URLs for a given instance or service ID.
        Args:
            instance_id (str): The ID of a specific instance of a service that the client wants to locate.
            service_id (str): The `service_id` parameter represents identifier of a service to locate.
                It is used to query a service registry to find the instances of the service that are currently available.
        Returns: A list of URLs in the format "http://servername:port" for the specified instance or service.
        """
        if instance_id is not None:
            locate_params = {"instance_id": instance_id}
        elif len(kwargs) > 0:
            locate_params = kwargs
        else:
            L.warning("Please provide instance_id, service_id, or other custom id to locate the service(s).")
            return None
        # Each taget can have two records - one for ipv4 and second for ipv6. This information is redundant in the URL format.
        return set([
            "http://{}:{}".format(servername, port)
            for servername, port, family
            in await self._locate(locate_params)
        ])

    async def _locate(self, locate_params) -> typing.Set[typing.Tuple]:
        """
        Locate service instances based on their instance ID or service ID.
        Args:
            instance_id (str): The unique identifier for a specific instance of a service
            service_id (str): The ID of the service to locate
        Returns: a list of tuples containing the server name and port number of the located service(s).
        """
        res = set()
        await asyncio.wait_for(self._ready_event.wait(), 600)
        if len(self._advertised_cache) == 0:
            L.warning("No instances to discover. Make sure [zookeeper] configuration is identical for all the ASAB services in the cluster.")
            return res
        for id_type, ids in self._advertised_cache.items():
            if id_type in locate_params:
                if locate_params[id_type] in ids:
                    res = res | ids[locate_params[id_type]]
        return res

    async def discover(self) -> typing.Dict[str, typing.Dict[str, typing.Set[typing.Tuple]]]:
        # We need to make a copy of the cache so that the caller can't modify our cache.
        await asyncio.wait_for(self._ready_event.wait(), 600)
        return copy.deepcopy(self._advertised_cache)

    async def get_advertised_instances(self) -> typing.List[typing.Dict]:
        """
        This method is here for backward compatibility. Use `discover()` method instead.
        Returns a list of dictionaries. Each dictionary represents an advertised instance
        obtained by iterating over the items in the `/run` path in ZooKeeper.
        """
        # TODO: an obsolete log for this method
        advertised = []
        async for item, item_data in self._iter_zk_items():
            item_data['ephemeral_id'] = item
            advertised.append(item_data)
        return advertised

    async def _rescan_advertised_instances(self):
        """
        Returns structured dataset of identifier types, identifiers of instances and hosts and ports where they can be found.
        It is a dict of identifier types as keys and dict as values. The second-layer dict has identifiers as keys an a set of tuples as a value. Each tuple contains host and port.
        Example of the data structure:
        {
            "instance_id": {
                "lmio-receiver-1": {("node1", 1234, socket.AF_INET)},
                "asab-remote-control-1": {("node2", 1234, socket.AF_INET6)}
                ...
            },
            "service_id": {
                "lmio-receiver": {("node1", 1234, socket.AF_INET), ("node2", 1234, socket.AF_INET), ("node3", 1234, socket.AF_INET)},
                ...
            },
            "custom1_id": {
                "myid123": {("node1", 5678, socket.AF_INET), ("node2", 5678, socket.AF_INET6)},
                ...
            },
            "custom2_id": {
                "myid123": {("node1", 5678, socket.AF_INET), ("node2", 5678, socket.AF_INET)},
                ...
            },
            ...
        }
        """
        if self._cache_lock.locked():
            # Only one rescan / cache update at a time
            return
        async with self._cache_lock:
            advertised = {
                "instance_id": {},
                "service_id": {},
            }
            iterator = self._iter_zk_items()
            if iterator is None:
                return  # reason logged from the _iter_zk_items method
            async for item, item_data in iterator:
                instance_id = item_data.get("instance_id")
                service_id = item_data.get("service_id")
                discovery: typing.Dict[str, list] = item_data.get("discovery", {})
                if instance_id is not None:
                    discovery["instance_id"] = [instance_id]
                if instance_id is not None:
                    discovery["service_id"] = [service_id]
                host = item_data.get("host")
                if host is None:
                    continue
                web = item_data.get("web")
                if web is None:
                    continue
                for i in web:
                    try:
                        ip = i[0]
                        port = i[1]
                    except KeyError:
                        L.error("Unexpected format of 'web' section in advertised data: '{}'".format(web))
                        return
                    if ip == "0.0.0.0":
                        family = socket.AF_INET
                    elif ip == "::":
                        family = socket.AF_INET6
                    else:
                        continue
                    if discovery is not None:
                        for id_type, ids in discovery.items():
                            if advertised.get(id_type) is None:
                                advertised[id_type] = {}
                            for identifier in ids:
                                if identifier is not None:
                                    if advertised[id_type].get(identifier) is None:
                                        advertised[id_type][identifier] = {(host, port, family)}
                                    else:
                                        advertised[id_type][identifier].add((host, port, family))
            self._advertised_cache = advertised
            self._ready_event.set()

    async def _iter_zk_items(self):
        base_path = self.ZooKeeperContainer.Path + "/run"

        def get_items():
            try:
                # Create the base path if it does not exist
                if not self.ZooKeeperContainer.ZooKeeper.Client.exists(base_path):
                    self.ZooKeeperContainer.ZooKeeper.Client.create(base_path, b'', makepath=True)
                return self.ZooKeeperContainer.ZooKeeper.Client.get_children(base_path, watch=self._on_change_threadsafe)
            except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
                L.warning("Connection to ZooKeeper lost. Discovery Service could not fetch up-to-date state of the cluster services.")
                return None
            except kazoo.exceptions.KazooException:
                return None

        def get_data(item):
            try:
                data, stat = self.ZooKeeperContainer.ZooKeeper.Client.get((base_path + '/' + item), watch=self._on_change_threadsafe)
                return data
            except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
                L.warning("Connection to ZooKeeper lost. Discovery Service could not fetch up-to-date state of the cluster services.")
                return None
            except kazoo.exceptions.KazooException:
                # There's a race condition -> if ZK node is deleted between get_items and get_data calls, NoNodeError is raised. Let's just ignore it. The ZK node is already deleted.
                return None

        items = await self.ProactorService.execute(get_items)
        if items is None:
            return
        for item in items:
            item_data = await self.ProactorService.execute(get_data, item)
            if item_data is None:
                continue
            yield item, json.loads(item_data)

    def _on_change_threadsafe(self, watched_event):
        # Runs on a thread, returns the process back to the main thread
        self.App.TaskService.schedule_threadsafe(self._rescan_advertised_instances())

    def session(
        self,
        base_url: typing.Optional[str] = None,
        auth: typing.Union[str, aiohttp.ClientRequest, None] = None,
        headers: typing.Optional[typing.Mapping[str, str]] = None,
        **kwargs
    ) -> aiohttp.ClientSession:
        """
        Open HTTP session with custom hostname resolver for ASAB microservices.
        Args:
            :param base_url: Base URL to use for requests.
            :param auth: Client request to extract authorization from, or the string "internal", to use internal authorization.
            :param headers: Custom session HTTP headers.
        Usage:
        ```python
        # Without authorization
        async with self.DiscoveryService.session() as session:
            # use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
            # where key is "service_id" or "instance_id"
            # and value the respective service identificator
            async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
                ...
        # Using end-user authorization (from the UI)
        async with self.DiscoveryService.session(auth=request) as session:
            async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
                ...
        # Using internal m2m authorization
        async with self.DiscoveryService.session(auth="internal") as session:
            async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
                ...
        ```
        """
        _headers = {}
        if auth is None:
            # By default, use the authorization from the incoming request
            request = Request.get(None)
            if request is not None and "Authorization" in request.headers:
                _headers["Authorization"] = request.headers["Authorization"]
        elif isinstance(auth, aiohttp.web.Request):
            assert "Authorization" in auth.headers
            _headers["Authorization"] = auth.headers["Authorization"]
        elif auth == "internal":
            if jwcrypto is None:
                raise ModuleNotFoundError(
                    "You are trying to use internal auth without 'jwcrypto' installed. "
                    "Please run 'pip install jwcrypto' or install asab with 'authz' optional dependency."
                )
            authz = Authz.get(None)
            if authz is not None:
                L.warning(
                    "Using internal (superuser) authorization in an already authorized context. "
                    "This is potentially unwanted and dangerous.",
                )
            _headers["Authorization"] = "Bearer {}".format(self.InternalAuthToken.serialize())
        else:
            raise ValueError(
                "Invalid 'auth' value. "
                "Only instances of aiohttp.web.Request or the literal string 'internal' are allowed. "
                "Found {}.".format(type(auth))
            )
        if headers is not None:
            _headers.update(headers)
        return aiohttp.ClientSession(
            base_url,
            connector=aiohttp.TCPConnector(resolver=DiscoveryResolver(self)),
            headers=_headers,
            **kwargs
        )
get_advertised_instances()
async
This method is here for backward compatibility. Use the discover() method instead.
Returns a list of dictionaries. Each dictionary represents an advertised instance obtained by iterating over the items in the /run path in ZooKeeper.
locate(instance_id=None, **kwargs)
async
Return a set of URLs for a given instance or service ID.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | str | The ID of a specific instance of a service that the client wants to locate. | None |
| service_id | str | Identifier of the service to locate, passed as a keyword argument. It is used to query the service registry to find the instances of the service that are currently available. | optional |
Returns: A set of URLs in the format "http://servername:port" for the specified instance or service.
session(base_url=None, auth=None, headers=None, **kwargs)
Open HTTP session with custom hostname resolver for ASAB microservices.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| base_url | Optional[str] | Base URL to use for requests. | None |
| auth | Union[str, aiohttp.ClientRequest, None] | Client request to extract authorization from, or the string "internal" to use internal authorization. | None |
| headers | Optional[Mapping[str, str]] | Custom session HTTP headers. | None |
Usage:
```python
# Without authorization
async with self.DiscoveryService.session() as session:
    # use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
    # where key is "service_id" or "instance_id"
    # and value is the respective service identifier
    async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
        ...

# Using end-user authorization (from the UI)
async with self.DiscoveryService.session(auth=request) as session:
    async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
        ...

# Using internal m2m authorization
async with self.DiscoveryService.session(auth="internal") as session:
    async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
        ...
```
NotDiscoveredError
Bases: RuntimeError
Raised when given service is not discovered.
Source code in asab/api/discovery.py
class NotDiscoveredError(RuntimeError):
    """
    Raised when given service is not discovered.
    """
    pass