Library Service¤

The ASAB Library is a concept of data content shared across the microservices in a cluster. In cluster/cloud microservice architectures, all microservices must have access to unified resources. The Library provides a read-only interface for listing and reading this content.

The Library also allows you to "stack" several libraries into one overlaid view, merging the content of each library into one unified space.

The library can also notify the ASAB microservice about changes, e.g. for automated update/reload.

Library structure¤

The library content is organized in a simplified file system manner, with directories and files.

Example of the library structure

+ /MyDirectory/
    - /MyDirectory/item1.yaml
    - /MyDirectory/item2.json
+ /AnotherDirectory/
    - /AnotherDirectory/item3.yaml
    + /AnotherDirectory/Folder/
        - /AnotherDirectory/Folder/item4.json

Library path rules

  • Any path must start with "/", including the root path.
  • The directory path must end with "/".
  • The directory name cannot contain ".".
  • The item path must end with a file extension (e.g. ".txt", ".json", ...).
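The path rules above can be expressed as a small validator. This is only a minimal sketch and not part of ASAB: the function name `classify_library_path` is hypothetical, and applying the "no dot in a directory name" rule to every directory segment of a path is an assumption.

```python
import os.path


def classify_library_path(path: str) -> str:
    """Return 'dir' or 'item' for a well-formed library path; raise ValueError otherwise.

    Hypothetical helper that mirrors the documented path rules.
    """
    # Any path, including the root, must start with "/"
    if not path.startswith("/"):
        raise ValueError("Path must start with '/': {!r}".format(path))

    segments = [segment for segment in path.split("/") if segment]

    if path.endswith("/"):
        # Directory path: every segment is a directory name
        directories = segments
        kind = "dir"
    else:
        # Item path: the last segment must carry a file extension
        if os.path.splitext(path)[1] == "":
            raise ValueError("Item path must end with a file extension: {!r}".format(path))
        directories = segments[:-1]
        kind = "item"

    # A directory name cannot contain "."
    for name in directories:
        if "." in name:
            raise ValueError("Directory name cannot contain '.': {!r}".format(name))

    return kind
```

For instance, `classify_library_path("/MyDirectory/")` yields `'dir'`, while a path such as `MyDirectory/` is rejected because it does not start with a slash.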

Layers¤

The library content can be organized into an unlimited number of layers. Each layer is represented by a provider (e.g. filesystem, zookeeper, git, ...) with a specific configuration. Two layers can have the same provider but different base paths.

The layers of the library are like slices of Swiss cheese layered on top of each other. Only if there is a hole in the top layer can you see the layer that shows through underneath. In other words, a file in an upper layer overrides (hides) files with the same path in the lower layers.

Illustration of Library layers

Illustration of ASAB Library layers. Green items are visible, grey items are hidden. Moreover, Layer 1 contains .disabled.yaml file containing the list of disabled files.
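The layering rule can be illustrated with plain dictionaries. In this sketch each layer is modelled as a `{path: content}` dict with index 0 as the top layer; the names `read_from_layers` and `merged_view` are hypothetical and only demonstrate the "first layer that has the path wins" idea, not the actual provider implementation.

```python
from typing import Dict, List, Optional

# Each layer is modelled as {path: content}; index 0 is the top layer.
Layer = Dict[str, bytes]


def read_from_layers(layers: List[Layer], path: str) -> Optional[bytes]:
    """Return the content from the topmost layer that contains the path."""
    for layer in layers:
        if path in layer:
            return layer[path]
    return None


def merged_view(layers: List[Layer]) -> Dict[str, int]:
    """Map every visible path to the index of the layer that provides it."""
    view: Dict[str, int] = {}
    for index, layer in enumerate(layers):
        for path in layer:
            # setdefault keeps the first (topmost) layer seen for a path
            view.setdefault(path, index)
    return view


layers = [
    {"/MyDirectory/item1.yaml": b"top"},
    {"/MyDirectory/item1.yaml": b"bottom", "/MyDirectory/item2.json": b"{}"},
]
```

Here `item1.yaml` is served from the top layer, while `item2.json` "shows through the hole" and is served from the layer beneath.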

Tip

In most applications, it is common to create the first layer with the Zookeeper provider and the layers beneath with the git or libsreg provider. This allows you to quickly modify items of the Library on the first layer.

Disabling files¤

The library concept supports multi-tenancy. By default, all items of the library are visible for everyone, but you can disable some of them for specific tenants.

In order to disable some items of the library, create a file /.disabled.yaml. This file must be created on the first layer.

In the following example, file1.txt is disabled for tenant-1 and tenant-2, file2.txt for tenant-1 and file3.txt for every tenant.

.disabled.yaml
/file1.txt:
    - tenant-1
    - tenant-2
/file2.txt: tenant-1
/file3.txt: '*'

Warning

When disabling a file for all tenants with a star, don't forget to enclose it in quotation marks. Otherwise, YAML would interpret the star as an alias. Read more about anchors and aliases.
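The lookup against the parsed `.disabled.yaml` can be sketched as follows. The dictionary below is what a YAML loader would be expected to return for the example file; the function `is_disabled` is a hypothetical illustration (the library's own check is `check_disabled()`), and normalizing a single tenant string to a list is an assumption made here so that whole tenant names are compared.

```python
from typing import Dict, List, Optional, Union

# Parsed content of the example /.disabled.yaml (assumed loader output)
DISABLED: Dict[str, Union[str, List[str]]] = {
    "/file1.txt": ["tenant-1", "tenant-2"],
    "/file2.txt": "tenant-1",
    "/file3.txt": "*",
}


def is_disabled(
    disabled: Dict[str, Union[str, List[str]]],
    path: str,
    tenant: Optional[str] = None,
) -> bool:
    """Return True if the item is disabled globally or for the given tenant."""
    entry = disabled.get(path)
    if entry is None:
        # The item is not mentioned, so it is visible to everyone
        return False

    # Normalize a single tenant (plain string) to a list
    tenants = [entry] if isinstance(entry, str) else list(entry)

    if "*" in tenants:
        # Disabled for every tenant
        return True

    return tenant is not None and tenant in tenants
```

With the example data, `/file1.txt` is hidden from `tenant-2`, `/file2.txt` stays visible to `tenant-2`, and `/file3.txt` is hidden from everyone.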

Library service¤

The library service may exist in multiple instances, with different paths setups. For that reason, you have to provide a unique service_name and there is no default value for that.

Each Library item is represented by LibraryItem dataclass. Read more in the reference section.

Example of the use:

import asab
import asab.library

class MyApplication(asab.Application):

    async def initialize(self):
        self.LibraryService = asab.library.LibraryService(self, "LibraryService") #(1)!
        self.PubSub.subscribe("Library.ready!", self.on_library_ready) #(2)!

    async def on_library_ready(self, event_name, library): #(3)!

        for item in await self.LibraryService.list("/", recursive=True): #(4)!
            print("*", item)
            if item.type == 'item': #(5)!
                itemio = await self.LibraryService.read(item.name) #(6)!
                if itemio is not None:
                    with itemio: #(7)!
                        content = itemio.read()
                        print("- content: {} bytes".format(len(content)))
                else:
                    print("  - (DISABLED)")

if __name__ == '__main__':
    app = MyApplication()
    app.run()
  1. Initializes the Library Service. Remember to specify a unique service_name.
  2. When the Library is initialized, the Library.ready! PubSub message is emitted.
  3. The callback must accept two arguments: event_name is the message "Library.ready!" and library is the LibraryService instance that has become ready.
  4. The list() method returns a list of LibraryItems. For more information, see the reference section.
  5. item.type can be either 'item' or 'dir'.
  6. read() coroutine returns item IO object or None if the file is disabled.
  7. Item IO object is used as a context manager.
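The reference section marks `read()` as obsolete in favour of `open()`, which is an asynchronous context manager that closes the item for you. The following sketch shows that access pattern; `FakeLibrary` is a hypothetical stand-in so the example runs without ASAB, and `read_text` is a hypothetical helper. With a real `LibraryService`, only `read_text` would be needed.

```python
import asyncio
import contextlib
import io
import typing


class FakeLibrary:
    """Stand-in for a LibraryService (illustration only, not the ASAB class)."""

    def __init__(self, items: typing.Dict[str, bytes]):
        self._items = items

    @contextlib.asynccontextmanager
    async def open(self, path: str, tenant: typing.Optional[str] = None):
        data = self._items.get(path)
        itemio = io.BytesIO(data) if data is not None else None
        try:
            # None is yielded when the item is missing (or disabled)
            yield itemio
        finally:
            if itemio is not None:
                itemio.close()


async def read_text(library, path: str, tenant: typing.Optional[str] = None) -> typing.Optional[str]:
    """Read an item as UTF-8 text, or return None if it is not available."""
    async with library.open(path, tenant) as itemio:
        if itemio is None:
            return None
        return itemio.read().decode("utf-8")


result = asyncio.run(read_text(FakeLibrary({"/a.txt": b"hello"}), "/a.txt"))
```

The `None` check inside the `async with` block matters because `open()` yields `None` rather than raising when the item cannot be read.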

Example of the library configuration:

[library]
providers:
    provider+1://...
    provider+2://...
    provider+3://...
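To see how a multi-line `providers` option maps to ordered layers, the following sketch parses such a configuration with Python's standard `configparser`. ASAB uses its own configuration object (`Config.getmultiline`), so this is only an approximation of the idea; the provider URLs are taken from examples elsewhere on this page.

```python
import configparser

CONFIG = """
[library]
providers:
    zk:///library
    git+https://github.com/john/awesome_project.git
    libsreg+https://libsreg.example.com/my-library
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG)

# Indented continuation lines form one multi-line value; split it on whitespace
layers = parser.get("library", "providers").split()

for layer, url in enumerate(layers):
    # Layer 0 is the top layer, so it has the highest priority
    print("layer {}: {}".format(layer, url))
```

The order of the lines is significant: the first provider becomes the top layer.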

PubSub messages¤

The Library is created in the not-ready state. After the connection to the underlying technology is established, each library provider changes its state to ready. The Library switches to the ready state after all its providers are ready.

If any of the providers is disconnected, the Library switches to the not-ready state again until the connection is reestablished.

Every time the Library changes its state, a PubSub message is published. The Library.change! message carries the arguments provider and path.

Message Published when...
Library.not_ready! at least one provider is not ready.
Library.ready! all of the providers are ready.
Library.change! the content of the Library has changed.
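The readiness rule can be summarized in a few lines. This is a simplified sketch with a hypothetical function name: it maps the per-provider readiness flags to the message that describes the overall state, and it treats a library with no providers as not ready.

```python
from typing import Dict


def library_state(providers_ready: Dict[str, bool]) -> str:
    """Return the message name matching the aggregated provider state."""
    # The Library is ready only if there is at least one provider
    # and every provider reports that it is ready
    if len(providers_ready) > 0 and all(providers_ready.values()):
        return "Library.ready!"
    return "Library.not_ready!"
```

For example, `library_state({"zk": True, "git": False})` gives `"Library.not_ready!"` because one provider is still connecting.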

Notification on changes¤

Some providers are able to detect changes of the library items.

Example

import asab
import asab.library

class MyApplication(asab.Application):

    async def initialize(self):
        # The Library Service must exist before subscribing to its changes
        self.LibraryService = asab.library.LibraryService(self, "LibraryService")
        self.PubSub.subscribe("Library.ready!", self.on_library_ready)
        self.PubSub.subscribe("Library.change!", self.on_library_change)

    async def on_library_ready(self, event_name, library=None):
        await self.LibraryService.subscribe(["/asab"]) #(1)!

    def on_library_change(self, message, provider, path): #(2)!
        print("New changes in the library found by provider: '{}'".format(provider))
  1. The self.LibraryService.subscribe() method takes either a single path as a string or multiple paths in a list and watches for changes in them.
  2. This callback takes three arguments: message (Library.change! in this case), provider (name of the provider that has detected changes) and path (the path where changes were made).

Info

Note that some of the providers detect changes immediately, while others detect them periodically. For example, the git provider pulls the repository every minute, and changes can be detected only after that.

Providers¤

The list of available providers:

Provider Read the content Notify on changes
Filesystem
Apache Zookeeper
Microsoft Azure Storage
Git
Libraries repository

Filesystem¤

The most basic provider that reads data from the local filesystem. The notification on changes functionality is available only for Linux systems, as it uses inotify.

Configuration examples:

[library]
providers: /home/user/directory

[library]
providers: ./this_directory

[library]
providers: file:///home/user/directory

Apache Zookeeper¤

ZooKeeper as a consensus technology is vital for microservices in the cluster.

There are several configuration strategies:

1) Configuration from [zookeeper] section.

[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
path=/library

[library]
providers:
    zk://

2) Specify a path of a ZooKeeper node where only library lives.

[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
path=/else

[library]
providers:
    zk:///library

[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
path=/else

[library]
providers:
    zk:///

3) Configuration from the URL in the [library] section.

[library]
providers:
    zk://zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181/library

4) Configuration from the [zookeeper] section, with the path joined from the [zookeeper] and [library] sections.

The resulting path will be /else/library.

[zookeeper]
servers=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
path=/else

[library]
providers:
    zk://./library

If a path from the [zookeeper] section is missing, an application class name will be used, e.g. /BSQueryApp/library.
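The four strategies can be summarized as a small resolution function. This is only a paraphrase of the rules described above, written with `urllib.parse`; it is not the provider's actual code, the function name `resolve_zookeeper` is hypothetical, and the fallback to the application class name is left out.

```python
from typing import Tuple
from urllib.parse import urlparse


def resolve_zookeeper(url: str, section_servers: str, section_path: str) -> Tuple[str, str]:
    """Return (servers, path) for a zk:// URL and the [zookeeper] section values."""
    parts = urlparse(url)
    if parts.scheme not in ("zk", "zookeeper"):
        raise ValueError("Not a ZooKeeper provider URL: {!r}".format(url))

    if parts.netloc == ".":
        # Strategy 4: relative path, joined with the [zookeeper] path
        return section_servers, section_path.rstrip("/") + parts.path

    if parts.netloc == "":
        # Strategies 1 and 2: servers from [zookeeper]; the path comes
        # from the URL if present, otherwise from the section
        return section_servers, parts.path or section_path

    # Strategy 3: everything is taken from the URL
    return parts.netloc, parts.path


SERVERS = "zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181"
```

Under these assumptions, `resolve_zookeeper("zk://./library", SERVERS, "/else")` resolves to the path `/else/library`, matching the fourth strategy.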

Microsoft Azure Storage¤

You can configure the microservice to read from the Microsoft Azure Storage container.

Configuration:

[library]
providers: azure+https://ACCOUNT-NAME.blob.core.windows.net/BLOB-CONTAINER

If the container's Public Access Level is not set to "Public access", create an "Access Policy" with "Read" and "List" permissions and append the "Shared Access Signature" (SAS) query string to the URL in the configuration:

[library]
providers: azure+https://ACCOUNT-NAME.blob.core.windows.net/BLOB-CONTAINER?sv=2020-10-02&si=XXXX&sr=c&sig=XXXXXXXXXXXXXX

Git repository¤

Warning

Connecting to git repositories requires the pygit2 library to be installed.

pip install pygit2

Please follow this format in the configuration:

[library]
providers: git+http(s)://<username>:<deploy-token>@<path>#<branch>

Cloning from GitHub repository:

Using a public repository from GitHub, the configuration may look like this:

[library]
providers: git+https://github.com/john/awesome_project.git

Using custom branch:

Use hash #<branch-name> to clone a repository from a selected branch:

[library]
providers: git+https://github.com/john/awesome_project.git#name-of-the-branch
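The parts of a git provider URL can be picked apart with `urllib.parse`. This sketch only illustrates the URL format described above; the function name `parse_git_provider` and the returned dictionary keys are hypothetical, and the git provider itself may parse the URL differently.

```python
from typing import Dict, Optional
from urllib.parse import urlsplit, urlunsplit


def parse_git_provider(url: str) -> Dict[str, Optional[str]]:
    """Split a 'git+http(s)://...#branch' provider URL into its parts."""
    if not url.startswith("git+"):
        raise ValueError("Not a git provider URL: {!r}".format(url))

    # Drop the 'git+' prefix; the rest is a regular HTTP(S) URL
    parts = urlsplit(url[len("git+"):])

    # The fragment after '#' selects the branch; it is optional
    branch = parts.fragment or None

    # Rebuild the repository URL without the fragment
    # (credentials, if any, remain part of the network location)
    repo_url = urlunsplit((parts.scheme, parts.netloc, parts.path, parts.query, ""))

    return {
        "repo_url": repo_url,
        "branch": branch,
        "username": parts.username,
        "token": parts.password,
    }


parsed = parse_git_provider("git+https://github.com/john/awesome_project.git#name-of-the-branch")
```

In this example, `parsed["repo_url"]` is the plain HTTPS address of the repository and `parsed["branch"]` is `name-of-the-branch`; for a URL without a fragment, the branch is `None`.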

Deploy tokens in GitLab¤

GitLab uses deploy tokens to enable authentication of deployment tasks, independent of a user account. Authentication through deploy tokens is the only supported option for now.

If you want to create a deploy token for your GitLab repository, follow these steps from the manual:

  1. Go to Settings > Repository > Deploy tokens section in your repository. (Note that you have to possess a "Maintainer" or "Owner" role for the repository.)
  2. Expand the "Deploy tokens" section. The list of current Active Deploy Tokens will be displayed.
  3. Complete the fields and scopes. We recommend a custom "username", as you will need it later for the URL in the configuration.
  4. Record the deploy token's values before leaving or refreshing the page! After that, you cannot access it again.

After the deploy token is created, use the URL for the repository in the following format:

[library]
providers: git+https://<username>:<deploy_token>@gitlab.example.com/john/awesome_project.git

Where does the repository clone?¤

The git provider clones the repository into a temporary directory. The default path for the cloned repository is /tmp/asab.library.git/ and it can be changed manually:

[library:git]
repodir=path/to/repository/cache

Libraries repository¤

The libsreg provider downloads the content from the distribution URL. The distribution URL points to an HTTP(S) server where content archives are published.

Configuration examples:

[library]
providers: libsreg+https://libsreg.example.com/my-library

More than one distribution server can be specified:

[library]
providers: libsreg+https://libsreg1.example.com,libsreg2.example.com/my-library

This variant provides more resilience against the unavailability of a distribution server.

A structure of the distribution server filesystem:

+ /my-library/
  - my-library-master.tar.xz
  - my-library-master.tar.xz.sha256
  - my-library-production.tar.xz
  - my-library-production.tar.xz.sha256
  - my-library-v43.41.tar.xz
  - my-library-v43.41.tar.xz.sha256
  ...

  • *.tar.xz: This is the TAR/XZ archive of the actual content
  • *.tar.xz.sha256: SHA256 checksum of the archive

The structure of the distribution is as follows:

/{archname}/{archname}-{version}.tar.xz

  • archname: The name of the distribution archive, my-library in the example above.
  • version: The version of the distribution archive; master and production are typically Git branches, v43.41 is a Git tag.
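The naming scheme above can be sketched in code, together with a checksum comparison. Both function names are hypothetical, and the assumed content of the `.sha256` file (a hexadecimal digest, optionally followed by a file name, as produced by common checksum tools) is not confirmed by this documentation.

```python
import hashlib
from typing import Tuple


def archive_urls(base_url: str, archname: str, version: str) -> Tuple[str, str]:
    """Build the archive URL and its checksum URL: /{archname}/{archname}-{version}.tar.xz"""
    archive = "{}/{}/{}-{}.tar.xz".format(base_url.rstrip("/"), archname, archname, version)
    return archive, archive + ".sha256"


def checksum_matches(archive_bytes: bytes, sha256_text: str) -> bool:
    """Compare the SHA256 of the archive with the published checksum.

    Assumption: the checksum text starts with the hexadecimal digest.
    """
    expected = sha256_text.split()[0].lower()
    return hashlib.sha256(archive_bytes).hexdigest() == expected


archive_url, checksum_url = archive_urls("https://libsreg.example.com", "my-library", "master")
```

Verifying the checksum before unpacking guards against a truncated or corrupted download.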

Tip

This provider is designed to use Microsoft Azure Storage as a distribution point. It is assumed that the content archives are uploaded to the distribution point using CI/CD.

Reference¤

asab.library.LibraryService ¤

Bases: Service

Configuration:

[library]
providers:
        provider+1://
        provider+2://
        provider+3://

The order of providers is important, the priority (or layering) is top-down.

Each library provider is specified by URL/URI schema:

  • zk:// or zookeeper:// for ZooKeeper provider
  • file:// or local path for FileSystem provider
  • azure+https:// for Microsoft Azure Storage provider.
  • git+https:// for Git provider.
  • libsreg+https:// for Libraries provider.
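The mapping from a URL to a provider comes down to a chain of prefix checks, as the `_create_library()` method in the source listing on this page shows. The sketch below mirrors those checks in a standalone function; the name `provider_kind` and the returned labels are hypothetical.

```python
def provider_kind(url: str) -> str:
    """Return a label for the provider that would handle the given URL."""
    if url.startswith(("zk://", "zookeeper://")):
        return "zookeeper"
    if url.startswith(("./", "/", "file://")):
        return "filesystem"
    if url.startswith("azure+https://"):
        return "azure"
    if url.startswith("git+"):
        return "git"
    if url.startswith("libsreg+"):
        return "libsreg"
    # Anything else is treated as a configuration error
    raise ValueError("Incorrect/unknown provider for '{}'".format(url))
```

For example, `provider_kind("./this_directory")` yields `"filesystem"`, while an unsupported scheme raises `ValueError`.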

The first provider is responsible for providing /.disabled.yaml.

A library is created in “not ready” state, each provider then informs the library when it is ready (eg. Zookeeper provider needs to connect to Zookeeper servers). Only after all providers are ready, the library itself becomes ready. The library indicates that by the PubSub event Library.ready!.

Source code in asab/library/service.py
class LibraryService(Service):
	"""
	Configuration:

	```ini
	[library]
	providers:
		provider+1://
		provider+2://
		provider+3://
	```

	The order of providers is important, the priority (or layering) is top-down.

	Each library provider is specified by URL/URI schema:

	* `zk://` or `zookeeper://` for ZooKeeper provider
	* `file://` or local path for FileSystem provider
	* `azure+https://` for Microsoft Azure Storage provider.
	* `git+https://` for Git provider.
	* `libsreg+https://` for Libraries provider.

	The first provider is responsible for providing `/.disabled.yaml`.

	A library is created in “not ready” state, each provider then informs the library when it is ready
	(eg. Zookeeper provider needs to connect to Zookeeper servers). Only after all providers are ready, the library itself becomes ready.
	The library indicates that by the PubSub event `Library.ready!`.
	"""

	def __init__(self, app: Application, service_name: str, paths: typing.Union[str, typing.List[str], None] = None):
		"""
		Initialize the LibraryService.

		The library service is designed to "exist" in multiple instances,
		with different `paths` setups.
		For that reason, you have to provide unique `service_name`
		and there is no _default_ value for that.

		If `paths` are not provided, they are fetched from `[library]providers` configuration.

		Args:
			app: The ASAB Application.
			service_name: A unique name of the service.
			paths (str | list[str] | None ): Either single path or list of paths with which LibraryService is connected.
		"""

		super().__init__(app, service_name)
		self.Libraries: list[LibraryProviderABC] = []
		self.Disabled: dict = {}
		self.DisabledPaths: list = []

		if paths is None:
			# load them from configuration
			try:
				paths = Config.getmultiline("library", "providers")
			except configparser.NoOptionError:
				L.critical("'providers' option is not present in configuration section 'library'.")
				raise SystemExit("Exit due to a critical configuration error.")

		# paths can be string if specified as argument
		if isinstance(paths, str):
			paths = re.split(r"\s+", paths)

		for layer, path in enumerate(paths):
			# Create library for each layer of paths
			self._create_library(path, layer)

		app.PubSub.subscribe("Application.tick/60!", self._on_tick60)


	async def finalize(self, app):
		while len(self.Libraries) > 0:
			lib = self.Libraries.pop(-1)
			await lib.finalize(self.App)

	async def _on_tick60(self, message_type):
		await self._read_disabled()

	def _create_library(self, path, layer):
		library_provider = None
		if path.startswith('zk://') or path.startswith('zookeeper://'):
			from .providers.zookeeper import ZooKeeperLibraryProvider
			library_provider = ZooKeeperLibraryProvider(self, path, layer)

		elif path.startswith('./') or path.startswith('/') or path.startswith('file://'):
			from .providers.filesystem import FileSystemLibraryProvider
			library_provider = FileSystemLibraryProvider(self, path, layer)

		elif path.startswith('azure+https://'):
			from .providers.azurestorage import AzureStorageLibraryProvider
			library_provider = AzureStorageLibraryProvider(self, path, layer)

		elif path.startswith('git+'):
			from .providers.git import GitLibraryProvider
			library_provider = GitLibraryProvider(self, path, layer)

		elif path.startswith('libsreg+'):
			from .providers.libsreg import LibsRegLibraryProvider
			library_provider = LibsRegLibraryProvider(self, path, layer)

		elif path == '' or path.startswith("#") or path.startswith(";"):
			# This is empty or commented line
			return

		else:
			L.error("Incorrect/unknown provider for '{}'".format(path))
			raise SystemExit("Exit due to a critical configuration error.")

		self.Libraries.append(library_provider)

	def is_ready(self) -> bool:
		"""
		Check if all the libraries are ready.

		Returns:
			True if all libraries are ready, otherwise False.
		"""
		if len(self.Libraries) == 0:
			return False

		return functools.reduce(
			lambda x, provider: provider.IsReady and x,
			self.Libraries,
			True
		)

	async def _set_ready(self, provider):
		if len(self.Libraries) == 0:
			return

		if (provider == self.Libraries[0]) and provider.IsReady:
			await self._read_disabled()

		if self.is_ready():
			L.log(LOG_NOTICE, "is ready.", struct_data={'name': self.Name})
			self.App.PubSub.publish("Library.ready!", self)
		elif not provider.IsReady:
			L.log(LOG_NOTICE, "is NOT ready.", struct_data={'name': self.Name})
			self.App.PubSub.publish("Library.not_ready!", self)

	async def find(self, path: str) -> typing.Optional[typing.List[str]]:
		"""
		Searches for files with a specific name within a library, using the provided path.

		The method traverses the library directories, looking for files that match the given filename.
		It returns a list of paths leading to these files, or an empty list if no such files are found.

		Args:
			path (str): The path specifying the filename and its location within the library.
						The path should start with a forward slash and include the filename.
						Example: '/library/Templates/.setup.yaml'

		Returns:
			typing.Optional[typing.List[str]]: A list containing the paths to the found files,
				or an empty list if no files are found that match the filename.
		"""
		# File path must start with '/'
		assert path[:1] == '/', "Item path '{}' must start with a forward slash (/). For example: /library/Templates/.setup.yaml".format(path)
		# File path must end with the filename
		assert len(os.path.splitext(path)[1]) > 0, "Item path '{}' must end with an extension. For example: /library/Templates/item.json".format(path)

		results = []
		for library in self.Libraries:
			found_files = await library.find(path)
			if found_files:
				results.extend(found_files)

		return results

	async def read(self, path: str, tenant: typing.Optional[str] = None) -> typing.Optional[typing.IO]:
		"""
		THIS IS OBSOLETED METHOD, USE `open(...)` !!!

		Read the content of the library item specified by `path`. This method can be used only after the Library is ready.

		Args:
			path (str): Path to the file, `LibraryItem.name` can be used directly.
			tenant (str | None): The tenant to apply. If not specified, the global access is assumed.

		Returns:
			( IO | None ): Readable stream with the content of the library item. `None` is returned if the item is not found or if it is disabled (either globally or for the specified tenant).

		Example:

		```python
		itemio = await library.read('/path', 'tenant')
		if itemio is not None:
			with itemio:
				return itemio.read()
		```
		"""
		# item path must start with '/'
		assert path[:1] == '/', "Item path must start with a forward slash (/). For example: /library/Templates/item.json"
		# Item path must end with the extension
		assert len(os.path.splitext(path)[1]) > 0, "Item path must end with an extension. For example: /library/Templates/item.json"

		if self.check_disabled(path, tenant=tenant):
			return None

		for library in self.Libraries:
			itemio = await library.read(path)
			if itemio is None:
				continue
			return itemio

		return None


	@contextlib.asynccontextmanager
	async def open(self, path: str, tenant: typing.Optional[str] = None):
		"""
		Read the content of the library item specified by `path` in a SAFE way, protected by a context manager/with statement.
		This method can be used only after the Library is ready.

		Example:

		```python
		async with self.App.LibraryService.open(path) as b:
			if b is None:
				return None

			text = b.read().decode("utf-8")
		```
		"""

		itemio = await self.read(path, tenant)
		if itemio is None:
			yield itemio
		else:
			try:
				yield itemio
			finally:
				itemio.close()


	async def list(self, path: str = "/", tenant: typing.Optional[str] = None, recursive: bool = False) -> typing.List[LibraryItem]:
		"""
		List the directory of the library specified by the path that are enabled for the specified tenant. This method can be used only after the Library is ready.

		Args:
			path (str): Path to the directory.
			tenant (str | None): If specified, items that are enabled for the tenant are filtered.
			recursive (bool): If `True`, return a list of items located at `path` and its subdirectories.

		Returns:
			List of items that are enabled for the tenant.
		"""

		# Directory path must start with '/'
		assert path[:1] == '/', "Directory path must start with a forward slash (/). For example: /library/Templates/"
		# Directory path must end with '/'
		assert path[-1:] == '/', "Directory path must end with a forward slash (/). For example: /library/Templates/"
		# Directory path cannot contain '//'
		assert '//' not in path, "Directory path cannot contain double slashes (//). Example format: /library/Templates/"

		# List requested level using all available providers
		items = await self._list(path, tenant, providers=self.Libraries)

		if recursive:
			# If recursive scan is requested, then iterate thru list of items
			# find 'dir' types there and list them.
			# Output of this list is attached to the list for recursive scan
			# and also to the final output
			recitems = list(items[:])

			while len(recitems) > 0:

				item = recitems.pop(0)
				if item.type != 'dir':
					continue

				child_items = await self._list(item.name, tenant, providers=item.providers)
				items.extend(child_items)
				recitems.extend(child_items)
		return items

	async def _list(self, path, tenant, providers):
		# Execute the list query in all providers in-parallel
		result = await asyncio.gather(*[
			library.list(path)
			for library in providers
		], return_exceptions=True)

		items = []
		uniq = dict()
		for ress in result:

			if isinstance(ress, KeyError):
				# The path doesn't exists in the provider
				continue

			if isinstance(ress, Exception):
				L.exception("Error when listing items from provider", exc_info=ress)
				continue

			for item in ress:
				item.disabled = self.check_disabled(item.name, tenant=tenant)

				# If the item already exists, merge or override it
				pitem = uniq.get(item.name)
				if pitem is not None:
					pitem = uniq[item.name]
					if pitem.type == 'dir' and item.type == 'dir':
						# Directories are joined
						pitem.providers.extend(item.providers)
					elif pitem.type == 'item':
						for i, provider in enumerate(providers):
							if provider in item.providers:
								index = i
								break
						pitem.override = index
				# Other item types are skipped
				else:
					uniq[item.name] = item
					items.append(item)
		items.sort(key=lambda x: x.name)
		return items

	async def _read_disabled(self):
		# `.disabled.yaml` is read from the first configured library
		# It is applied on all libraries in the configuration.
		disabled = await self.Libraries[0].read('/.disabled.yaml')

		if disabled is None:
			self.Disabled = {}
			self.DisabledPaths = []
			return

		try:
			disabled = yaml.safe_load(disabled)
		except Exception:
			self.Disabled = {}
			self.DisabledPaths = []
			L.exception("Failed to parse '/.disabled.yaml'")
			return

		if disabled is None:
			self.Disabled = {}
			self.DisabledPaths = []
			return

		if isinstance(disabled, set):
			# This is for a backward compatibility (Aug 2023)
			self.Disabled = {key: '*' for key in self.Disabled}
			self.DisabledPaths = []
			return

		self.Disabled = {}
		self.DisabledPaths = []
		for k, v in disabled.items():
			if k.endswith('/'):
				self.DisabledPaths.append((k, v))
			else:
				self.Disabled[k] = v

		# Sort self.DisabledPaths from the shortest to longest
		self.DisabledPaths.sort(key=lambda x: len(x[0]))


	def check_disabled(self, path: str, tenant: typing.Optional[str] = None) -> bool:
		"""
		Check if the item specified in path is disabled, either globally or for the specified tenant.

		Args:
			path (str): Path to the item to be checked.
			tenant (str | None): The tenant to apply. If not specified, the global access is assumed.

		Returns:
			`True` if the item is disabled for the tenant.
		"""
		if not isinstance(path, str) or not path:
			raise ValueError("The 'path' must be a non-empty string.")

		# First check disabled by path
		for dp, disabled in self.DisabledPaths:
			if path.startswith(dp):
				if '*' in disabled:
					# Path is disabled for everybody
					return True

				if tenant is not None and tenant in disabled:
					# Path is disabled for a specified tenant
					return True

		# Then check for a specific item entries

		disabled = self.Disabled.get(path)

		if disabled is None:
			return False

		if '*' in disabled:
			# Item is disabled for everybody
			return True

		if tenant is not None and tenant in disabled:
			# Item is disabled for a specified tenant
			return True

		return False

	async def export(self, path: str = "/", tenant: typing.Optional[str] = None, remove_path: bool = False) -> typing.IO:
		"""
		Return a file-like stream containing a gzipped tar archive of the library contents of the path.

		Args:
			path: The path to export.
			tenant (str | None ): The tenant to use for the operation.
			remove_path: If `True`, the path will be removed from the tar file.

		Returns:
			A file object containing a gzipped tar archive.
		"""

		# Directory path must start with '/'
		assert path[:1] == '/', "Directory path must start with a forward slash (/). For example: /library/Templates/"
		# Directory path must end with '/'
		assert path[-1:] == '/', "Directory path must end with a forward slash (/). For example: /library/Templates/"
		# Directory path cannot contain '//'
		assert '//' not in path, "Directory path cannot contain double slashes (//). Example format: /library/Templates/"

		fileobj = tempfile.TemporaryFile()
		tarobj = tarfile.open(name=None, mode='w:gz', fileobj=fileobj)

		items = await self._list(path, tenant, providers=self.Libraries[:1])
		recitems = list(items[:])

		while len(recitems) > 0:

			item = recitems.pop(0)
			if item.type != 'dir':
				continue

			child_items = await self._list(item.name, tenant, providers=item.providers)
			items.extend(child_items)
			recitems.extend(child_items)

		for item in items:
			if item.type != 'item':
				continue
			my_data = await self.Libraries[0].read(item.name)
			if remove_path:
				assert item.name.startswith(path)
				tar_name = item.name[len(path):]
			else:
				tar_name = item.name
			info = tarfile.TarInfo(tar_name)
			my_data.seek(0, io.SEEK_END)
			info.size = my_data.tell()
			my_data.seek(0, io.SEEK_SET)
			info.mtime = time.time()
			tarobj.addfile(tarinfo=info, fileobj=my_data)

		tarobj.close()
		fileobj.seek(0)
		return fileobj


	async def subscribe(self, paths: typing.Union[str, typing.List[str]]) -> None:
		"""
		Subscribe to changes for specified paths of the library.

		In order to notify on changes in the Library, this method must be used after the Library is ready.

		Args:
			paths (str | list[str]): Either single path or list of paths to be subscribed. All the paths must be absolute (start with '/').

		Examples:
		```python
		class MyApplication(asab.Application):

			async def initialize(self):
				self.PubSub.subscribe("Library.ready!", self.on_library_ready)
				self.PubSub.subscribe("Library.change!", self.on_library_change)

			async def on_library_ready(self, event_name, library=None):
				await self.LibraryService.subscribe(["/alpha","/beta"])

			def on_library_change(self, message, provider, path):
				print("New changes in the library found by provider: '{}'".format(provider))

		```
		"""
		if isinstance(paths, str):
			paths = [paths]
		for path in paths:
			assert path[:1] == '/', "Absolute path must be used when subscribing to the library changes"

			for provider in self.Libraries:
				await provider.subscribe(path)

__init__(app, service_name, paths=None) ¤

Initialize the LibraryService.

The library service is designed to "exist" in multiple instances, with different paths setups. For that reason, you have to provide unique service_name and there is no default value for that.

If paths are not provided, they are fetched from [library]providers configuration.

Parameters:

  • app (Application, required): The ASAB Application.
  • service_name (str, required): A unique name of the service.
  • paths (str | list[str] | None, default: None): Either a single path or a list of paths with which the LibraryService is connected.

Source code in asab/library/service.py
def __init__(self, app: Application, service_name: str, paths: typing.Union[str, typing.List[str], None] = None):
	"""
	Initialize the LibraryService.

	The library service is designed to "exist" in multiple instances,
	with different `paths` setups.
	For that reason, you have to provide unique `service_name`
	and there is no _default_ value for that.

	If `paths` are not provided, they are fetched from `[library]providers` configuration.

	Args:
		app: The ASAB Application.
		service_name: A unique name of the service.
		paths (str | list[str] | None ): Either single path or list of paths with which LibraryService is connected.
	"""

	super().__init__(app, service_name)
	self.Libraries: list[LibraryProviderABC] = []
	self.Disabled: dict = {}
	self.DisabledPaths: list = []

	if paths is None:
		# load them from configuration
		try:
			paths = Config.getmultiline("library", "providers")
		except configparser.NoOptionError:
			L.critical("'providers' option is not present in configuration section 'library'.")
			raise SystemExit("Exit due to a critical configuration error.")

	# paths can be string if specified as argument
	if isinstance(paths, str):
		paths = re.split(r"\s+", paths)

	for layer, path in enumerate(paths):
		# Create library for each layer of paths
		self._create_library(path, layer)

	app.PubSub.subscribe("Application.tick/60!", self._on_tick60)
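
The way `paths` is normalized into layers can be tried in isolation. The following is a minimal sketch, not part of ASAB: `split_layers` is a hypothetical helper and the two `file://` locations are made-up examples. It mirrors the whitespace split and the `enumerate` call from the constructor, so index 0 corresponds to the top layer.

```python
import re
import typing


def split_layers(paths: typing.Union[str, typing.List[str]]) -> typing.List[typing.Tuple[int, str]]:
    """Hypothetical helper: return (layer, path) pairs the way __init__ enumerates them."""
    if isinstance(paths, str):
        # Same split as in __init__: any run of whitespace separates two paths
        paths = re.split(r"\s+", paths)
    return list(enumerate(paths))


# Illustrative provider locations only (assumption, not taken from the source)
layers = split_layers("file:///opt/layer0 file:///opt/layer1")
```

Passing a list instead of a string skips the split, so the order of the list alone decides which layer overlays which.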

check_disabled(path, tenant=None) ¤

Check if the item specified in path is disabled, either globally or for the specified tenant.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path to the item to be checked. | required |
| tenant | str \| None | The tenant to apply. If not specified, the global access is assumed. | None |

Returns:

| Type | Description |
|---|---|
| bool | True if the item is disabled for the tenant. |

Source code in asab/library/service.py
def check_disabled(self, path: str, tenant: typing.Optional[str] = None) -> bool:
	"""
	Check if the item specified in path is disabled, either globally or for the specified tenant.

	Args:
		path (str): Path to the item to be checked.
		tenant (str | None): The tenant to apply. If not specified, the global access is assumed.

	Returns:
		`True` if the item is disabled for the tenant.
	"""
	if not isinstance(path, str) or not path:
		raise ValueError("The 'path' must be a non-empty string.")

	# First check disabled by path
	for dp, disabled in self.DisabledPaths:
		if path.startswith(dp):
			if '*' in disabled:
				# Path is disabled for everybody
				return True

			if tenant is not None and tenant in disabled:
				# Path is disabled for a specified tenant
				return True

	# Then check for a specific item entries

	disabled = self.Disabled.get(path)

	if disabled is None:
		return False

	if '*' in disabled:
		# Item is disabled for everybody
		return True

	if tenant is not None and tenant in disabled:
		# Item is disabled for a specified tenant
		return True

	return False
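
The lookup order above (path prefixes first, then exact item entries) can be restated as a standalone function. This is a sketch under assumptions: `is_disabled` is a hypothetical name, and the shapes of the two containers (a list of `(prefix, tenants)` pairs and a dict mapping item path to tenants) are inferred from how `check_disabled` iterates them, not confirmed elsewhere in this page.

```python
import typing


def is_disabled(
    path: str,
    disabled_items: typing.Dict[str, typing.List[str]],
    disabled_paths: typing.List[typing.Tuple[str, typing.List[str]]],
    tenant: typing.Optional[str] = None,
) -> bool:
    """Hypothetical re-statement of the lookup order used by check_disabled()."""
    # 1) Prefix rules: a disabled directory disables everything beneath it
    for prefix, tenants in disabled_paths:
        if path.startswith(prefix):
            if "*" in tenants or (tenant is not None and tenant in tenants):
                return True
    # 2) Exact item rules
    tenants = disabled_items.get(path)
    if tenants is None:
        return False
    return "*" in tenants or (tenant is not None and tenant in tenants)


# Made-up example data
ITEMS = {"/Templates/item.json": ["tenant-a"]}
PATHS = [("/Legacy/", ["*"])]
```

With this data, everything under `/Legacy/` is disabled for everybody, while `/Templates/item.json` is disabled only when `tenant="tenant-a"` is passed.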

export(path='/', tenant=None, remove_path=False) async ¤

Return a file-like stream containing a gzipped tar archive of the library contents of the path.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path to export. | '/' |
| tenant | str \| None | The tenant to use for the operation. | None |
| remove_path | bool | If True, the path will be removed from the tar file. | False |

Returns:

| Type | Description |
|---|---|
| IO | A file object containing a gzipped tar archive. |

Source code in asab/library/service.py
async def export(self, path: str = "/", tenant: typing.Optional[str] = None, remove_path: bool = False) -> typing.IO:
	"""
	Return a file-like stream containing a gzipped tar archive of the library contents of the path.

	Args:
		path: The path to export.
		tenant (str | None ): The tenant to use for the operation.
		remove_path: If `True`, the path will be removed from the tar file.

	Returns:
		A file object containing a gzipped tar archive.
	"""

	# Directory path must start with '/'
	assert path[:1] == '/', "Directory path must start with a forward slash (/). For example: /library/Templates/"
	# Directory path must end with '/'
	assert path[-1:] == '/', "Directory path must end with a forward slash (/). For example: /library/Templates/"
	# Directory path cannot contain '//'
	assert '//' not in path, "Directory path cannot contain double slashes (//). Example format: /library/Templates/"

	fileobj = tempfile.TemporaryFile()
	tarobj = tarfile.open(name=None, mode='w:gz', fileobj=fileobj)

	items = await self._list(path, tenant, providers=self.Libraries[:1])
	recitems = list(items[:])

	while len(recitems) > 0:

		item = recitems.pop(0)
		if item.type != 'dir':
			continue

		child_items = await self._list(item.name, tenant, providers=item.providers)
		items.extend(child_items)
		recitems.extend(child_items)

	for item in items:
		if item.type != 'item':
			continue
		my_data = await self.Libraries[0].read(item.name)
		if remove_path:
			assert item.name.startswith(path)
			tar_name = item.name[len(path):]
		else:
			tar_name = item.name
		info = tarfile.TarInfo(tar_name)
		my_data.seek(0, io.SEEK_END)
		info.size = my_data.tell()
		my_data.seek(0, io.SEEK_SET)
		info.mtime = time.time()
		tarobj.addfile(tarinfo=info, fileobj=my_data)

	tarobj.close()
	fileobj.seek(0)
	return fileobj
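
A caller receives a seekable file object positioned at the start of a gzipped tar archive. The sketch below shows one way such a stream could be consumed with the standard `tarfile` module. Both `make_archive` and `read_archive` are hypothetical helpers; `make_archive` only imitates the shape of the stream so that the example runs without a library, and the member name has no leading slash, as it would after `remove_path=True` with `path='/'`.

```python
import io
import tarfile
import tempfile


def make_archive(files):
    """Hypothetical helper: build a gzipped tar in a temporary file."""
    fileobj = tempfile.TemporaryFile()
    with tarfile.open(name=None, mode="w:gz", fileobj=fileobj) as tarobj:
        for name, data in files.items():
            info = tarfile.TarInfo(name)
            info.size = len(data)
            tarobj.addfile(tarinfo=info, fileobj=io.BytesIO(data))
    # Rewind so that the reader starts at the beginning, as export() does
    fileobj.seek(0)
    return fileobj


def read_archive(fileobj):
    """Hypothetical helper: return {member name: bytes} for every regular file."""
    content = {}
    with tarfile.open(fileobj=fileobj, mode="r:gz") as tarobj:
        for member in tarobj.getmembers():
            if member.isfile():
                content[member.name] = tarobj.extractfile(member).read()
    return content


archive = make_archive({"Templates/item.json": b"{}"})
exported = read_archive(archive)
```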

find(path) async ¤

Searches for files with a specific name within a library, using the provided path.

The method traverses the library directories, looking for files that match the given filename. It returns a list of paths leading to these files, or an empty list if no such files are found.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | The path specifying the filename and its location within the library. The path should start with a forward slash and include the filename. Example: '/library/Templates/.setup.yaml' | required |

Returns:

| Type | Description |
|---|---|
| Optional[List[str]] | A list containing the paths to the found files, or an empty list if no files match the filename. |

Source code in asab/library/service.py
async def find(self, path: str) -> typing.Optional[typing.List[str]]:
	"""
	Searches for files with a specific name within a library, using the provided path.

	The method traverses the library directories, looking for files that match the given filename.
	It returns a list of paths leading to these files, or an empty list if no such files are found.

	Args:
		path (str): The path specifying the filename and its location within the library.
					The path should start with a forward slash and include the filename.
					Example: '/library/Templates/.setup.yaml'

	Returns:
		typing.Optional[typing.List[str]]: A list containing the paths to the found files,
										or an empty list if no files are found that match the filename.
	"""
	# File path must start with '/'
	assert path[:1] == '/', "Item path '{}' must start with a forward slash (/). For example: /library/Templates/.setup.yaml".format(path)
	# File path must end with the filename
	assert len(os.path.splitext(path)[1]) > 0, "Item path '{}' must end with an extension. For example: /library/Templates/item.json".format(path)

	results = []
	for library in self.Libraries:
		found_files = await library.find(path)
		if found_files:
			results.extend(found_files)

	return results
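
The two assertions at the top of `find` (and of `read`) define what counts as an item path. The following sketch wraps them in a hypothetical predicate, `is_item_path`, to show which paths pass. It relies on the documented behavior of `os.path.splitext`, which does not treat a leading dot in the last path component as the start of an extension.

```python
import os


def is_item_path(path: str) -> bool:
    """Hypothetical predicate combining the two item-path assertions."""
    starts_with_slash = path[:1] == "/"
    has_extension = len(os.path.splitext(path)[1]) > 0
    return starts_with_slash and has_extension


checks = {
    "/library/Templates/.setup.yaml": is_item_path("/library/Templates/.setup.yaml"),
    "/library/Templates/": is_item_path("/library/Templates/"),
    "library/item.json": is_item_path("library/item.json"),
    "/library/.hidden": is_item_path("/library/.hidden"),
}
```

Only the first path passes: the directory path has no extension, the third path is not absolute, and `.hidden` is treated by `os.path.splitext` as a name without an extension.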

is_ready() ¤

Check if all the libraries are ready.

Returns:

Type Description
bool

True if all libraries are ready, otherwise False.

Source code in asab/library/service.py
def is_ready(self) -> bool:
	"""
	Check if all the libraries are ready.

	Returns:
		True if all libraries are ready, otherwise False.
	"""
	if len(self.Libraries) == 0:
		return False

	return functools.reduce(
		lambda x, provider: provider.IsReady and x,
		self.Libraries,
		True
	)
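
The `functools.reduce` call above is a logical AND over the `IsReady` flag of every provider, with the extra rule that an empty list of providers is never ready. A minimal sketch of that equivalence follows; `StubProvider`, `all_ready` and `all_ready_short` are hypothetical names used only for illustration.

```python
import functools


class StubProvider:
    """Hypothetical stand-in for a library provider; only IsReady matters here."""

    def __init__(self, is_ready: bool):
        self.IsReady = is_ready


def all_ready(providers) -> bool:
    # Same structure as is_ready(): no providers means not ready
    if len(providers) == 0:
        return False
    return functools.reduce(lambda x, provider: provider.IsReady and x, providers, True)


def all_ready_short(providers) -> bool:
    # Equivalent formulation using the built-in all()
    return len(providers) > 0 and all(p.IsReady for p in providers)


ready = [StubProvider(True), StubProvider(True)]
partly = [StubProvider(True), StubProvider(False)]
```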

list(path='/', tenant=None, recursive=False) async ¤

List the items in the library directory specified by path that are enabled for the specified tenant. This method can be used only after the Library is ready.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path to the directory. | '/' |
| tenant | str \| None | If specified, items that are enabled for the tenant are filtered. | None |
| recursive | bool | If True, return a list of items located at path and its subdirectories. | False |

Returns:

| Type | Description |
|---|---|
| List[LibraryItem] | List of items that are enabled for the tenant. |

Source code in asab/library/service.py
async def list(self, path: str = "/", tenant: typing.Optional[str] = None, recursive: bool = False) -> typing.List[LibraryItem]:
	"""
	List the items in the library directory specified by the path that are enabled for the specified tenant. This method can be used only after the Library is ready.

	Args:
		path (str): Path to the directory.
		tenant (str | None): If specified, items that are enabled for the tenant are filtered.
		recursive (bool): If `True`, return a list of items located at `path` and its subdirectories.

	Returns:
		List of items that are enabled for the tenant.
	"""

	# Directory path must start with '/'
	assert path[:1] == '/', "Directory path must start with a forward slash (/). For example: /library/Templates/"
	# Directory path must end with '/'
	assert path[-1:] == '/', "Directory path must end with a forward slash (/). For example: /library/Templates/"
	# Directory path cannot contain '//'
	assert '//' not in path, "Directory path cannot contain double slashes (//). Example format: /library/Templates/"

	# List requested level using all available providers
	items = await self._list(path, tenant, providers=self.Libraries)

	if recursive:
		# If recursive scan is requested, then iterate thru list of items
		# find 'dir' types there and list them.
		# Output of this list is attached to the list for recursive scan
		# and also to the final output
		recitems = list(items[:])

		while len(recitems) > 0:

			item = recitems.pop(0)
			if item.type != 'dir':
				continue

			child_items = await self._list(item.name, tenant, providers=item.providers)
			items.extend(child_items)
			recitems.extend(child_items)
	return items
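
The recursive branch is a breadth-first traversal: directories found at one level are queued and listed later, and their children are appended to the same output list. The sketch below reproduces that order on a made-up in-memory tree. `Entry`, `TREE` and `list_recursive` are hypothetical, and the sketch ignores tenants, providers and disabled items.

```python
import collections
import dataclasses
import typing


@dataclasses.dataclass
class Entry:
    """Hypothetical stand-in for LibraryItem with only the fields used here."""
    name: str
    type: str  # 'dir' or 'item'


# Hypothetical library content: directory path -> direct children
TREE = {
    "/": [Entry("/A/", "dir"), Entry("/readme.txt", "item")],
    "/A/": [Entry("/A/B/", "dir"), Entry("/A/a.yaml", "item")],
    "/A/B/": [Entry("/A/B/b.json", "item")],
}


def list_recursive(path: str) -> typing.List[Entry]:
    items = list(TREE.get(path, []))
    queue = collections.deque(items)
    while queue:
        item = queue.popleft()
        if item.type != "dir":
            continue
        children = TREE.get(item.name, [])
        # Children go both to the output and to the queue for further descent
        items.extend(children)
        queue.extend(children)
    return items


names = [entry.name for entry in list_recursive("/")]
```

The result keeps directories in the output alongside items, so a caller that wants files only still has to filter on `type == 'item'`.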

open(path, tenant=None) async ¤

Read the content of the library item specified by path in a SAFE way, protected by a context manager/with statement. This method can be used only after the Library is ready.

Example:

async with self.App.LibraryService.open(path) as b:
        if b is None:
                return None

        text = b.read().decode("utf-8")
Source code in asab/library/service.py
@contextlib.asynccontextmanager
async def open(self, path: str, tenant: typing.Optional[str] = None):
	"""
	Read the content of the library item specified by `path` in a SAFE way, protected by a context manager/with statement.
	This method can be used only after the Library is ready.

	Example:

	```python
	async with self.App.LibraryService.open(path) as b:
		if b is None:
			return None

		text = b.read().decode("utf-8")
	```
	"""

	itemio = await self.read(path, tenant)
	if itemio is None:
		yield itemio
	else:
		try:
			yield itemio
		finally:
			itemio.close()
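
The pattern used by `open` (yield a stream that may be `None`, and close it on exit only when it exists) can be exercised without a running library. The following is a sketch under assumptions: `STORE`, `read_item`, `open_item` and `load_text` are hypothetical, with an in-memory dict standing in for the providers.

```python
import asyncio
import contextlib
import io
import typing

# Hypothetical in-memory store standing in for the library providers
STORE = {"/Templates/item.json": b'{"hello": "world"}'}


async def read_item(path: str) -> typing.Optional[typing.IO]:
    data = STORE.get(path)
    return None if data is None else io.BytesIO(data)


@contextlib.asynccontextmanager
async def open_item(path: str):
    itemio = await read_item(path)
    try:
        yield itemio
    finally:
        # Close the stream even if the body of the `async with` raised
        if itemio is not None:
            itemio.close()


async def load_text(path: str) -> typing.Optional[str]:
    async with open_item(path) as b:
        if b is None:
            return None
        return b.read().decode("utf-8")


found = asyncio.run(load_text("/Templates/item.json"))
missing = asyncio.run(load_text("/Templates/absent.json"))
```

The caller still has to check for `None`, because a missing or disabled item is reported through the yielded value rather than through an exception.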

read(path, tenant=None) async ¤

This method is obsolete; use open(...) instead.

Read the content of the library item specified by path. This method can be used only after the Library is ready.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path to the file, LibraryItem.name can be used directly. | required |
| tenant | str \| None | The tenant to apply. If not specified, the global access is assumed. | None |

Returns:

| Type | Description |
|---|---|
| IO \| None | Readable stream with the content of the library item. None is returned if the item is not found or if it is disabled (either globally or for the specified tenant). |

Example:

itemio = await library.read('/path', 'tenant')
if itemio is not None:
        with itemio:
                return itemio.read()
Source code in asab/library/service.py
async def read(self, path: str, tenant: typing.Optional[str] = None) -> typing.Optional[typing.IO]:
	"""
	This method is obsolete; use `open(...)` instead.

	Read the content of the library item specified by `path`. This method can be used only after the Library is ready.

	Args:
		path (str): Path to the file, `LibraryItem.name` can be used directly.
		tenant (str | None): The tenant to apply. If not specified, the global access is assumed.

	Returns:
		( IO | None ): Readable stream with the content of the library item. `None` is returned if the item is not found or if it is disabled (either globally or for the specified tenant).

	Example:

	```python
	itemio = await library.read('/path', 'tenant')
	if itemio is not None:
		with itemio:
			return itemio.read()
	```
	"""
	# item path must start with '/'
	assert path[:1] == '/', "Item path must start with a forward slash (/). For example: /library/Templates/item.json"
	# Item path must end with the extension
	assert len(os.path.splitext(path)[1]) > 0, "Item path must end with an extension. For example: /library/Templates/item.json"

	if self.check_disabled(path, tenant=tenant):
		return None

	for library in self.Libraries:
		itemio = await library.read(path)
		if itemio is None:
			continue
		return itemio

	return None
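
The loop over `self.Libraries` is what makes an upper layer hide a lower one: the first provider that returns a stream wins. A minimal sketch of that rule follows, assuming plain dicts in place of providers; `LAYERS` and `read_layered` are hypothetical names and the content is made up.

```python
import io
import typing

# Hypothetical layers, top layer first; each maps item path -> content
LAYERS = [
    {"/Templates/item.json": b"top"},
    {"/Templates/item.json": b"bottom", "/Templates/other.json": b"only-below"},
]


def read_layered(path: str) -> typing.Optional[io.BytesIO]:
    for layer in LAYERS:
        data = layer.get(path)
        if data is None:
            # A "hole" in this layer: fall through to the layer beneath
            continue
        return io.BytesIO(data)
    return None


top = read_layered("/Templates/item.json").read()
below = read_layered("/Templates/other.json").read()
absent = read_layered("/Templates/absent.json")
```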

subscribe(paths) async ¤

Subscribe to changes for specified paths of the library.

In order to notify on changes in the Library, this method must be used after the Library is ready.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| paths | str \| list[str] | Either a single path or a list of paths to be subscribed. All the paths must be absolute (start with '/'). | required |

Examples:

class MyApplication(asab.Application):

        async def initialize(self):
                self.PubSub.subscribe("Library.ready!", self.on_library_ready)
                self.PubSub.subscribe("Library.change!", self.on_library_change)

        async def on_library_ready(self, event_name, library=None):
                await self.LibraryService.subscribe(["/alpha","/beta"])

        def on_library_change(self, message, provider, path):
                print("New changes in the library found by provider: '{}'".format(provider))

Source code in asab/library/service.py
async def subscribe(self, paths: typing.Union[str, typing.List[str]]) -> None:
	"""
	Subscribe to changes for specified paths of the library.

	In order to notify on changes in the Library, this method must be used after the Library is ready.

	Args:
		paths (str | list[str]): Either single path or list of paths to be subscribed. All the paths must be absolute (start with '/').

	Examples:
	```python
	class MyApplication(asab.Application):

		async def initialize(self):
			self.PubSub.subscribe("Library.ready!", self.on_library_ready)
			self.PubSub.subscribe("Library.change!", self.on_library_change)

		async def on_library_ready(self, event_name, library=None):
			await self.LibraryService.subscribe(["/alpha","/beta"])

		def on_library_change(self, message, provider, path):
			print("New changes in the library found by provider: '{}'".format(provider))

	```
	"""
	if isinstance(paths, str):
		paths = [paths]
	for path in paths:
		assert path[:1] == '/', "Absolute path must be used when subscribing to the library changes"

		for provider in self.Libraries:
			await provider.subscribe(path)

asab.library.item.LibraryItem dataclass ¤

The data class that contains the info about a specific item in the library.

Attributes:

| Name | Type | Description |
|---|---|---|
| name | str | The absolute path of the Item. It can be directly fed into LibraryService.read(...). |
| type | str | Either dir if the Item is a directory, or item if the Item is of any other type. |
| layer | int | The number of the highest layer in which this Item is found. The higher the number, the lower the layer. |
| providers | list | List of LibraryProvider objects containing this Item. |
| disabled | bool | True if the Item is disabled, False otherwise. If the Item is disabled, LibraryService.read(...) will return None. |
| override | int | If True, this item is marked as an override for the providers with the same Item name. |

Source code in asab/library/item.py
@dataclasses.dataclass
class LibraryItem:
	"""
	The data class that contains the info about a specific item in the library.

	Attributes:
		name (str): The absolute path of the Item. It can be directly fed into `LibraryService.read(...)`.
		type (str): Can be either `dir` if the Item is a directory or `item` if Item is of any other type.
		layer (int): The number of the highest layer in which this Item is found. The higher the number, the lower the layer.
		providers (list): List of `LibraryProvider` objects containing this Item.
		disabled (bool): `True` if the Item is disabled, `False` otherwise. If the Item is disabled, `LibraryService.read(...)` will return `None`.
		override (int): If `True`, this item is marked as an override for the providers with the same Item name.
	"""

	name: str
	type: str
	layer: int
	providers: list
	disabled: bool = False
	override: int = 0  # Default value for override is False