Discovery Service¤

Service discovery enables communication among multiple ASAB microservices in a server cluster. Each microservice can look up the address (URL) of the API of any other service within the cluster.

In service discovery, there are two main roles: the "server" and the "client."

The server advertises its "position" in the cluster and provides an API for communication. The client uses this API to interact with the server.

In the context of service discovery, all microservices in the cluster act as both servers and clients.

Prerequisites¤

The following requirements must be fulfilled:

  • ZooKeeper connection and configuration must be the same for all services in the cluster.
  • ZooKeeper container must be initialized in the service.
  • asab.WebService and asab.WebContainer must be initialized.
  • asab.APIService must be initialized.
  • Environment variables NODE_ID, SERVICE_ID and INSTANCE_ID must be set.
  • INSTANCE_ID (or hostname if INSTANCE_ID is missing) must be resolvable.
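
The environment-variable prerequisite can be checked before startup with a few lines of plain Python. This is only a minimal sketch and not part of ASAB; the helper name `missing_discovery_env` is hypothetical.

```python
import os

# Identifiers named in the prerequisites above.
REQUIRED_IDS = ("NODE_ID", "SERVICE_ID", "INSTANCE_ID")


def missing_discovery_env(environ=None):
    """Return the names of required variables that are unset or empty."""
    environ = os.environ if environ is None else environ
    return [name for name in REQUIRED_IDS if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_discovery_env()
    if missing:
        print("Missing environment variables: {}".format(", ".join(missing)))
```

Passing a dictionary instead of relying on `os.environ` keeps the check easy to test.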

Server - Advertising into ZooKeeper¤

Although a service can provide multiple communication interfaces, the major use case is the web API.

A "server" application has to provide the API and advertise its address into a consensus technology, ZooKeeper.

Application requirements¤

Services inside the application must be initialized in the right order.

import asab
import asab.api
import asab.web
import asab.zookeeper


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()
        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # Initialize ApiService
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)

Configuration¤

The ZooKeeper configuration must be the same for all services in the cluster. The port of the web API is set in the [web] section of the configuration.

myapp.conf
[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

[web]
listen=0.0.0.0 8090
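
To illustrate what these values carry, the sketch below reads the same configuration text with Python's standard `configparser` and splits it into host/port pairs. ASAB parses its configuration on its own; the helper name `parse_endpoints` is hypothetical and only shows the structure of the values.

```python
import configparser

CONFIG_TEXT = """
[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

[web]
listen=0.0.0.0 8090
"""


def parse_endpoints(config_text):
    """Return ([(zk_host, zk_port), ...], (listen_host, listen_port))."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)

    servers = []
    for entry in parser["zookeeper"]["servers"].split(","):
        # Split on the last colon so the port is always the final component.
        host, _, port = entry.strip().rpartition(":")
        servers.append((host, int(port)))

    listen_host, listen_port = parser["web"]["listen"].split()
    return servers, (listen_host, int(listen_port))


servers, listen = parse_endpoints(CONFIG_TEXT)
print(servers)
print(listen)
```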

Environment variables¤

The discovery is based on provided "ids". Use either a set of specific ids provided as environment variables, or create custom discovery ids within the application.

Predefined ids used for service discovery. Provide them as environment variables:

Variable | Description | Example
--- | --- | ---
SERVICE_ID | Identifier of the ASAB microservice. | my-app
INSTANCE_ID | Identifier of a specific instance of the ASAB microservice. | my-app-1, my-app-2, ...
NODE_ID | Identifier of a cluster node. | node-1, node-2, ...

Instance ID must be resolvable

The ASAB framework cannot set up networking. To enable service discovery, the INSTANCE_ID of a service must be resolvable from the client. If INSTANCE_ID is missing, the hostname is used instead, and the hostname must then be resolvable.
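
Resolvability can be checked from the client's host with the standard `socket` module. The following is a minimal sketch under the assumption that the advertised name is INSTANCE_ID with a fallback to the hostname, as described above; the helper names are hypothetical and not part of ASAB.

```python
import os
import socket


def advertised_name(environ=None):
    """Pick the name a client has to resolve: INSTANCE_ID, else the hostname."""
    environ = os.environ if environ is None else environ
    return environ.get("INSTANCE_ID") or socket.gethostname()


def is_resolvable(name):
    """Return True if the local resolver maps the name to at least one address."""
    try:
        return len(socket.getaddrinfo(name, None)) > 0
    except socket.gaierror:
        return False


if __name__ == "__main__":
    name = advertised_name()
    print("{} resolvable: {}".format(name, is_resolvable(name)))
```

The check is only meaningful when run on the machine (or in the container) of the client, because name resolution depends on that machine's DNS and hosts configuration.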

Information advertised¤

When all requirements of a server-side microservice are fulfilled, the service advertises unique information bound to its runtime into the consensus.

The JSON file contains information describing the running application.

asab/run/ASABMyApplication.0000090098
{
    "host": "server-name-1",
    "appclass": "MyApp",
    "node_id": "node-1",
    "service_id": "my-app",
    "instance_id": "my-app-1",
    "launch_time": "2023-12-01T10:52:29.656397Z",
    "process_id": 1,
    "containerized": true,
    "created_at": "2023-11-23T11:08:22.454248Z",
    "version": "v23.47-alpha",
    "CI_COMMIT_TAG": "v23.47-alpha",
    "CI_COMMIT_REF_NAME": "v23.47-alpha",
    "CI_COMMIT_SHA": "...",
    "CI_COMMIT_TIMESTAMP": "2023-11-23T11:00:48+00:00",
    "CI_JOB_ID": "...",
    "CI_PIPELINE_CREATED_AT": "2023-11-23T11:06:40Z",
    "CI_RUNNER_ID": "..",
    "CI_RUNNER_EXECUTABLE_ARCH": "linux/amd64",
    "web": [
        [
            "0.0.0.0",
            8090
        ]
    ]
}
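
For discovery, the relevant fields of this record are "host", the ids, and "web". The sketch below shows one way a client could turn such a record into (host, port, family) tuples; it loosely mirrors the logic of `_rescan_advertised_instances()` listed in the Reference, but the function name `endpoints` and the shortened record are illustrative assumptions.

```python
import json
import socket

# Shortened version of the advertised record shown above.
ADVERTISED = json.loads("""
{
    "host": "server-name-1",
    "service_id": "my-app",
    "instance_id": "my-app-1",
    "web": [["0.0.0.0", 8090]]
}
""")


def endpoints(record):
    """Return a set of (host, port, family) tuples derived from one record."""
    host = record.get("host")
    result = set()
    if host is None:
        return result
    for listen_address, port in record.get("web") or []:
        if listen_address == "0.0.0.0":
            family = socket.AF_INET
        elif listen_address == "::":
            family = socket.AF_INET6
        else:
            continue  # other listen addresses are skipped in this sketch
        result.add((host, port, family))
    return result


print(endpoints(ADVERTISED))
```

Note that the advertised listen address (0.0.0.0) is not used as the target; the "host" value is, which is why it must be resolvable from the client.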

Client - Using Service Discovery¤

Every service is a "server" and a "client" at the same time. This section describes how to discover (as a client) a service in the cluster.

Call API using DiscoveryService.session()¤

Once the service propagates itself into ZooKeeper, other services in the cluster can use its API.

DiscoveryService provides a method session(), which returns an aiohttp.ClientSession with a custom hostname resolver. It can be used the same way as any aiohttp.ClientSession. Instead of an explicit URL, use a URL with the asab domain.

The URL is constructed in the format http://<value>.<key>.asab/... where key is the name of the identifier (e.g. instance_id, service_id) and value is its value (e.g. my_app_1).
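
The URL format can be sketched with two small helpers: one that builds a discovery URL and one that splits a hostname back into key and value. Both function names are hypothetical; the splitting uses the same `rsplit(".", 2)` approach as the resolver listed in the Reference.

```python
def discovery_url(key, value, endpoint="/", protocol="http"):
    """Build a URL of the form <protocol>://<value>.<key>.asab/<endpoint>."""
    return "{}://{}.{}.asab/{}".format(protocol, value, key, endpoint.lstrip("/"))


def split_discovery_hostname(hostname):
    """Split "<value>.<key>.asab" into (key, value); return None otherwise."""
    parts = hostname.rsplit(".", 2)
    if len(parts) != 3 or parts[-1] != "asab":
        return None
    value, key, _ = parts
    return key, value


print(discovery_url("instance_id", "my_app_1", "/asab/v1/config"))
print(split_discovery_hostname("my_app_1.instance_id.asab"))
```

Because the hostname is split from the right, a value that itself contains dots stays intact.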

Example

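import logging

import asab
import asab.api
import asab.api.discovery
import asab.web
import asab.zookeeper

L = logging.getLogger(__name__)


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()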
        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # The DiscoverySession is functional only with ApiService initialized.
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)

        self.DiscoveryService = self.get_service("asab.DiscoveryService")

    async def main(self):
        async with self.DiscoveryService.session() as session:
            try:
                # Use a URL in the format <protocol>://<value>.<key>.asab/<endpoint>,
                # where key is "service_id" or "instance_id"
                # and value is the respective service identifier.
                async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
                    if resp.status == 200:
                        config = await resp.json()
            except asab.api.discovery.NotDiscoveredError as e:
                L.error(e)

Warning

asab.DiscoveryService is functional only with ApiService initialized. That means WebContainer and ZooKeeperContainer must also be present in the application.

Warning

Discovery Service searches for microservices in the same ZooKeeper path as defined in the configuration. Therefore, all services in the cluster should be configured with the same ZooKeeper path.

locate()¤

Returns a set of URLs based on an id of a service. Provide the filter as keyword arguments, as in the method signature locate(instance_id=None, **kwargs) listed in the Reference. The keys can be node_id, service_id, instance_id or custom ids.

Example

To look for all URLs of a service called very-nice-service, use inside the application:

await self.DiscoveryService.locate(service_id="very-nice-service")

It returns a set of URLs, e.g.:

{"http://very-nice-service-1:8888", "http://very-nice-service-2:8889"}
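
The lookup behind locate() can be pictured as a search in a nested dictionary of advertised ids. The sketch below works on an in-memory sample in the shape documented for the discovery cache; the function name `locate_urls` and all hosts and ports are illustrative assumptions, not the library's implementation.

```python
import socket

# Sample cache: id type -> identifier -> set of (host, port, family).
CACHE = {
    "instance_id": {
        "very-nice-service-1": {("very-nice-service-1", 8888, socket.AF_INET)},
        "very-nice-service-2": {("very-nice-service-2", 8889, socket.AF_INET)},
    },
    "service_id": {
        "very-nice-service": {
            ("very-nice-service-1", 8888, socket.AF_INET),
            ("very-nice-service-2", 8889, socket.AF_INET),
        },
    },
}


def locate_urls(cache, **ids):
    """Return the set of URLs whose advertised ids match any given filter."""
    found = set()
    for id_type, wanted in ids.items():
        found |= cache.get(id_type, {}).get(wanted, set())
    # The address family is dropped, so IPv4/IPv6 duplicates collapse.
    return {"http://{}:{}".format(host, port) for host, port, _family in found}


print(sorted(locate_urls(CACHE, service_id="very-nice-service")))
```

Returning a set rather than a list reflects that the same target may be advertised for both IPv4 and IPv6 while producing a single URL.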

discover()¤

Returns a dictionary with all known services, organized by their ids and information on how to resolve them.

Example

await self.DiscoveryService.discover()
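
According to the return annotation in the Reference, the result is a dictionary of id types, each mapping identifiers to sets of (host, port, family) tuples. The sketch below walks such a structure; the sample data and the helper name `describe` are hypothetical.

```python
import socket

# Hypothetical result of `await self.DiscoveryService.discover()`.
DISCOVERED = {
    "instance_id": {"my-app-1": {("node-1", 8090, socket.AF_INET)}},
    "service_id": {"my-app": {("node-1", 8090, socket.AF_INET)}},
}


def describe(discovered):
    """Flatten the nested dictionary into sorted "<type>=<id> -> host:port" lines."""
    lines = []
    for id_type, ids in discovered.items():
        for identifier, targets in ids.items():
            for host, port, _family in targets:
                lines.append("{}={} -> {}:{}".format(id_type, identifier, host, port))
    return sorted(lines)


for line in describe(DISCOVERED):
    print(line)
```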

Custom discovery ids¤

Custom identifiers can be set during runtime.

Example

Inside the application, when API Service is already initialized:

self.ASABApiService.update_discovery({"custom_id": ["id1", "id2", "id3"]})

The argument of the method must be a dictionary whose keys are strings and whose values are lists. The custom_id can then be used both in the discovery session and in the locate() method.
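
The expected shape of the argument can be checked before it is passed on. The following is a minimal sketch; `check_discovery_ids` is a hypothetical helper and not part of ASAB.

```python
def check_discovery_ids(ids):
    """Raise TypeError unless `ids` maps string keys to lists of identifiers."""
    if not isinstance(ids, dict):
        raise TypeError("discovery ids must be a dictionary")
    for key, values in ids.items():
        if not isinstance(key, str):
            raise TypeError("key {!r} is not a string".format(key))
        if not isinstance(values, list):
            raise TypeError("value for {!r} is not a list".format(key))
    return ids


print(check_discovery_ids({"custom_id": ["id1", "id2", "id3"]}))
```

Following the URL format described earlier, an instance advertised with this dictionary would presumably be addressed in a discovery session as e.g. http://id1.custom_id.asab/...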

Using service discovery during authorization¤

When an authorization server, e.g. SeaCat Auth, provides authorization for each API call, it can also be found in the cluster through service discovery. To connect asab.AuthService with service discovery, several requirements must be met:

  • API Service must be fully initialized BEFORE Auth Service.
  • The authorization server (SeaCat Auth) must be present in the cluster, resolvable by its instance id, and advertising itself into ZooKeeper (acting as a server itself).
  • The [auth] configuration section must contain a URL recognizable by service discovery.

Auth Service using service discovery

API Service must be fully initialized BEFORE Auth Service.

import asab
import asab.api
import asab.web
import asab.web.auth
import asab.zookeeper


class MyApp(asab.Application):
    def __init__(self):
        super().__init__()
        # Initialize web server
        self.add_module(asab.web.Module)
        websvc = self.get_service("asab.WebService")
        self.WebContainer = asab.web.WebContainer(websvc, "web")

        # Initialize zookeeper
        self.add_module(asab.zookeeper.Module)
        self.ZooKeeperService = self.get_service("asab.ZooKeeperService")
        self.ZooKeeperContainer = asab.zookeeper.ZooKeeperContainer(
            self.ZooKeeperService, "zookeeper"
        )

        # The DiscoverySession is functional only with ApiService initialized.
        self.ASABApiService = asab.api.ApiService(self)
        self.ASABApiService.initialize_web(self.WebContainer)
        self.ASABApiService.initialize_zookeeper(self.ZooKeeperContainer)

        self.DiscoveryService = self.get_service("asab.DiscoveryService")

        # Initialize authorization after ASABApiService.initialize_zookeeper() to get DiscoveryService into auth module
        self.AuthService = asab.web.auth.AuthService(self)
        self.AuthService.install(self.WebContainer)

The [auth] configuration section must contain a URL recognizable by service discovery.

my_app.conf
[auth]
public_keys_url=http://seacat-auth.service_id.asab/.well-known/jwks.json

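Whether a configured URL is one that service discovery would pick up can be checked by looking at its hostname. The sketch below assumes an INI-style `public_keys_url` option in the [auth] section and uses only the standard library; the helper name `uses_service_discovery` is hypothetical.

```python
import configparser
from urllib.parse import urlsplit

CONFIG_TEXT = """
[auth]
public_keys_url=http://seacat-auth.service_id.asab/.well-known/jwks.json
"""


def uses_service_discovery(config_text):
    """Return True if [auth] public_keys_url points at "<value>.<key>.asab"."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    hostname = urlsplit(parser["auth"]["public_keys_url"]).hostname or ""
    parts = hostname.rsplit(".", 2)
    return len(parts) == 3 and parts[-1] == "asab"


print(uses_service_discovery(CONFIG_TEXT))
```
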
Reference¤

asab.api.discovery ¤

DiscoveryResolver ¤

Bases: DefaultResolver

Custom resolver for Discovery Session based on default aiohttp resolver.

Source code in asab/api/discovery.py
class DiscoveryResolver(aiohttp.DefaultResolver):
	"""
	Custom resolver for Discovery Session based on default `aiohttp` resolver.
	"""

	def __init__(self, svc) -> None:
		super().__init__()
		self.DiscoveryService = svc


	async def resolve(self, hostname: str, port: int = 0, family: int = socket.AF_INET) -> typing.List[typing.Dict[str, typing.Any]]:
		"""
		Resolve a hostname only with '.asab' domain. and return a list of dictionaries
		containing information about the resolved hosts further used by `aiohttp.TCPConnector`.

		The hostname to resolve must be in the format of "<value>.<key>.asab",
		where key is "service_id" or "instance_id" and value is the particular identificator of the service to be resolved.
		The "asab" domain is required for resolution, otherwise it is treated like normal URL.
		"""
		url_split = hostname.rsplit(".", 2)

		# If there is no asab domain, it is normal URL and resolve it normally
		if url_split[-1] != "asab":
			return await super().resolve(hostname, port, family)

		# Make sure the format of the hostname is right
		if len(url_split) != 3:
			raise NotDiscoveredError("Invalid format of the hostname '{}'. Use e.g. `asab-config.service_id.asab` instead.".format(hostname))

		hosts = []
		located_instances = await self.DiscoveryService._locate({url_split[1]: url_split[0]})
		if located_instances is None or len(located_instances) == 0:
			raise NotDiscoveredError("Failed to discover '{}'.".format(hostname))

		for phostname, pport, pfamily in located_instances:
			try:
				resolved = await super().resolve(phostname, pport, pfamily)
			except socket.gaierror:
				# Skip unresolved hosts
				continue
			hosts.extend(resolved)

		if len(hosts) == 0:
			raise NotDiscoveredError("Failed to resolve any of the hosts for '{}'.".format(hostname))

		return hosts

resolve(hostname, port=0, family=socket.AF_INET) async ¤

Resolves a hostname in the '.asab' domain and returns a list of dictionaries containing information about the resolved hosts, which is further used by aiohttp.TCPConnector.

The hostname to resolve must be in the format "<value>.<key>.asab", where key is "service_id" or "instance_id" and value is the particular identifier of the service to be resolved. The "asab" domain is required for discovery-based resolution; any other hostname is resolved as a normal hostname.


DiscoveryService ¤

Bases: Service

Service for discovering ASAB microservices in a server cluster. It is based on searching in ZooKeeper /run path.

Source code in asab/api/discovery.py
class DiscoveryService(Service):
	"""
	Service for discovering ASAB microservices in a server cluster. It is based on searching in ZooKeeper `/run` path.
	"""
	def __init__(self, app, zkc, service_name="asab.DiscoveryService") -> None:
		super().__init__(app, service_name)
		self.ZooKeeperContainer = zkc
		self.ProactorService = zkc.ProactorService
		self.InternalAuthKeyPath = "/asab/auth/internal_auth_private.key"
		self.InternalAuthKey = None
		self.InternalAuthToken = None
		self.InternalAuthTokenExpiration: datetime.timedelta = datetime.timedelta(seconds=5 * 300)

		self._advertised_cache = dict()
		self._cache_lock = asyncio.Lock()
		self._ready_event = asyncio.Event()

		self.App.PubSub.subscribe("Application.tick/300!", self._on_tick)
		self.App.PubSub.subscribe("ZooKeeperContainer.state/CONNECTED!", self._on_zk_ready)



	def _on_tick(self, msg):
		self.App.TaskService.schedule(self._rescan_advertised_instances())
		if jwcrypto is not None:
			self._ensure_internal_auth_token()


	def _on_zk_ready(self, msg, zkc):
		if zkc == self.ZooKeeperContainer:
			self.App.TaskService.schedule(self._rescan_advertised_instances())
			if jwcrypto is not None:
				self.App.TaskService.schedule(self._ensure_internal_auth_key(zkc))


	async def _ensure_internal_auth_key(self, zkc=None):
		zkc = zkc or self.ZooKeeperContainer
		private_key_json = None
		# Attempt to create and write a new private key
		# while avoiding race condition with other ASAB services
		while not private_key_json:
			# Try to get the key
			try:
				private_key_json, _ = zkc.ZooKeeper.Client.get(self.InternalAuthKeyPath)
				break
			except kazoo.exceptions.NoNodeError:
				pass

			# Generate a new key
			private_key = jwcrypto.jwk.JWK.generate(kty="EC", crv="P-256")
			private_key_json = json.dumps(private_key.export(as_dict=True)).encode("utf-8")
			try:
				zkc.ZooKeeper.Client.create(self.InternalAuthKeyPath, private_key_json, makepath=True)
				L.info("Internal auth key created.", struct_data={
					"kid": private_key.key_id, "path": self.InternalAuthKeyPath})
			except kazoo.exceptions.NodeExistsError:
				# Another ASAB service has probably created the key in the meantime
				pass

		private_key = jwcrypto.jwk.JWK.from_json(private_key_json)
		if private_key != self.InternalAuthKey:
			# Private key has changed
			self.InternalAuthKey = private_key
			self._ensure_internal_auth_token(force_new=True)


	def _ensure_internal_auth_token(self, force_new: bool = False):
		assert self.InternalAuthKey

		def _get_own_discovery_url():
			instance_id = os.getenv("INSTANCE_ID", None)
			if instance_id:
				return "http://{}.instance_id.asab".format(instance_id)

			service_id = os.getenv("SERVICE_ID", None)
			if service_id:
				return "http://{}.service_id.asab".format(service_id)

			return "http://{}".format(self.App.HostName)

		if self.InternalAuthToken and not force_new:
			claims = json.loads(self.InternalAuthToken.claims)
			if claims.get("exp") > (
				datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(seconds=300)
			).timestamp():
				# Token is valid and does not expire soon
				return

		# Use this service's discovery URL as issuer ID and authorized party ID
		my_discovery_url = _get_own_discovery_url()
		expiration = datetime.datetime.now(datetime.timezone.utc) + self.InternalAuthTokenExpiration
		claims = {
			# Issuer (URL of the app that created the token)
			"iss": my_discovery_url,
			# Issued at
			"iat": int(datetime.datetime.now(datetime.timezone.utc).timestamp()),
			# Expires at
			"exp": int((expiration).timestamp()),
			# Authorized party
			"azp": my_discovery_url,
			# Audience (who is allowed to use this token)
			"aud": "http://{}".format(self.App.HostName),  # TODO: Something that signifies "anyone in this internal space"
			# Tenants and resources
			"resources": {
				"*": ["authz:superuser"],
			}
		}

		self.InternalAuthToken = jwcrypto.jwt.JWT(
			header={
				"alg": "ES256",
				"typ": "JWT",
				"kid": self.InternalAuthKey.key_id,
			},
			claims=json.dumps(claims)
		)
		self.InternalAuthToken.make_signed_token(self.InternalAuthKey)

		L.info("New internal auth token issued.", struct_data={"exp": expiration})


	async def locate(self, instance_id: str = None, **kwargs) -> set:
		"""
		Return a list of URLs for a given instance or service ID.

		Args:
			instance_id (str): The ID of a specific instance of a service that the client wants to locate.
			service_id (str): The `service_id` parameter represents identifier of a service to locate.
				It is used to query a service registry to find the instances of the service that are currently available.

		Returns: A list of URLs in the format "http://servername:port" for the specified instance or service.
		"""

		if instance_id is not None:
			locate_params = {"instance_id": instance_id}

		elif len(kwargs) > 0:
			locate_params = kwargs

		else:
			L.warning("Please provide instance_id, service_id, or other custom id to locate the service(s).")
			return None

		# Each taget can have two records - one for ipv4 and second for ipv6. This information is redundant in the URL format.
		return set([
			"http://{}:{}".format(servername, port)
			for servername, port, family
			in await self._locate(locate_params)
		])

	async def _locate(self, locate_params) -> typing.Set[typing.Tuple]:
		"""
		Locate service instances based on their instance ID or service ID.

		Args:
			instance_id (str): The unique identifier for a specific instance of a service
			service_id (str): The ID of the service to locate

		Returns: a list of tuples containing the server name and port number of the located service(s).
		"""
		res = set()

		await asyncio.wait_for(self._ready_event.wait(), 600)

		if len(self._advertised_cache) == 0:
			L.warning("No instances to discover. Make sure [zookeeper] configuration is identical for all the ASAB services in the cluster.")
			return res

		for id_type, ids in self._advertised_cache.items():
			if id_type in locate_params:
				if locate_params[id_type] in ids:
					res = res | ids[locate_params[id_type]]

		return res


	async def discover(self) -> typing.Dict[str, typing.Dict[str, typing.Set[typing.Tuple]]]:
		# We need to make a copy of the cache so that the caller can't modify our cache.
		await asyncio.wait_for(self._ready_event.wait(), 600)
		return copy.deepcopy(self._advertised_cache)


	async def get_advertised_instances(self) -> typing.List[typing.Dict]:
		"""
		This method is here for backward compatibility. Use `discover()` method instead.
		Returns a list of dictionaries. Each dictionary represents an advertised instance
		obtained by iterating over the items in the `/run` path in ZooKeeper.
		"""
		# TODO: an obsolete log for this method
		advertised = []
		async for item, item_data in self._iter_zk_items():
			item_data['ephemeral_id'] = item
			advertised.append(item_data)

		return advertised


	async def _rescan_advertised_instances(self):
		"""
		Returns structured dataset of identifier types, identifiers of instances and hosts and ports where they can be found.
		It is a dict of identifier types as keys and dict as values. The second-layer dict has identifiers as keys an a set of tuples as a value. Each tuple contains host and port.

		Example of the data structure:
			{
				"instance_id": {
					"lmio-receiver-1": {("node1", 1234, socket.AF_INET)},
					"asab-remote-control-1": {("node2", 1234, socket.AF_INET6)}
					...
				},
				"service_id": {
					"lmio-receiver": {("node1", 1234, socket.AF_INET), ("node2", 1234, socket.AF_INET), ("node3", 1234, socket.AF_INET)},
					...
				},
				"custom1_id": {
					"myid123": {("node1", 5678, socket.AF_INET), ("node2", 5678, socket.AF_INET6)},
					...
				},
				"custom2_id": {
					"myid123": {("node1", 5678, socket.AF_INET), ("node2", 5678, socket.AF_INET)},
					...
				},
				...
			}
		"""
		if self._cache_lock.locked():
			# Only one rescan / cache update at a time
			return

		async with self._cache_lock:

			advertised = {
				"instance_id": {},
				"service_id": {},
			}

			iterator = self._iter_zk_items()
			if iterator is None:
				return  # reason logged from the _iter_zk_items method

			async for item, item_data in iterator:
				instance_id = item_data.get("instance_id")
				service_id = item_data.get("service_id")
				discovery: typing.Dict[str, list] = item_data.get("discovery", {})

				if instance_id is not None:
					discovery["instance_id"] = [instance_id]

				if service_id is not None:
					discovery["service_id"] = [service_id]

				host = item_data.get("host")
				if host is None:
					continue

				web = item_data.get("web")
				if web is None:
					continue

				for i in web:

					try:
						ip = i[0]
						port = i[1]
					except (IndexError, KeyError, TypeError):
						L.error("Unexpected format of 'web' section in advertised data: '{}'".format(web))
						return

					if ip == "0.0.0.0":
						family = socket.AF_INET
					elif ip == "::":
						family = socket.AF_INET6
					else:
						continue

					if discovery is not None:
						for id_type, ids in discovery.items():
							if advertised.get(id_type) is None:
								advertised[id_type] = {}

							for identifier in ids:
								if identifier is not None:
									if advertised[id_type].get(identifier) is None:
										advertised[id_type][identifier] = {(host, port, family)}
									else:
										advertised[id_type][identifier].add((host, port, family))

			self._advertised_cache = advertised
			self._ready_event.set()


	async def _iter_zk_items(self):
		base_path = self.ZooKeeperContainer.Path + "/run"

		def get_items():
			try:
				# Create the base path if it does not exist
				if not self.ZooKeeperContainer.ZooKeeper.Client.exists(base_path):
					self.ZooKeeperContainer.ZooKeeper.Client.create(base_path, b'', makepath=True)

				return self.ZooKeeperContainer.ZooKeeper.Client.get_children(base_path, watch=self._on_change_threadsafe)
			except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
				L.warning("Connection to ZooKeeper lost. Discovery Service could not fetch up-to-date state of the cluster services.")
				return None

			except kazoo.exceptions.KazooException:
				return None

		def get_data(item):
			try:
				data, stat = self.ZooKeeperContainer.ZooKeeper.Client.get((base_path + '/' + item), watch=self._on_change_threadsafe)
				return data
			except (kazoo.exceptions.SessionExpiredError, kazoo.exceptions.ConnectionLoss):
				L.warning("Connection to ZooKeeper lost. Discovery Service could not fetch up-to-date state of the cluster services.")
				return None

			except kazoo.exceptions.KazooException:
				# There's a race condition -> if ZK node is deleted between get_items and get_data calls, NoNodeError is raised. Let's just ignore it. The ZK node is already deleted.
				return None

		items = await self.ProactorService.execute(get_items)
		if items is None:
			return

		for item in items:
			item_data = await self.ProactorService.execute(get_data, item)
			if item_data is None:
				continue
			yield item, json.loads(item_data)

	def _on_change_threadsafe(self, watched_event):
		# Runs on a thread, returns the process back to the main thread
		def _update_cache():
			self.App.TaskService.schedule(self._rescan_advertised_instances())
		self.App.Loop.call_soon_threadsafe(_update_cache)


	def session(
		self,
		base_url: typing.Optional[str] = None,
		tenant: typing.Optional[str] = None,
		auth: typing.Union[str, aiohttp.ClientRequest, None] = None,
		headers: typing.Optional[typing.Mapping[str, str]] = None,
		**kwargs
	) -> aiohttp.ClientSession:
		"""
		Open HTTP session with custom hostname resolver for ASAB microservices.

		Args:
			:param base_url: Base URL to use for requests.
			:param tenant: Request tenant ID.
			:param auth: Client request to extract authorization from, or the string "internal", to use internal authorization.
			:param headers: Custom session HTTP headers.

		Usage:

		```python
		# Without authorization
		async with self.DiscoveryService.session() as session:
			# use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
			# where key is "service_id" or "instance_id"
			# and value the respective service identificator
			async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
				...

		# Using end-user authorization (from the UI)
		async with self.DiscoveryService.session(auth=request) as session:
			async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
				...

		# Using internal m2m authorization
		async with self.DiscoveryService.session(auth="internal") as session:
			async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
				...
		"""
		_headers = {}

		if auth is None:
			# By default, use the authorization from the incoming request
			request = Request.get(None)
			if request is not None and "Authorization" in request.headers:
				_headers["Authorization"] = request.headers["Authorization"]

		elif isinstance(auth, aiohttp.web.Request):
			assert "Authorization" in auth.headers
			_headers["Authorization"] = auth.headers["Authorization"]

		elif auth == "internal":
			if jwcrypto is None:
				raise ModuleNotFoundError(
					"You are trying to use internal auth without 'jwcrypto' installed. "
					"Please run 'pip install jwcrypto' or install asab with 'authz' optional dependency."
				)

			authz = Authz.get(None)
			if authz is not None:
				L.warning(
					"Using internal (superuser) authorization in an already authorized context. "
					"This is potentially unwanted and dangerous.",
				)

			_headers["Authorization"] = "Bearer {}".format(self.InternalAuthToken.serialize())

		else:
			raise ValueError(
				"Invalid 'auth' value. "
				"Only instances of aiohttp.web.Request or the literal string 'internal' are allowed. "
				"Found {}.".format(type(auth))
			)

		# Set tenant header
		if tenant is not None:
			# Use tenant from argument
			_headers["X-Tenant"] = tenant
		else:
			# Use tenant from context
			tenant = Tenant.get(None)
			if tenant is not None:
				_headers["X-Tenant"] = tenant

		if headers is not None:
			_headers.update(headers)

		return aiohttp.ClientSession(
			base_url,
			connector=aiohttp.TCPConnector(resolver=DiscoveryResolver(self)),
			headers=_headers,
			**kwargs
		)

get_advertised_instances() async ¤

This method is kept for backward compatibility; use the discover() method instead. It returns a list of dictionaries, each representing an advertised instance obtained by iterating over the items in the /run path in ZooKeeper.


locate(instance_id=None, **kwargs) async ¤

Returns a set of URLs for a given instance ID, service ID, or custom ID.

Parameters:

Name | Type | Description | Default
--- | --- | --- | ---
instance_id | str | The ID of a specific instance of a service that the client wants to locate. | None
service_id | str | The identifier of a service to locate, passed as a keyword argument. It is used to find the instances of the service that are currently advertised. | None

Returns: A set of URLs in the format "http://servername:port" for the specified instance or service.

Source code in asab/api/discovery.py
async def locate(self, instance_id: str = None, **kwargs) -> set:
	"""
	Return a set of URLs for a given instance or service ID.

	Args:
		instance_id (str): The ID of a specific service instance to locate.
		service_id (str): Identifier of the service to locate, passed as a keyword argument.
			Used to find the instances of the service that are currently available.

	Returns: A set of URLs in the format "http://servername:port" for the specified instance or service,
		or None if no identifier is provided.
	"""

	if instance_id is not None:
		locate_params = {"instance_id": instance_id}

	elif len(kwargs) > 0:
		locate_params = kwargs

	else:
		L.warning("Please provide instance_id, service_id, or other custom id to locate the service(s).")
		return None

	# Each target can have two records - one for IPv4 and a second for IPv6. This information is redundant in the URL format.
	return set([
		"http://{}:{}".format(servername, port)
		for servername, port, family
		in await self._locate(locate_params)
	])
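
The effect of wrapping the result in a set can be illustrated with a small sketch. It assumes records shaped like the `(servername, port, family)` tuples unpacked above; `urls_from_records` is a hypothetical helper and the sample records are invented, since the real ones come from the private `_locate()` call.

```python
import socket


def urls_from_records(records):
    # Build URLs the same way as above; the address family is dropped,
    # so IPv4 and IPv6 records of one target collapse into a single URL.
    return set(
        "http://{}:{}".format(servername, port)
        for servername, port, family in records
    )


# Invented sample data: "server-1" is listed once per address family.
records = [
    ("server-1", 8080, socket.AF_INET),
    ("server-1", 8080, socket.AF_INET6),
    ("server-2", 8081, socket.AF_INET),
]
urls = urls_from_records(records)
```

Three records produce two URLs here, because the two `server-1` entries differ only in the address family.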
session(base_url=None, tenant=None, auth=None, headers=None, **kwargs) ¤

Open HTTP session with custom hostname resolver for ASAB microservices.

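Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| base_url | Optional[str] | Base URL to use for requests. | None |
| tenant | Optional[str] | Request tenant ID. If omitted, the tenant from the current context is used when one is set. | None |
| auth | Union[str, Request, None] | Client request to extract authorization from, or the string "internal" to use internal authorization. If omitted, the Authorization header of the incoming request is forwarded when present. | None |
| headers | Optional[Mapping[str, str]] | Custom session HTTP headers. | None |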

Usage:

```python
# Without authorization
async with self.DiscoveryService.session() as session:
	# use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
	# where key is "service_id" or "instance_id"
	# and value the respective service identifier
	async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
		...

# Using end-user authorization (from the UI)
async with self.DiscoveryService.session(auth=request) as session:
	async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
		...

# Using internal m2m authorization
async with self.DiscoveryService.session(auth="internal") as session:
	async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
		...
```

Source code in asab/api/discovery.py
def session(
	self,
	base_url: typing.Optional[str] = None,
	tenant: typing.Optional[str] = None,
	auth: typing.Union[str, aiohttp.web.Request, None] = None,
	headers: typing.Optional[typing.Mapping[str, str]] = None,
	**kwargs
) -> aiohttp.ClientSession:
	"""
	Open HTTP session with custom hostname resolver for ASAB microservices.

	Args:
		base_url (str): Base URL to use for requests.
		tenant (str): Request tenant ID.
		auth: Client request to extract authorization from, or the string "internal", to use internal authorization.
		headers (Mapping[str, str]): Custom session HTTP headers.

	Usage:

	```python
	# Without authorization
	async with self.DiscoveryService.session() as session:
		# use URL in format: <protocol>://<value>.<key>.asab/<endpoint>
		# where key is "service_id" or "instance_id"
		# and value the respective service identifier
		async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
			...

	# Using end-user authorization (from the UI)
	async with self.DiscoveryService.session(auth=request) as session:
		async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
			...

	# Using internal m2m authorization
	async with self.DiscoveryService.session(auth="internal") as session:
		async with session.get("http://my_application_1.instance_id.asab/asab/v1/config") as resp:
			...
	```
	"""
	_headers = {}

	if auth is None:
		# By default, use the authorization from the incoming request
		request = Request.get(None)
		if request is not None and "Authorization" in request.headers:
			_headers["Authorization"] = request.headers["Authorization"]

	elif isinstance(auth, aiohttp.web.Request):
		assert "Authorization" in auth.headers
		_headers["Authorization"] = auth.headers["Authorization"]

	elif auth == "internal":
		if jwcrypto is None:
			raise ModuleNotFoundError(
				"You are trying to use internal auth without 'jwcrypto' installed. "
				"Please run 'pip install jwcrypto' or install asab with 'authz' optional dependency."
			)

		authz = Authz.get(None)
		if authz is not None:
			L.warning(
				"Using internal (superuser) authorization in an already authorized context. "
				"This is potentially unwanted and dangerous.",
			)

		_headers["Authorization"] = "Bearer {}".format(self.InternalAuthToken.serialize())

	else:
		raise ValueError(
			"Invalid 'auth' value. "
			"Only instances of aiohttp.web.Request or the literal string 'internal' are allowed. "
			"Found {}.".format(type(auth))
		)

	# Set tenant header
	if tenant is not None:
		# Use tenant from argument
		_headers["X-Tenant"] = tenant
	else:
		# Use tenant from context
		tenant = Tenant.get(None)
		if tenant is not None:
			_headers["X-Tenant"] = tenant

	if headers is not None:
		_headers.update(headers)

	return aiohttp.ClientSession(
		base_url,
		connector=aiohttp.TCPConnector(resolver=DiscoveryResolver(self)),
		headers=_headers,
		**kwargs
	)
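
The URL format `<protocol>://<value>.<key>.asab/<endpoint>` described in the usage comments can be made concrete with a short parsing sketch. This is only an illustration of the naming convention: `split_discovery_url` is a hypothetical helper, and how `DiscoveryResolver` actually interprets the hostname is not shown in this section.

```python
from urllib.parse import urlsplit


def split_discovery_url(url):
    # Hypothetical helper: extract (key, value) from "<value>.<key>.asab".
    # Returns None when the hostname does not follow that convention.
    host = urlsplit(url).hostname
    if host is None or not host.endswith(".asab"):
        return None
    parts = host[:-len(".asab")].rsplit(".", 1)
    if len(parts) != 2:
        return None
    value, key = parts
    return key, value


located = split_discovery_url("http://my_application_1.instance_id.asab/asab/v1/config")
ordinary = split_discovery_url("http://example.com/asab/v1/config")
```

For the URL used in the usage examples, the key is `instance_id` and the value is `my_application_1`; a hostname outside the `.asab` convention yields `None` in this sketch.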

NotDiscoveredError ¤

Bases: RuntimeError

Raised when the given service is not discovered.

Source code in asab/api/discovery.py
class NotDiscoveredError(RuntimeError):
	"""
	Raised when the given service is not discovered.
	"""
	pass
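
Because the exception derives from RuntimeError, callers can catch it specifically and still fall back to generic runtime error handling. The sketch below is a minimal illustration under stated assumptions: it declares a local stand-in class so it runs without ASAB, and `pick_url` and `describe` are hypothetical helpers. Where ASAB itself raises this exception is not shown in this section.

```python
class NotDiscoveredError(RuntimeError):
    """Local stand-in mirroring the class above, so the sketch runs without ASAB."""


def pick_url(urls, service_id):
    # Hypothetical helper: treat an empty lookup result as "not discovered".
    if not urls:
        raise NotDiscoveredError("Service '{}' was not discovered.".format(service_id))
    return sorted(urls)[0]


def describe(urls, service_id):
    # Catch the specific exception and degrade gracefully.
    try:
        return pick_url(urls, service_id)
    except NotDiscoveredError as e:
        return "unavailable: {}".format(e)


missing = describe(set(), "my_application")
found = describe({"http://server-1:8080"}, "my_application")
```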