Discovery Service¤

Service discovery enables communication among multiple ASAB microservices in a server cluster. Each microservice can look up the address (URL) of the API of any other service within the cluster.

In service discovery, there are two main roles: the "server" and the "client."

The server advertises its "position" in the cluster and provides an API for communication. The client uses this API to interact with the server.

In the context of service discovery, all microservices in the cluster act as both servers and clients.

Prerequisites¤

The following requirements must be fulfilled:

  • ZooKeeper connection and configuration must be the same for all services in the cluster.
  • ZooKeeper container must be initialized in the service.
  • asab.WebService and asab.web.WebContainer must be initialized.
  • asab.api.ApiService must be initialized.
  • Environment variables NODE_ID, SERVICE_ID and INSTANCE_ID must be set.
  • INSTANCE_ID (or hostname if INSTANCE_ID is missing) must be resolvable.

Server - Advertising into ZooKeeper¤

Even though a service can provide multiple communication interfaces, the major use case is the web API.

A "server" application has to provide the API and advertise its address into a consensus technology, ZooKeeper.

Application requirements¤

Services inside the application must be initialized in the right order.

import asab
import asab.api
import asab.web
import asab.zookeeper


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()
        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # Initialize ApiService
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)

Configuration¤

ZooKeeper configuration must be the same for all services in the cluster. The port of the web API is set in the [web] section.

myapp.conf
[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

[web]
listen=0.0.0.0 8090
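
To illustrate the shape of these values, the following stdlib-only sketch parses the same configuration text with `configparser`. The helpers `parse_servers` and `parse_listen` are hypothetical names introduced here; ASAB loads its configuration by itself, so this is only a way to inspect the values, not how the framework processes them.

```python
import configparser

CONF_TEXT = """
[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

[web]
listen=0.0.0.0 8090
"""


def parse_servers(value):
    """Split 'host:port,host:port' into a list of (host, port) tuples."""
    servers = []
    for entry in value.split(","):
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))
    return servers


def parse_listen(value):
    """Split 'address port' into an (address, port) tuple."""
    address, port = value.split()
    return address, int(port)


config = configparser.ConfigParser()
config.read_string(CONF_TEXT)

zk_servers = parse_servers(config["zookeeper"]["servers"])
web_listen = parse_listen(config["web"]["listen"])
print(zk_servers)
print(web_listen)
```

Comparing the parsed `zk_servers` list across services is one simple way to confirm that they all point to the same ZooKeeper ensemble.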

Environment variables¤

Discovery is based on the provided ids. Use either the predefined ids provided as environment variables, or create custom discovery ids within the application.

The predefined ids used for service discovery are provided as environment variables:

| Variable | Description | Example |
|---|---|---|
| SERVICE_ID | Identifier of the ASAB microservice. | my-app |
| INSTANCE_ID | Identifier of a specific instance of the ASAB microservice. | my-app-1, my-app-2, ... |
| NODE_ID | Identifier of a cluster node. | node-1, node-2, ... |

Instance ID must be resolvable

The ASAB framework cannot set up networking. To enable service discovery, the INSTANCE_ID of a service must be resolvable from the client. If INSTANCE_ID is missing, the hostname is used instead, and the hostname must then be resolvable.
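
A quick way to check this requirement is sketched below using only the standard library. The function names `advertised_name` and `is_resolvable` are hypothetical and not part of ASAB. The check is only meaningful when it runs on the machine or container of the client, because resolution depends on the client's network setup.

```python
import socket


def advertised_name(environ):
    """Return INSTANCE_ID when set, otherwise fall back to the hostname."""
    return environ.get("INSTANCE_ID") or socket.gethostname()


def is_resolvable(name):
    """Return True when the local resolver can translate `name` to an address."""
    try:
        socket.getaddrinfo(name, None)
    except socket.gaierror:
        return False
    return True


# Example: the name a service with INSTANCE_ID=my-app-1 would be reached by
name = advertised_name({"INSTANCE_ID": "my-app-1"})
print(name)
```

Calling `is_resolvable(name)` from the client then tells whether requests to that instance have a chance to succeed.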

Information advertised¤

When all requirements of a server-side microservice are fulfilled, the service advertises information unique to its runtime into the consensus.

The advertised data is a JSON document describing the running application.

asab/run/ASABMyApplication.0000090098
{
    "host": "server-name-1",
    "appclass": "MyApp",
    "node_id": "node-1",
    "service_id": "my-app",
    "instance_id": "my-app-1",
    "launch_time": "2023-12-01T10:52:29.656397Z",
    "process_id": 1,
    "containerized": true,
    "created_at": "2023-11-23T11:08:22.454248Z",
    "version": "v23.47-alpha",
    "CI_COMMIT_TAG": "v23.47-alpha",
    "CI_COMMIT_REF_NAME": "v23.47-alpha",
    "CI_COMMIT_SHA": "...",
    "CI_COMMIT_TIMESTAMP": "2023-11-23T11:00:48+00:00",
    "CI_JOB_ID": "...",
    "CI_PIPELINE_CREATED_AT": "2023-11-23T11:06:40Z",
    "CI_RUNNER_ID": "..",
    "CI_RUNNER_EXECUTABLE_ARCH": "linux/amd64",
    "web": [
        [
            "0.0.0.0",
            8090
        ]
    ]
}
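
The fields that matter for discovery are `host`, the ids and `web`. The sketch below, which assumes a reduced copy of the record above, shows how such a record can be turned into `(host, port, family)` tuples. It follows the logic of `_rescan_advertised_instances()` listed in the Reference section; the function name `endpoints` is hypothetical.

```python
import json
import socket

ADVERTISED = """
{
    "host": "server-name-1",
    "service_id": "my-app",
    "instance_id": "my-app-1",
    "web": [["0.0.0.0", 8090]]
}
"""


def endpoints(item_data):
    """Return a set of (host, port, family) tuples for one advertised record."""
    host = item_data.get("host")
    result = set()
    if host is None:
        return result
    for entry in item_data.get("web") or []:
        address, port = entry[0], entry[1]
        if address == "0.0.0.0":
            family = socket.AF_INET
        elif address == "::":
            family = socket.AF_INET6
        else:
            # As in the listed source, other listen addresses are skipped
            continue
        result.add((host, port, family))
    return result


record = json.loads(ADVERTISED)
print(endpoints(record))
```

Note that the advertised `host`, not the listen address, ends up in the tuple, which is why the host name must be resolvable from the client.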

Client - Using Service Discovery¤

All services act as "server" and "client" at the same time. This section describes how to discover (as a client) a service in the cluster.

Call API using DiscoveryService.session()¤

Once the service propagates itself into ZooKeeper, other services in the cluster can use its API.

DiscoveryService provides a method session() which returns an aiohttp.ClientSession equipped with a discovery-aware resolver. It can be used the same way as aiohttp.ClientSession. Instead of an explicit URL, use a URL with the asab domain.

The URL is constructed in the format http://<value>.<key>.asab/... where key is the name of the identifier (e.g. instance_id, service_id) and value is its value (e.g. my_app_1).
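
The format can be illustrated with a small stdlib-only sketch. Both helpers are hypothetical and not part of ASAB; `split_discovery_host` follows the `rsplit(".", 2)` approach of `DiscoveryResolver.resolve()` listed in the Reference section, but raises a plain `ValueError` instead of `NotDiscoveredError`.

```python
def discovery_url(key, value, endpoint="", protocol="http"):
    """Build '<protocol>://<value>.<key>.asab/<endpoint>'."""
    return "{}://{}.{}.asab/{}".format(protocol, value, key, endpoint.lstrip("/"))


def split_discovery_host(hostname):
    """Split '<value>.<key>.asab' into (key, value); return None for other hosts."""
    parts = hostname.rsplit(".", 2)
    if parts[-1] != "asab":
        return None  # not a discovery hostname, resolved as a normal host
    if len(parts) != 3:
        raise ValueError("Invalid discovery hostname '{}'".format(hostname))
    value, key, _ = parts
    return key, value


url = discovery_url("instance_id", "my_application_1", "/asab/v1/config")
print(url)
print(split_discovery_host("my_application_1.instance_id.asab"))
```

Because the hostname is split from the right, a value that itself contains dots is still parsed as a single value.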

Example

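import logging

import asab
import asab.api
import asab.api.discovery
import asab.web
import asab.zookeeper

L = logging.getLogger(__name__)


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()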
        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # The DiscoverySession is functional only with ApiService initialized.
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)

        self.DiscoveryService = self.get_service("asab.DiscoveryService")

    async def main(self):
        async with self.DiscoveryService.session() as session:
            try:
                # Use a URL in the format <protocol>://<value>.<key>.asab/<endpoint>,
                # where key is "service_id" or "instance_id" and value is the respective identifier.
                async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
                    if resp.status == 200:
                        config = await resp.json()
            except asab.api.discovery.NotDiscoveredError as e:
                L.error(e)

Warning

asab.DiscoveryService is functional only with ApiService initialized. That means, WebContainer and ZooKeeperContainer must be also present in the application.

Warning

Discovery Service searches for microservices in the same ZooKeeper path as defined in the configuration. Therefore, all services in the cluster should be configured with the same ZooKeeper path.

locate()¤

Returns a set of URLs for the services that match the given id. Pass instance_id as the first argument, or pass another id (node_id, service_id or a custom id) as a keyword argument.

Example

To look for all URLs of a service called very-nice-service, use inside the application:

await self.DiscoveryService.locate(service_id="very-nice-service")

It returns a set of URLs, e.g.:

{"http://very-nice-service-1:8888", "http://very-nice-service-2:8889"}
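
How such a set is derived can be sketched without ASAB. The example below assumes a hypothetical cache content shaped like the structure documented in the Reference section and a stand-in function `locate_urls` that follows the lookup done by `_locate()` and `locate()`; it is an illustration, not the library implementation.

```python
import socket

# Hypothetical cache content: id type -> identifier -> set of (host, port, family)
CACHE = {
    "instance_id": {
        "very-nice-service-1": {("very-nice-service-1", 8888, socket.AF_INET)},
        "very-nice-service-2": {("very-nice-service-2", 8889, socket.AF_INET)},
    },
    "service_id": {
        "very-nice-service": {
            ("very-nice-service-1", 8888, socket.AF_INET),
            ("very-nice-service-2", 8889, socket.AF_INET),
        },
    },
}


def locate_urls(cache, locate_params):
    """Return the set of URLs whose ids match any of the given filters."""
    found = set()
    for id_type, ids in cache.items():
        wanted = locate_params.get(id_type)
        if wanted in ids:
            found |= ids[wanted]
    # IPv4 and IPv6 records of one target collapse into a single URL
    return {"http://{}:{}".format(host, port) for host, port, _family in found}


print(locate_urls(CACHE, {"service_id": "very-nice-service"}))
```

The result is a set, so the order of the URLs is not defined and duplicates are removed.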

discover()¤

Returns a dictionary with all known services, organized by their ids and information on how to resolve them.

Example

await self.DiscoveryService.discover()
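
According to the type annotation in the Reference section, the result maps id types to identifiers to sets of `(host, port, family)` tuples. Sets and tuples cannot be serialized to JSON directly, so a conversion such as the following sketch may be handy when the result is logged or returned from an API. The sample data and the helper `to_jsonable` are assumptions made for illustration.

```python
import json
import socket

# Hypothetical result of discover()
discovered = {
    "instance_id": {"my-app-1": {("node-1", 8090, socket.AF_INET)}},
    "service_id": {"my-app": {("node-1", 8090, socket.AF_INET)}},
}


def to_jsonable(discovered):
    """Convert sets of (host, port, family) tuples into sorted lists of dicts."""
    return {
        id_type: {
            identifier: [
                {"host": host, "port": port, "family": socket.AddressFamily(family).name}
                for host, port, family in sorted(targets)
            ]
            for identifier, targets in ids.items()
        }
        for id_type, ids in discovered.items()
    }


print(json.dumps(to_jsonable(discovered), indent=4))
```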

Custom discovery ids¤

Custom identifiers can be set during runtime.

Example

Inside the application, when Api Service is already initialized:

self.ASABApiService.update_discovery({"custom_id": ["id1", "id2", "id3"]})

The argument of the method must be a dictionary where each key is a string and each value is a list. The custom_id can be used in both the discovery session and the locate() method.
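
The expected shape of the argument can be checked before it is passed on. The validator below is a hypothetical helper, not part of ASAB, and only encodes the rule stated above.

```python
def check_discovery_ids(ids):
    """Raise TypeError unless `ids` maps string keys to lists of identifiers."""
    if not isinstance(ids, dict):
        raise TypeError("discovery ids must be a dictionary")
    for key, values in ids.items():
        if not isinstance(key, str):
            raise TypeError("key {!r} is not a string".format(key))
        if not isinstance(values, list):
            raise TypeError("value for {!r} is not a list".format(key))
    return ids


custom_ids = check_discovery_ids({"custom_id": ["id1", "id2", "id3"]})
print(custom_ids)
```

A common mistake this catches is passing a single string instead of a list, e.g. `{"custom_id": "id1"}`.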

Using service discovery during authorization¤

When an authorization server (e.g. SeaCat Auth) provides authorization for each API call, it can also be found in the cluster through service discovery. In order to connect asab.AuthService with the service discovery, several requirements must be met:

  • API Service must be fully initialized BEFORE Auth Service.
  • Authorization server (SeaCat Auth) must be present in the cluster, resolvable by its instance id, and advertising itself into ZooKeeper (acting as a server itself).
  • [auth] configuration section must contain URL recognizable by the service discovery.
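
The last requirement can be checked with a short stdlib-only sketch. It assumes the standard INI `key=value` syntax and uses a hypothetical helper `uses_discovery` that tests whether the host of a URL has the `<value>.<key>.asab` form.

```python
import configparser
from urllib.parse import urlsplit

CONF_TEXT = """
[auth]
public_keys_url=http://seacat-auth.service_id.asab/.well-known/jwks.json
"""


def uses_discovery(url):
    """Return True when the URL host has the form '<value>.<key>.asab'."""
    host = urlsplit(url).hostname or ""
    parts = host.rsplit(".", 2)
    return len(parts) == 3 and parts[-1] == "asab"


config = configparser.ConfigParser()
config.read_string(CONF_TEXT)
public_keys_url = config["auth"]["public_keys_url"]
print(public_keys_url, uses_discovery(public_keys_url))
```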

Auth Service using service discovery

API Service must be fully initialized BEFORE Auth Service.

import asab
import asab.api
import asab.web
import asab.web.auth
import asab.zookeeper


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()
        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # The DiscoverySession is functional only with ApiService initialized.
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)

        self.DiscoveryService = self.get_service("asab.DiscoveryService")

        # Initialize authorization after ASABApiService.initialize_zookeeper() to get DiscoveryService into auth module
        self.AuthService = asab.web.auth.AuthService(self)
        self.AuthService.install(self.WebContainer)

[auth] configuration section must contain URL recognizable by the service discovery.

my_app.conf
[auth]
public_keys_url=http://seacat-auth.service_id.asab/.well-known/jwks.json

Reference¤

asab.api.discovery ¤

DiscoveryResolver ¤

Bases: DefaultResolver

Custom resolver for Discovery Session based on default aiohttp resolver.

Source code in asab/api/discovery.py
class DiscoveryResolver(aiohttp.DefaultResolver):
	"""
	Custom resolver for Discovery Session based on default `aiohttp` resolver.
	"""

	def __init__(self, svc) -> None:
		super().__init__()
		self.DiscoveryService = svc


	async def resolve(self, hostname: str, port: int = 0, family: int = socket.AF_INET) -> typing.List[typing.Dict[str, typing.Any]]:
		"""
		Resolve a hostname with the '.asab' domain and return a list of dictionaries
		containing information about the resolved hosts further used by `aiohttp.TCPConnector`.

		The hostname to resolve must be in the format of "<value>.<key>.asab",
		where key is "service_id" or "instance_id" and value is the particular identifier of the service to be resolved.
		The "asab" domain is required for discovery; any other hostname is resolved as a normal hostname.
		"""
		url_split = hostname.rsplit(".", 2)

		# If there is no asab domain, it is normal URL and resolve it normally
		if url_split[-1] != "asab":
			return await super().resolve(hostname, port, family)

		# Make sure the format of the hostname is right
		if len(url_split) != 3:
			raise NotDiscoveredError("Invalid format of the hostname '{}'. Use e.g. `asab-config.service_id.asab` instead.".format(hostname))

		hosts = []
		located_instances = await self.DiscoveryService._locate({url_split[1]: url_split[0]})
		if located_instances is None or len(located_instances) == 0:
			raise NotDiscoveredError("Failed to discover '{}'.".format(hostname))

		for phostname, pport, pfamily in located_instances:
			try:
				resolved = await super().resolve(phostname, pport, pfamily)
			except socket.gaierror:
				# Skip unresolved hosts
				continue
			hosts.extend(resolved)

		if len(hosts) == 0:
			raise NotDiscoveredError("Failed to resolve any of the hosts for '{}'.".format(hostname))

		return hosts
resolve(hostname, port=0, family=socket.AF_INET) async ¤

Resolve a hostname with the '.asab' domain and return a list of dictionaries containing information about the resolved hosts further used by aiohttp.TCPConnector.

The hostname to resolve must be in the format of "<value>.<key>.asab", where key is "service_id" or "instance_id" and value is the particular identifier of the service to be resolved. The "asab" domain is required for discovery; any other hostname is resolved as a normal hostname.

DiscoveryService ¤

Bases: Service

Service for discovering ASAB microservices in a server cluster. It is based on searching in ZooKeeper /run path.

Source code in asab/api/discovery.py
class DiscoveryService(Service):
	"""
	Service for discovering ASAB microservices in a server cluster. It is based on searching in ZooKeeper `/run` path.
	"""
	def __init__(self, app, zkc, service_name="asab.DiscoveryService") -> None:
		super().__init__(app, service_name)
		self.ZooKeeperContainer = zkc
		self.ProactorService = zkc.ProactorService
		self.InternalAuthKeyPath = "/asab/auth/internal_auth_private.key"
		self.InternalAuthKey = None
		self.InternalAuthToken = None
		self.InternalAuthTokenExpiration: datetime.timedelta = datetime.timedelta(seconds=5 * 300)

		self._advertised_cache = dict()
		self._cache_lock = asyncio.Lock()
		self._ready_event = asyncio.Event()

		self.App.PubSub.subscribe("Application.tick/300!", self._on_tick)
		self.App.PubSub.subscribe("ZooKeeperContainer.state/CONNECTED!", self._on_zk_ready)



	def _on_tick(self, msg):
		self.App.TaskService.schedule(self._rescan_advertised_instances())
		if jwcrypto is not None:
			self._ensure_internal_auth_token()


	def _on_zk_ready(self, msg, zkc):
		if zkc == self.ZooKeeperContainer:
			self.App.TaskService.schedule(self._rescan_advertised_instances())
			if jwcrypto is not None:
				self.App.TaskService.schedule(self._ensure_internal_auth_key(zkc))


	async def _ensure_internal_auth_key(self, zkc=None):
		zkc = zkc or self.ZooKeeperContainer
		private_key_json = None
		# Attempt to create and write a new private key
		# while avoiding race condition with other ASAB services
		while not private_key_json:
			# Try to get the key
			try:
				private_key_json, _ = zkc.ZooKeeper.Client.get(self.InternalAuthKeyPath)
				break
			except kazoo.exceptions.NoNodeError:
				pass

			# Generate a new key
			private_key = jwcrypto.jwk.JWK.generate(kty="EC", crv="P-256")
			private_key_json = json.dumps(private_key.export(as_dict=True)).encode("utf-8")
			try:
				zkc.ZooKeeper.Client.create(self.InternalAuthKeyPath, private_key_json, makepath=True)
				L.info("Internal auth key created.", struct_data={
					"kid": private_key.key_id, "path": self.InternalAuthKeyPath})
			except kazoo.exceptions.NodeExistsError:
				# Another ASAB service has probably created the key in the meantime
				pass

		private_key = jwcrypto.jwk.JWK.from_json(private_key_json)
		if private_key != self.InternalAuthKey:
			# Private key has changed
			self.InternalAuthKey = private_key
			self._ensure_internal_auth_token(force_new=True)


	def _ensure_internal_auth_token(self, force_new: bool = False):
		assert self.InternalAuthKey

		def _get_own_discovery_url():
			instance_id = os.getenv("INSTANCE_ID", None)
			if instance_id:
				return "http://{}.instance_id.asab".format(instance_id)

			service_id = os.getenv("SERVICE_ID", None)
			if service_id:
				return "http://{}.service_id.asab".format(service_id)

			return "http://{}".format(self.App.HostName)

		if self.InternalAuthToken and not force_new:
			claims = json.loads(self.InternalAuthToken.claims)
			if claims.get("exp") > (
				datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=300)
			).timestamp():
				# Token is valid and does not expire soon
				return

		# Use this service's discovery URL as issuer ID and authorized party ID
		my_discovery_url = _get_own_discovery_url()
		expiration = datetime.datetime.now(datetime.timezone.utc) + self.InternalAuthTokenExpiration
		claims = {
			# Issuer (URL of the app that created the token)
			"iss": my_discovery_url,
			# Issued at
			"iat": int(datetime.datetime.now(datetime.timezone.utc).timestamp()),
			# Expires at
			"exp": int((expiration).timestamp()),
			# Authorized party
			"azp": my_discovery_url,
			# Audience (who is allowed to use this token)
			"aud": "http://{}".format(self.App.HostName),  # TODO: Something that signifies "anyone in this internal space"
			# Tenants and resources
			"resources": {
				"*": ["authz:superuser"],
			}
		}

		self.InternalAuthToken = jwcrypto.jwt.JWT(
			header={
				"alg": "ES256",
				"typ": "JWT",
				"kid": self.InternalAuthKey.key_id,
			},
			claims=json.dumps(claims)
		)
		self.InternalAuthToken.make_signed_token(self.InternalAuthKey)

		L.info("New internal auth token issued.", struct_data={"exp": expiration})


	async def locate(self, instance_id: str = None, **kwargs) -> set:
		"""
		Return a set of URLs for a given instance or service ID.

		Args:
			instance_id (str): The ID of a specific instance of a service that the client wants to locate.
			service_id (str): The identifier of a service to locate, passed as a keyword argument.
				It is used to query the service registry to find the instances of the service that are currently available.

		Returns: A set of URLs in the format "http://servername:port" for the specified instance or service.
		"""

		if instance_id is not None:
			locate_params = {"instance_id": instance_id}

		elif len(kwargs) > 0:
			locate_params = kwargs

		else:
			L.warning("Please provide instance_id, service_id, or other custom id to locate the service(s).")
			return None

		# Each target can have two records, one for IPv4 and one for IPv6. This information is redundant in the URL format.
		return set([
			"http://{}:{}".format(servername, port)
			for servername, port, family
			in await self._locate(locate_params)
		])

	async def _locate(self, locate_params) -> typing.Set[typing.Tuple]:
		"""
		Locate service instances based on their instance ID, service ID or a custom ID.

		Args:
			locate_params (dict): Mapping of an identifier type (e.g. "instance_id", "service_id") to the identifier to look for.

		Returns: a set of (host, port, family) tuples of the located service(s).
		"""
		res = set()

		await asyncio.wait_for(self._ready_event.wait(), 600)

		if len(self._advertised_cache) == 0:
			L.warning("No instances to discover. Make sure [zookeeper] configuration is identical for all the ASAB services in the cluster.")
			return res

		for id_type, ids in self._advertised_cache.items():
			if id_type in locate_params:
				if locate_params[id_type] in ids:
					res = res | ids[locate_params[id_type]]

		return res


	async def discover(self) -> typing.Dict[str, typing.Dict[str, typing.Set[typing.Tuple]]]:
		# We need to make a copy of the cache so that the caller can't modify our cache.
		await asyncio.wait_for(self._ready_event.wait(), 600)
		return copy.deepcopy(self._advertised_cache)


	async def get_advertised_instances(self) -> typing.List[typing.Dict]:
		"""
		This method is here for backward compatibility. Use `discover()` method instead.
		Returns a list of dictionaries. Each dictionary represents an advertised instance
		obtained by iterating over the items in the `/run` path in ZooKeeper.
		"""
		# TODO: an obsolete log for this method
		advertised = []
		async for item, item_data in self._iter_zk_items():
			item_data['ephemeral_id'] = item
			advertised.append(item_data)

		return advertised


	async def _rescan_advertised_instances(self):
		"""
		Rebuilds the cache of advertised instances: a structured dataset of identifier types, identifiers of instances, and the hosts and ports where they can be found.
		It is a dict with identifier types as keys and dicts as values. The second-layer dict has identifiers as keys and a set of tuples as a value. Each tuple contains host, port and address family.

		Example of the data structure:
			{
				"instance_id": {
					"lmio-receiver-1": {("node1", 1234, socket.AF_INET)},
					"asab-remote-control-1": {("node2", 1234, socket.AF_INET6)}
					...
				},
				"service_id": {
					"lmio-receiver": {("node1", 1234, socket.AF_INET), ("node2", 1234, socket.AF_INET), ("node3", 1234, socket.AF_INET)},
					...
				},
				"custom1_id": {
					"myid123": {("node1", 5678, socket.AF_INET), ("node2", 5678, socket.AF_INET6)},
					...
				},
				"custom2_id": {
					"myid123": {("node1", 5678, socket.AF_INET), ("node2", 5678, socket.AF_INET)},
					...
				},
				...
			}
		"""
		if self._cache_lock.locked():
			# Only one rescan / cache update at a time
			return

		async with self._cache_lock:

			advertised = {
				"instance_id": {},
				"service_id": {},
			}

			iterator = self._iter_zk_items()
			if iterator is None:
				return  # reason logged from the _iter_zk_items method

			async for item, item_data in iterator:
				instance_id = item_data.get("instance_id")
				service_id = item_data.get("service_id")
				discovery: typing.Dict[str, list] = item_data.get("discovery", {})

				if instance_id is not None:
					discovery["instance_id"] = [instance_id]

				if service_id is not None:
					discovery["service_id"] = [service_id]

				host = item_data.get("host")
				if host is None:
					continue

				web = item_data.get("web")
				if web is None:
					continue

				for i in web:

					try:
						ip = i[0]
						port = i[1]
					except (IndexError, KeyError):
						L.error("Unexpected format of 'web' section in advertised data: '{}'".format(web))
						return

					if ip == "0.0.0.0":
						family = socket.AF_INET
					elif ip == "::":
						family = socket.AF_INET6
					else:
						continue

					if discovery is not None:
						for id_type, ids in discovery.items():
							if advertised.get(id_type) is None:
								advertised[id_type] = {}

							for identifier in ids:
								if identifier is not None:
									if advertised[id_type].get(identifier) is None:
										advertised[id_type][identifier] = {(host, port, family)}
									else:
										advertised[id_type][identifier].add((host, port, family))

			self._advertised_cache = advertised
			self._ready_event.set()


	async def _iter_zk_items(self):
		base_path = self.ZooKeeperContainer.Path + "/run"

		def get_items():
			try:
				# Create the base path if it does not exist
				if not self.ZooKeeperContainer.ZooKeeper.Client.exists(base_path):
					self.ZooKeeperContainer.ZooKeeper.Client.create(base_path, b'', makepath=True)

				return self.ZooKeeperContainer.ZooKeeper.Client.get_children(base_path, watch=self._on_change_threadsafe)
			except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
				L.warning("Connection to ZooKeeper lost. Discovery Service could not fetch up-to-date state of the cluster services.")
				return None

			except kazoo.exceptions.KazooException:
				return None

		def get_data(item):
			try:
				data, stat = self.ZooKeeperContainer.ZooKeeper.Client.get((base_path + '/' + item), watch=self._on_change_threadsafe)
				return data
			except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
				L.warning("Connection to ZooKeeper lost. Discovery Service could not fetch up-to-date state of the cluster services.")
				return None

			except kazoo.exceptions.KazooException:
				# There's a race condition -> if ZK node is deleted between get_items and get_data calls, NoNodeError is raised. Let's just ignore it. The ZK node is already deleted.
				return None

		items = await self.ProactorService.execute(get_items)
		if items is None:
			return

		for item in items:
			item_data = await self.ProactorService.execute(get_data, item)
			if item_data is None:
				continue
			yield item, json.loads(item_data)

	def _on_change_threadsafe(self, watched_event):
		# Runs on a thread, returns the process back to the main thread
		def _update_cache():
			self.App.TaskService.schedule(self._rescan_advertised_instances())
		self.App.Loop.call_soon_threadsafe(_update_cache)


	def session(self, base_url=None, auth=None, headers=None, **kwargs) -> aiohttp.ClientSession:
		"""
		Usage:

		```python
		async with self.DiscoveryService.session() as session:
			# use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
			# where key is "service_id" or "instance_id"
			# and value is the respective service identifier
			async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
				...

		# Using end-user authorization (from the UI)
		async with self.DiscoveryService.session(auth=request) as session:
			async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
				...

		# Using internal m2m authorization
		async with self.DiscoveryService.session(auth="internal") as session:
			async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
				...
		```
		"""
		if isinstance(auth, aiohttp.ClientRequest):
			if headers is None:
				headers = {}
			assert "Authorization" in auth.headers
			headers["Authorization"] = auth.headers.get("Authorization")
		elif auth == "internal":
			if jwcrypto is None:
				raise ModuleNotFoundError(
					"You are trying to use internal auth without 'jwcrypto' installed. "
					"Please run 'pip install jwcrypto' or install asab with 'authz' optional dependency."
				)
			if headers is None:
				headers = {}
			headers["Authorization"] = "Bearer {}".format(self.InternalAuthToken.serialize())
		elif auth is None:
			pass
		else:
			raise ValueError(
				"Invalid 'auth' value. "
				"Only instances of aiohttp.ClientRequest or the literal string 'internal' are allowed. "
				"Found {}.".format(type(auth))
			)
		return aiohttp.ClientSession(
			base_url,
			connector=aiohttp.TCPConnector(resolver=DiscoveryResolver(self)),
			headers=headers,
			**kwargs
		)
get_advertised_instances() async ¤

This method is here for backward compatibility. Use discover() method instead. Returns a list of dictionaries. Each dictionary represents an advertised instance obtained by iterating over the items in the /run path in ZooKeeper.

locate(instance_id=None, **kwargs) async ¤

Return a set of URLs for a given instance or service ID.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| instance_id | str | The ID of a specific instance of a service that the client wants to locate. | None |
| service_id | str | The identifier of a service to locate. It is used to query the service registry to find the instances of the service that are currently available. | optional (keyword argument) |

Returns: A set of URLs in the format "http://servername:port" for the specified instance or service.

session(base_url=None, auth=None, headers=None, **kwargs) ¤

Usage:

```python
async with self.DiscoveryService.session() as session:
    # use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
    # where key is "service_id" or "instance_id"
    # and value is the respective service identifier
    async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
        ...

# Using end-user authorization (from the UI)
async with self.DiscoveryService.session(auth=request) as session:
    async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
        ...

# Using internal m2m authorization
async with self.DiscoveryService.session(auth="internal") as session:
    async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
        ...
```

NotDiscoveredError ¤

Bases: RuntimeError

Raised when given service is not discovered.

Source code in asab/api/discovery.py
class NotDiscoveredError(RuntimeError):
	"""
	Raised when given service is not discovered.
	"""
	pass