Discovery Service¤

Service discovery enables communication among multiple ASAB microservices in a server cluster. Each microservice can look up the address (URL) of the API of any other service within the cluster.

In service discovery, there are two main roles: the "server" and the "client."

The server advertises its "position" in the cluster and provides an API for communication. The client uses this API to interact with the server.

In the context of service discovery, all microservices in the cluster act as both servers and clients.

Prerequisites¤

The following requirements must be fulfilled:

  • ZooKeeper connection and configuration must be the same for all services in the cluster.
  • ZooKeeper container must be initialized in the service.
  • asab.WebService and asab.WebContainer must be initialized.
  • asab.APIService must be initialized.
  • Environment variables NODE_ID, SERVICE_ID and INSTANCE_ID must be set.
  • INSTANCE_ID (or hostname if INSTANCE_ID is missing) must be resolvable.

Server - Advertising into ZooKeeper¤

Even though the service can provide multiple communication interfaces, the major use case is the web API.

A "server" application has to provide the API and advertise its address into a consensus technology, ZooKeeper.

Application requirements¤

Services inside the application must be initialized in the right order.

import asab
import asab.api
import asab.web
import asab.zookeeper


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()
        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # Initialize ApiService
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)

Configuration¤

The ZooKeeper configuration must be the same for all services in the cluster. The port of the web API is set in the [web] section of the configuration.

myapp.conf
[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

[web]
listen=0.0.0.0 8090
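
As a quick sanity check, the configuration above can be inspected with Python's standard `configparser`. This is only a sketch: ASAB reads its configuration through its own mechanism, and the way `servers` and `listen` are split here is an assumption made for illustration.

```python
import configparser

# The same content as in myapp.conf above
CONFIG_TEXT = """
[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

[web]
listen=0.0.0.0 8090
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

# Every service in the cluster should see the same list of ZooKeeper servers.
zk_servers = [s.strip() for s in parser["zookeeper"]["servers"].split(",")]

# Assumption: "listen" holds the address and the port separated by whitespace.
listen_host, listen_port = parser["web"]["listen"].split()
listen_port = int(listen_port)

print(zk_servers)
print(listen_host, listen_port)
```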

Environment variables¤

The discovery is based on the provided ids. Use either the predefined ids provided as environment variables, or create custom discovery ids within the application.

Predefined ids used for service discovery. Provide them as environment variables:

Variable | Description | Example
SERVICE_ID | Identifier of the ASAB microservice. | my-app
INSTANCE_ID | Identifier of a specific instance of the ASAB microservice. | my-app-1, my-app-2, ...
NODE_ID | Identifier of a cluster node. | node-1, node-2, ...

Instance ID must be resolvable

The ASAB framework cannot set up networking. To enable service discovery, the INSTANCE_ID of a service must be resolvable from the client. If INSTANCE_ID is missing, the hostname is used instead, and then the hostname must be resolvable.
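
The resolvability requirement can be checked by hand from the machine that will act as the client. The following is a minimal sketch using only the standard library; `advertised_name` and `is_resolvable` are hypothetical helper names, not part of ASAB, and the fallback to the hostname follows the rule described above.

```python
import os
import socket
import typing


def advertised_name(environ: typing.Mapping[str, str]) -> str:
    """Pick INSTANCE_ID if it is set, otherwise fall back to the hostname."""
    return environ.get("INSTANCE_ID") or socket.gethostname()


def is_resolvable(name: str) -> bool:
    """Return True if the local resolver can translate `name` to an address."""
    try:
        socket.getaddrinfo(name, None)
    except socket.gaierror:
        return False
    return True


if __name__ == "__main__":
    name = advertised_name(os.environ)
    print(name, "resolvable:", is_resolvable(name))
```

The check is only meaningful when it runs where the client runs, because name resolution can differ between hosts and containers.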

Information advertised¤

When all requirements of a server-side microservice are fulfilled, the service advertises information unique to its runtime into the consensus.

The advertised data is a JSON document describing the running application.

asab/run/ASABMyApplication.0000090098
{
    "host": "server-name-1",
    "appclass": "MyApp",
    "node_id": "node-1",
    "service_id": "my-app",
    "instance_id": "my-app-1",
    "launch_time": "2023-12-01T10:52:29.656397Z",
    "process_id": 1,
    "containerized": true,
    "created_at": "2023-11-23T11:08:22.454248Z",
    "version": "v23.47-alpha",
    "CI_COMMIT_TAG": "v23.47-alpha",
    "CI_COMMIT_REF_NAME": "v23.47-alpha",
    "CI_COMMIT_SHA": "...",
    "CI_COMMIT_TIMESTAMP": "2023-11-23T11:00:48+00:00",
    "CI_JOB_ID": "...",
    "CI_PIPELINE_CREATED_AT": "2023-11-23T11:06:40Z",
    "CI_RUNNER_ID": "..",
    "CI_RUNNER_EXECUTABLE_ARCH": "linux/amd64",
    "web": [
        [
            "0.0.0.0",
            8090
        ]
    ]
}
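
To show how such a record turns into something a client can connect to, here is a small sketch that derives `(host, port, family)` tuples from the `host` and `web` fields. It follows the logic visible in the reference source code of `DiscoveryService` later on this page, applied to a trimmed copy of the record; the function name `endpoints` is hypothetical.

```python
import json
import socket

# Trimmed copy of the advertised record shown above
ADVERTISED = """
{
    "host": "server-name-1",
    "service_id": "my-app",
    "instance_id": "my-app-1",
    "web": [["0.0.0.0", 8090]]
}
"""


def endpoints(item_data: dict) -> set:
    """Return (host, port, family) tuples for every wildcard listener in "web"."""
    result = set()
    host = item_data.get("host")
    if host is None:
        return result
    for ip, port in item_data.get("web") or []:
        if ip == "0.0.0.0":
            family = socket.AF_INET
        elif ip == "::":
            family = socket.AF_INET6
        else:
            # Listeners bound to a specific address are skipped
            continue
        result.add((host, port, family))
    return result


print(endpoints(json.loads(ADVERTISED)))
```

Note that the address in `web` only selects the address family; the name a client connects to is taken from `host`.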

Client - Using Service Discovery¤

Every service acts as a "server" and a "client" at the same time. This section describes how to discover (as a client) a service in the cluster.

Call API using DiscoveryService.session()¤

Once the service propagates itself into ZooKeeper, other services in the cluster can use its API.

DiscoveryService provides the method session(), which returns an aiohttp.ClientSession equipped with a custom hostname resolver. It can be used the same way as aiohttp.ClientSession. Instead of an explicit URL, use a URL with the asab domain.

The URL is constructed in the format http://<value>.<key>.asab/... where key is the name of the identifier (e.g. instance_id, service_id) and value is its value (e.g. my_app_1).
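
The URL format can be captured in a small helper. This is an illustrative sketch only; `discovery_url` is a hypothetical function, not part of ASAB.

```python
def discovery_url(key: str, value: str, endpoint: str = "/", scheme: str = "http") -> str:
    """Build a URL in the form <scheme>://<value>.<key>.asab/<endpoint>."""
    if not endpoint.startswith("/"):
        endpoint = "/" + endpoint
    return "{}://{}.{}.asab{}".format(scheme, value, key, endpoint)


print(discovery_url("instance_id", "my_application_1", "/asab/v1/config"))
print(discovery_url("service_id", "my-app", "asab/v1/config"))
```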

Example

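import logging

import asab
import asab.api
import asab.api.discovery
import asab.web
import asab.zookeeper

L = logging.getLogger(__name__)


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()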
        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # The DiscoverySession is functional only with ApiService initialized.
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)

        self.DiscoveryService = self.get_service("asab.DiscoveryService")

    async def main(self):
        async with self.DiscoveryService.session() as session: 
            try:
                # use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
                # where key is "service_id" or "instance_id"
                # and value is the respective service identifier
                async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
                    if resp.status == 200:
                        config = await resp.json()
            except asab.api.discovery.NotDiscoveredError as e:
                L.error(e)

Warning

asab.DiscoveryService is functional only with ApiService initialized. That means, WebContainer and ZooKeeperContainer must be also present in the application.

Warning

Discovery Service searches for microservices in the same ZooKeeper path as defined in the configuration. Therefore, all services in the cluster should be configured with the same ZooKeeper path.

locate()¤

Returns a set of URLs based on the ids of a service. Provide the filter as keyword arguments. The names of the arguments can be node_id, service_id, instance_id or custom ids.

Example

To look for all URLs of a service called very-nice-service, use inside the application:

await self.DiscoveryService.locate(service_id="very-nice-service")

It returns a set of URLs, e.g.:

{"http://very-nice-service-1:8888", "http://very-nice-service-2:8889"}
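
How a lookup like this maps ids to URLs can be sketched without a running cluster. The example below works over a hand-written snapshot of advertised endpoints; `CACHE` and `locate_urls` are hypothetical names, and the matching logic is a simplified reading of the reference source code later on this page, not the library implementation itself.

```python
import socket

# Hypothetical snapshot of advertised endpoints: id type -> identifier -> endpoints
CACHE = {
    "instance_id": {
        "very-nice-service-1": {("very-nice-service-1", 8888, socket.AF_INET)},
        "very-nice-service-2": {("very-nice-service-2", 8889, socket.AF_INET)},
    },
    "service_id": {
        "very-nice-service": {
            ("very-nice-service-1", 8888, socket.AF_INET),
            ("very-nice-service-2", 8889, socket.AF_INET),
        },
    },
}


def locate_urls(cache: dict, **ids) -> set:
    """Return the URLs of all endpoints matching any of the given ids."""
    found = set()
    for id_type, identifier in ids.items():
        found |= cache.get(id_type, {}).get(identifier, set())
    # The address family is dropped, so IPv4 and IPv6 records collapse into one URL
    return {"http://{}:{}".format(host, port) for host, port, _family in found}


print(sorted(locate_urls(CACHE, service_id="very-nice-service")))
```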

discover()¤

Returns a dictionary with all known services, organized by their ids and information on how to resolve them.

Example

await self.DiscoveryService.discover()
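
The returned dictionary has three levels: id type, identifier, and a set of `(host, port, family)` tuples. The sketch below walks over a hand-written value of that shape; the sample data and the helper `count_endpoints` are hypothetical and serve only to illustrate the structure.

```python
import socket

# Hypothetical result of discover(), following the structure described above
discovered = {
    "instance_id": {
        "my-app-1": {("node-1", 8090, socket.AF_INET)},
        "my-app-2": {("node-2", 8090, socket.AF_INET)},
    },
    "service_id": {
        "my-app": {("node-1", 8090, socket.AF_INET), ("node-2", 8090, socket.AF_INET)},
    },
}


def count_endpoints(discovered: dict, id_type: str) -> dict:
    """Map each identifier of the given id type to its number of endpoints."""
    return {
        identifier: len(endpoints)
        for identifier, endpoints in discovered.get(id_type, {}).items()
    }


print(count_endpoints(discovered, "service_id"))
```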

Custom discovery ids¤

Custom identifiers can be set during runtime.

Example

Inside the application, when Api Service is already initialized:

self.ASABApiService.update_discovery({"custom_id": ["id1", "id2", "id3"]})

The argument of the method must be a dictionary where each key is a string and each value is a list. The custom_id can then be used both in the discovery session and in the locate() method.
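
Because the shape of the argument matters, it may help to validate it before handing it over. The following sketch is one way to do that; `check_discovery_ids` is a hypothetical helper, not part of ASAB, and whether the library performs a similar check itself is not stated here.

```python
def check_discovery_ids(ids: dict) -> dict:
    """Raise TypeError unless `ids` is a dict of string keys and list values."""
    if not isinstance(ids, dict):
        raise TypeError("Discovery ids must be a dictionary.")
    for key, values in ids.items():
        if not isinstance(key, str):
            raise TypeError("Key {!r} must be a string.".format(key))
        if not isinstance(values, list):
            raise TypeError("Value of {!r} must be a list.".format(key))
    return ids


custom_ids = check_discovery_ids({"custom_id": ["id1", "id2", "id3"]})
# Inside the application: self.ASABApiService.update_discovery(custom_ids)
```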

Using service discovery during authorization¤

When an authorization server, e.g. SeaCat Auth, provides authorization for each API call, it can also be found in the cluster through service discovery. To connect asab.AuthService with service discovery, several requirements must be met:

  • API Service must be fully initialized BEFORE Auth Service.
  • Authorization server (SeaCat Auth) must be present in the cluster, resolvable by its instance id, and advertising itself into ZooKeeper (acting as a server itself).
  • The [auth] configuration section must contain a URL recognizable by service discovery.

Auth Service using service discovery

API Service must be fully initialized BEFORE Auth Service.

import asab
import asab.api
import asab.web
import asab.web.auth
import asab.zookeeper


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()
        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # The DiscoverySession is functional only with ApiService initialized.
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)

        self.DiscoveryService = self.get_service("asab.DiscoveryService")

        # Initialize authorization after ASABApiService.initialize_zookeeper() to get DiscoveryService into auth module
        self.AuthService = asab.web.auth.AuthService(self)
        self.AuthService.install(self.WebContainer)

The [auth] configuration section must contain a URL recognizable by service discovery.

my_app.conf
[auth]
public_keys_url=http://seacat-auth.service_id.asab/.well-known/jwks.json

Reference¤

asab.api.discovery ¤

DiscoveryResolver ¤

Bases: DefaultResolver

Custom resolver for Discovery Session based on default aiohttp resolver.

Source code in asab/api/discovery.py
class DiscoveryResolver(aiohttp.DefaultResolver):
	"""
	Custom resolver for Discovery Session based on default `aiohttp` resolver.
	"""

	def __init__(self, svc) -> None:
		super().__init__()
		self.DiscoveryService = svc


	async def resolve(self, hostname: str, port: int = 0, family: int = socket.AF_INET) -> typing.List[typing.Dict[str, typing.Any]]:
		"""
		Resolve a hostname only with '.asab' domain. and return a list of dictionaries
		containing information about the resolved hosts further used by `aiohttp.TCPConnector`.

		The hostname to resolve must be in the format of "<value>.<key>.asab",
		where key is "service_id" or "instance_id" and value is the particular identificator of the service to be resolved.
		The "asab" domain is required for resolution, otherwise it is treated like normal URL.
		"""
		url_split = hostname.rsplit(".", 2)

		# If there is no asab domain, it is normal URL and resolve it normally
		if url_split[-1] != "asab":
			return await super().resolve(hostname, port, family)

		# Make sure the format of the hostname is right
		if len(url_split) != 3:
			raise NotDiscoveredError("Invalid format of the hostname '{}'. Use e.g. `asab-config.service_id.asab` instead.".format(hostname))

		hosts = []
		located_instances = await self.DiscoveryService._locate({url_split[1]: url_split[0]})
		if located_instances is None or len(located_instances) == 0:
			raise NotDiscoveredError("Failed to discover '{}'.".format(hostname))

		for phostname, pport, pfamily in located_instances:
			try:
				resolved = await super().resolve(phostname, pport, pfamily)
			except socket.gaierror:
				# Skip unresolved hosts
				continue
			hosts.extend(resolved)

		if len(hosts) == 0:
			raise NotDiscoveredError("Failed to resolve any of the hosts for '{}'.".format(hostname))

		return hosts
resolve(hostname, port=0, family=socket.AF_INET) async ¤

Resolve a hostname only within the '.asab' domain and return a list of dictionaries containing information about the resolved hosts, further used by aiohttp.TCPConnector.

The hostname to resolve must be in the format "<value>.<key>.asab", where key is "service_id" or "instance_id" and value is the particular identifier of the service to be resolved. The "asab" domain is required for resolution; otherwise the hostname is treated like a normal URL.
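
The hostname handling can be tried in isolation. The sketch below splits a hostname the same way as the `resolve()` source shown on this page; `split_discovery_hostname` is a hypothetical name, and it raises `ValueError` where the resolver raises `NotDiscoveredError`, to keep the example self-contained.

```python
import typing


def split_discovery_hostname(hostname: str) -> typing.Optional[typing.Tuple[str, str]]:
    """
    Return (key, value) for "<value>.<key>.asab" hostnames,
    or None when the hostname is outside the "asab" domain.
    """
    parts = hostname.rsplit(".", 2)
    if parts[-1] != "asab":
        return None  # A regular hostname, left to the default resolver
    if len(parts) != 3:
        raise ValueError("Invalid format of the hostname '{}'.".format(hostname))
    value, key, _domain = parts
    return key, value


print(split_discovery_hostname("asab-config.service_id.asab"))
print(split_discovery_hostname("example.com"))
```

Splitting from the right means that any dots in the value part are preserved, since only the last two labels are interpreted.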


DiscoveryService ¤

Bases: Service

Service for discovering ASAB microservices in a server cluster. It is based on searching in ZooKeeper /run path.

Source code in asab/api/discovery.py
class DiscoveryService(Service):
	"""
	Service for discovering ASAB microservices in a server cluster. It is based on searching in ZooKeeper `/run` path.
	"""
	def __init__(self, app, zkc, service_name="asab.DiscoveryService") -> None:
		super().__init__(app, service_name)
		self.ZooKeeperContainer = zkc
		self.ProactorService = zkc.ProactorService
		self.InternalAuth = None
		if jwcrypto is not None:
			from .internal_auth import InternalAuth
			self.InternalAuth = InternalAuth(app, zkc)

		self._advertised_cache = dict()
		self._cache_lock = asyncio.Lock()
		self._ready_event = asyncio.Event()

		self.App.PubSub.subscribe("Application.tick/300!", self._on_tick)
		self.App.PubSub.subscribe("ZooKeeperContainer.state/CONNECTED!", self._on_zk_ready)


	async def initialize(self, app):
		if self.InternalAuth is not None:
			await self.InternalAuth.initialize(app)


	def _on_tick(self, msg):
		self.App.TaskService.schedule(self._rescan_advertised_instances())


	def _on_zk_ready(self, msg, zkc):
		if zkc == self.ZooKeeperContainer:
			self.App.TaskService.schedule(self._rescan_advertised_instances())


	async def locate(self, instance_id: str = None, **kwargs) -> set:
		"""
		Return a list of URLs for a given instance or service ID.

		Args:
			instance_id (str): The ID of a specific instance of a service that the client wants to locate.
			service_id (str): The `service_id` parameter represents identifier of a service to locate.
				It is used to query a service registry to find the instances of the service that are currently available.

		Returns: A list of URLs in the format "http://servername:port" for the specified instance or service.
		"""

		if instance_id is not None:
			locate_params = {"instance_id": instance_id}

		elif len(kwargs) > 0:
			locate_params = kwargs

		else:
			L.warning("Please provide instance_id, service_id, or other custom id to locate the service(s).")
			return None

		# Each taget can have two records - one for ipv4 and second for ipv6. This information is redundant in the URL format.
		return set([
			"http://{}:{}".format(servername, port)
			for servername, port, family
			in await self._locate(locate_params)
		])

	async def _locate(self, locate_params) -> typing.Set[typing.Tuple]:
		"""
		Locate service instances based on their instance ID or service ID.

		Args:
			instance_id (str): The unique identifier for a specific instance of a service
			service_id (str): The ID of the service to locate

		Returns: a list of tuples containing the server name and port number of the located service(s).
		"""
		res = set()

		await asyncio.wait_for(self._ready_event.wait(), 600)

		if len(self._advertised_cache) == 0:
			L.warning("No instances to discover. Make sure [zookeeper] configuration is identical for all the ASAB services in the cluster.")
			return res

		for id_type, ids in self._advertised_cache.items():
			if id_type in locate_params:
				if locate_params[id_type] in ids:
					res = res | ids[locate_params[id_type]]

		return res


	async def discover(self) -> typing.Dict[str, typing.Dict[str, typing.Set[typing.Tuple]]]:
		# We need to make a copy of the cache so that the caller can't modify our cache.
		await asyncio.wait_for(self._ready_event.wait(), 600)
		return copy.deepcopy(self._advertised_cache)


	async def get_advertised_instances(self) -> typing.List[typing.Dict]:
		"""
		This method is here for backward compatibility. Use `discover()` method instead.
		Returns a list of dictionaries. Each dictionary represents an advertised instance
		obtained by iterating over the items in the `/run` path in ZooKeeper.
		"""
		# TODO: an obsolete log for this method
		advertised = []
		async for item, item_data in self._iter_zk_items():
			item_data['ephemeral_id'] = item
			advertised.append(item_data)

		return advertised


	async def _rescan_advertised_instances(self):
		"""
		Returns structured dataset of identifier types, identifiers of instances and hosts and ports where they can be found.
		It is a dict of identifier types as keys and dict as values. The second-layer dict has identifiers as keys an a set of tuples as a value. Each tuple contains host and port.

		Example of the data structure:
			{
				"instance_id": {
					"lmio-receiver-1": {("node1", 1234, socket.AF_INET)},
					"asab-remote-control-1": {("node2", 1234, socket.AF_INET6)}
					...
				},
				"service_id": {
					"lmio-receiver": {("node1", 1234, socket.AF_INET), ("node2", 1234, socket.AF_INET), ("node3", 1234, socket.AF_INET)},
					...
				},
				"custom1_id": {
					"myid123": {("node1", 5678, socket.AF_INET), ("node2", 5678, socket.AF_INET6)},
					...
				},
				"custom2_id": {
					"myid123": {("node1", 5678, socket.AF_INET), ("node2", 5678, socket.AF_INET)},
					...
				},
				...
			}
		"""
		if self._cache_lock.locked():
			# Only one rescan / cache update at a time
			return

		async with self._cache_lock:

			advertised = {
				"instance_id": {},
				"service_id": {},
			}

			iterator = self._iter_zk_items()
			if iterator is None:
				return  # reason logged from the _iter_zk_items method

			async for item, item_data in iterator:
				instance_id = item_data.get("instance_id")
				service_id = item_data.get("service_id")
				discovery: typing.Dict[str, list] = item_data.get("discovery", {})

				if instance_id is not None:
					discovery["instance_id"] = [instance_id]

				if instance_id is not None:
					discovery["service_id"] = [service_id]

				host = item_data.get("host")
				if host is None:
					continue

				web = item_data.get("web")
				if web is None:
					continue

				for i in web:

					try:
						ip = i[0]
						port = i[1]
					except KeyError:
						L.error("Unexpected format of 'web' section in advertised data: '{}'".format(web))
						return

					if ip == "0.0.0.0":
						family = socket.AF_INET
					elif ip == "::":
						family = socket.AF_INET6
					else:
						continue

					if discovery is not None:
						for id_type, ids in discovery.items():
							if advertised.get(id_type) is None:
								advertised[id_type] = {}

							for identifier in ids:
								if identifier is not None:
									if advertised[id_type].get(identifier) is None:
										advertised[id_type][identifier] = {(host, port, family)}
									else:
										advertised[id_type][identifier].add((host, port, family))

			self._advertised_cache = advertised
			self._ready_event.set()


	async def _iter_zk_items(self):
		base_path = self.ZooKeeperContainer.Path + "/run"

		def get_items():
			try:
				# Create the base path if it does not exist
				if not self.ZooKeeperContainer.ZooKeeper.Client.exists(base_path):
					self.ZooKeeperContainer.ZooKeeper.Client.create(base_path, b'', makepath=True)

				return self.ZooKeeperContainer.ZooKeeper.Client.get_children(base_path, watch=self._on_change_threadsafe)
			except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
				L.warning("Connection to ZooKeeper lost. Discovery Service could not fetch up-to-date state of the cluster services.")
				return None

			except kazoo.exceptions.KazooException:
				return None

		def get_data(item):
			try:
				data, stat = self.ZooKeeperContainer.ZooKeeper.Client.get((base_path + '/' + item), watch=self._on_change_threadsafe)
				return data
			except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
				L.warning("Connection to ZooKeeper lost. Discovery Service could not fetch up-to-date state of the cluster services.")
				return None

			except kazoo.exceptions.KazooException:
				# There's a race condition -> if ZK node is deleted between get_items and get_data calls, NoNodeError is raised. Let's just ignore it. The ZK node is already deleted.
				return None

		items = await self.ProactorService.execute(get_items)
		if items is None:
			return

		for item in items:
			item_data = await self.ProactorService.execute(get_data, item)
			if item_data is None:
				continue
			yield item, json.loads(item_data)

	def _on_change_threadsafe(self, watched_event):
		# Runs on a thread, returns the process back to the main thread
		self.App.TaskService.schedule_threadsafe(self._rescan_advertised_instances())


	def session(
		self,
		base_url: typing.Optional[str] = None,
		auth: typing.Union[str, aiohttp.ClientRequest, None] = None,
		headers: typing.Optional[typing.Mapping[str, str]] = None,
		**kwargs
	) -> aiohttp.ClientSession:
		"""
		Open HTTP session with custom hostname resolver for ASAB microservices.

		Args:
			:param base_url: Base URL to use for requests.
			:param auth: Client request to extract authorization from, or the string "internal", to use internal authorization.
			:param headers: Custom session HTTP headers.

		Usage:

		```python
		# Without authorization
		async with self.DiscoveryService.session() as session:
			# use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
			# where key is "service_id" or "instance_id"
			# and value the respective service identificator
			async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
				...

		# Using end-user authorization (from the UI)
		async with self.DiscoveryService.session(auth=request) as session:
			async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
				...

		# Using internal m2m authorization
		async with self.DiscoveryService.session(auth="internal") as session:
			async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
				...
		"""
		_headers = {}

		if auth is None:
			# By default, use the authorization from the incoming request
			request = Request.get(None)
			if request is not None and "Authorization" in request.headers:
				_headers["Authorization"] = request.headers["Authorization"]

		elif isinstance(auth, aiohttp.web.Request):
			assert "Authorization" in auth.headers
			_headers["Authorization"] = auth.headers["Authorization"]

		elif auth == "internal":
			if self.InternalAuth is None:
				raise ModuleNotFoundError(
					"Internal auth is disabled because 'jwcrypto' module is not installed. "
					"Please run 'pip install jwcrypto' or install asab with 'authz' optional dependency."
				)
			_headers["Authorization"] = self.InternalAuth.get_authorization_header()

		else:
			raise ValueError(
				"Invalid 'auth' value. "
				"Only instances of aiohttp.web.Request or the literal string 'internal' are allowed. "
				"Found {}.".format(type(auth))
			)

		if headers is not None:
			_headers.update(headers)

		return aiohttp.ClientSession(
			base_url,
			connector=aiohttp.TCPConnector(resolver=DiscoveryResolver(self)),
			headers=_headers,
			**kwargs
		)
get_advertised_instances() async ¤

This method is here for backward compatibility. Use discover() method instead. Returns a list of dictionaries. Each dictionary represents an advertised instance obtained by iterating over the items in the /run path in ZooKeeper.

locate(instance_id=None, **kwargs) async ¤

Return a list of URLs for a given instance or service ID.

Parameters:

Name | Type | Description | Default
instance_id | str | The ID of a specific instance of a service that the client wants to locate. | None
service_id | str | The service_id parameter represents the identifier of a service to locate. It is used to query a service registry to find the instances of the service that are currently available. | required

Returns: A list of URLs in the format "http://servername:port" for the specified instance or service.

session(base_url=None, auth=None, headers=None, **kwargs) ¤

Open HTTP session with custom hostname resolver for ASAB microservices.

Parameters:

Name | Description | Default
base_url | Base URL to use for requests. | None
auth | Client request to extract authorization from, or the string "internal" to use internal authorization. | None
headers | Custom session HTTP headers. | None

Usage:

# Without authorization
async with self.DiscoveryService.session() as session:
    # use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
    # where key is "service_id" or "instance_id"
    # and value is the respective service identifier
    async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
        ...

# Using end-user authorization (from the UI)
async with self.DiscoveryService.session(auth=request) as session:
    async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
        ...

# Using internal m2m authorization
async with self.DiscoveryService.session(auth="internal") as session:
    async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
        ...


NotDiscoveredError ¤

Bases: RuntimeError

Raised when given service is not discovered.

Source code in asab/api/discovery.py
class NotDiscoveredError(RuntimeError):
	"""
	Raised when given service is not discovered.
	"""
	pass