The ASAB Library is a concept for sharing data content across microservices in a cluster.
In cluster and cloud microservice architectures, all microservices must have access to unified resources.
The Library provides a read-only interface for listing and reading this content.
It also lets you "stack" several libraries into one overlaid view, merging the content of each library into a single unified space.
The Library can also notify the ASAB microservice about changes, e.g. for automated update or reload.
The library content can be organized into an unlimited number of layers.
Each layer is represented by a provider (e.g. filesystem, zookeeper, git, ...) with a specific configuration. Two layers can have the same provider but different base paths.
The layers of the library are like slices of Swiss cheese layered on top of each other.
Only if there is a hole in the top layer can you see the layer that shows through underneath.
This means that a file in an upper layer overrides (hides) any file with the same path in the lower layers.
Illustration of ASAB Library layers. Green items are visible, grey items are hidden. Moreover, Layer 1 contains .disabled.yaml file containing the list of disabled files.
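The overlay rule can be sketched in a few lines of plain Python. This is only an illustration of the concept, not how ASAB implements it: each layer is modeled as a hypothetical dict mapping an item path to its content, and `resolve` and `visible_paths` are made-up helper names.

```python
from typing import List, Optional

# Hypothetical layers, topmost first; each maps an item path to its content.
LAYERS = [
    {"/a.txt": "a from layer 0"},
    {"/a.txt": "a from layer 1", "/b.txt": "b from layer 1"},
    {"/c.txt": "c from layer 2"},
]


def resolve(path: str) -> Optional[str]:
    """Return the content from the topmost layer that contains `path`."""
    for layer in LAYERS:
        if path in layer:
            return layer[path]
    return None


def visible_paths() -> List[str]:
    """Merge all layers into one view; each path is listed only once."""
    seen = set()
    for layer in LAYERS:
        seen.update(layer)
    return sorted(seen)
```

Here `/a.txt` in layer 1 is hidden by the copy in layer 0, while `/b.txt` and `/c.txt` show through because the layers above them have no file with that path.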
Tip
In most applications, it is common to create the first layer with the Zookeeper provider and the layers beneath with the git or libsreg provider. This allows you to quickly modify items of the Library on the first layer.
The library concept supports multi-tenancy. By default, all items of the library are visible for everyone, but you can disable some of them for specific tenants.
In order to disable some items of the library, create a file /.disabled.yaml. This file must be created on the first layer.
In the following example, file1.txt is disabled for tenant-1 and tenant-2, file2.txt for tenant-1 and file3.txt for every tenant.
When disabling a file for all tenants with a star, don't forget to enclose it in quotation marks. Otherwise, YAML would interpret the star as an alias. Read more about anchors and aliases.
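The disabling rules described above can be sketched as follows. This is a simplified illustration, not the library's own code: the YAML shape in the comment and the leading slashes in the paths are assumptions, the dict stands in for the parsed file, and `is_disabled` is a hypothetical helper that only covers per-file entries.

```python
from typing import Optional

# Assumed shape of /.disabled.yaml for the scenario described above:
#
#   /file1.txt:
#     - tenant-1
#     - tenant-2
#   /file2.txt:
#     - tenant-1
#   /file3.txt: '*'
#
# The dict below is what such a file would look like once parsed.
DISABLED = {
    "/file1.txt": ["tenant-1", "tenant-2"],
    "/file2.txt": ["tenant-1"],
    "/file3.txt": "*",
}


def is_disabled(path: str, tenant: Optional[str] = None) -> bool:
    """Return True if `path` is disabled for everyone or for `tenant`."""
    entry = DISABLED.get(path)
    if entry is None:
        return False
    if "*" in entry:
        # The star disables the item for every tenant.
        return True
    return tenant is not None and tenant in entry
```

With this data, `is_disabled("/file2.txt", "tenant-1")` is true, while the same file stays visible to `tenant-2`.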
The library service may exist in multiple instances, with different paths setups.
For that reason, you have to provide a unique service_name and there is no default value for that.
Each Library item is represented by a LibraryItem dataclass. Read more in the reference section.
Initializes the Library Service. Remember to specify a unique service_name.
When the Library is initialized, the Library.ready! PubSub message is emitted.
The callback must accept two arguments: event_name, which is the message "Library.ready!", and library, which is the specific provider with which the Library is initialized.
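The list() method returns a list of LibraryItem objects. For more information, see the reference section.
item.type can be either 'item' or 'dir'.
The read() coroutine returns an IO object with the item content, or None if the item is not found or is disabled. The LibraryService source marks read() as obsolete in favor of open().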
The Library is created in a not-ready state. Once the connection to the underlying technology is established, each library provider changes its state to ready.
The Library switches to the ready state after all its providers are ready.
If any provider is disconnected, the Library switches back to the not-ready state until the connection is reestablished.
Every time the Library changes its state, a PubSub message is published, with the arguments provider and path.
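The ready/not-ready aggregation can be illustrated with a small stand-alone model. The classes below are hypothetical stand-ins, not ASAB classes; only the event names `Library.ready!` and `Library.not_ready!` are taken from the service source, and events are collected in a list instead of being published through PubSub.

```python
class FakeProvider:
    """Hypothetical stand-in for a library provider."""

    def __init__(self, name):
        self.name = name
        self.is_ready = False


class FakeLibrary:
    """Hypothetical stand-in that aggregates the state of its providers."""

    def __init__(self, providers):
        self.providers = providers
        self.events = []  # collected instead of being published via PubSub

    def is_ready(self):
        # Ready only if there is at least one provider and all are ready.
        return bool(self.providers) and all(p.is_ready for p in self.providers)

    def set_ready(self, provider, ready):
        provider.is_ready = ready
        if self.is_ready():
            self.events.append("Library.ready!")
        elif not ready:
            self.events.append("Library.not_ready!")


zk = FakeProvider("zookeeper")
git = FakeProvider("git")
library = FakeLibrary([zk, git])

library.set_ready(zk, True)    # one provider is still pending, no event yet
library.set_ready(git, True)   # all providers ready -> "Library.ready!"
library.set_ready(zk, False)   # a provider dropped -> "Library.not_ready!"
```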
Some providers are able to detect changes of the library items.
Example
class MyApplication(asab.Application):

    async def initialize(self):
        self.PubSub.subscribe("Library.ready!", self.on_library_ready)
        self.PubSub.subscribe("Library.change!", self.on_library_change)

    async def on_library_ready(self, event_name, library=None):
        await self.LibraryService.subscribe(["/asab"])  # (1)

    def on_library_change(self, message, provider, path):  # (2)
        print("New changes in the library found by provider: '{}'".format(provider))

(1) The self.LibraryService.subscribe() method takes either a single path as a string or multiple paths in a list and watches them for changes.
(2) This callback takes three arguments: message (Library.change! in this case), provider (name of the provider that has detected changes) and path (the path where changes were made).
Info
Note that some providers detect changes immediately while others detect them periodically. For example, the git provider pulls the repository every minute, and changes can be detected only after that.
The most basic provider that reads data from the local filesystem.
Change notification is available only on Linux systems, as it uses inotify.
The ASAB Library supports multi-tenancy, allowing you to manage and separate tenant-specific content within the layers of the library. To handle tenant-specific contexts, use Tenant.set() to set the context and Tenant.reset() to reset it after processing.
Implementing Tenant-Specific Logic in Your Application
To handle tenant-specific logic, make sure the AuthService is present in app.py:
# Install the AuthService for tenant-based authentication
self.AuthService = asab.web.auth.AuthService(self)
self.AuthService.install(self.WebContainer)
The following example demonstrates how to iterate through multiple tenants, setting and resetting the tenant context for each one:
import asab.contextvars

async def process_tenants(self):
    for tenant in self.Tenants:
        # Set the tenant context
        tenant_context = asab.contextvars.Tenant.set(tenant)
        try:
            # Process tenant-specific logic here
            print(f"Processing workflows for tenant: {tenant}")
        finally:
            # Reset the tenant context
            asab.contextvars.Tenant.reset(tenant_context)
In this method:
- Tenant.set(tenant) establishes the context for the current tenant.
- Tenant.reset(tenant_context) ensures the context is cleared after processing, preventing any unintended carryover.
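The set/reset pattern can be tried out without ASAB by using the standard `contextvars` module. The sketch below assumes that `asab.contextvars.Tenant` behaves like a plain `ContextVar`, which is consistent with the `Tenant.get()` / `LookupError` handling in the service source; the local `Tenant` variable and both helper functions are stand-ins for illustration only.

```python
import contextvars

# Local stand-in for asab.contextvars.Tenant (assumed to be a ContextVar).
Tenant = contextvars.ContextVar("Tenant")


def current_tenant():
    """Return the tenant from the context, or None if no tenant is set."""
    try:
        return Tenant.get()
    except LookupError:
        return None


def process_tenants(tenants):
    """Visit each tenant with its context set, then restore the previous state."""
    seen = []
    for tenant in tenants:
        token = Tenant.set(tenant)
        try:
            # Tenant-aware calls would go here; we only record the context.
            seen.append(current_tenant())
        finally:
            # reset() restores whatever value was there before set().
            Tenant.reset(token)
    return seen
```

Because `reset()` takes the token returned by `set()`, the context is restored even if the processing step raises an exception.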
If Container Public Access Level is not set to "Public access", then
"Access Policy" must be created with "Read" and "List" permissions
and "Shared Access Signature" (SAS) query string must be added to a
URL in a configuration:
GitLab uses deploy tokens to enable authentication of deployment tasks,
independent of a user account. Authentication through deploy tokens is
the only supported option for now.
If you want to create a deploy token for your GitLab repository, follow
these steps from the
manual:
Go to Settings > Repository > Deploy tokens section in your
repository. (Note that you have to possess a "Maintainer" or
"Owner" role for the repository.)
Expand the "Deploy tokens" section. The list of current Active
Deploy Tokens will be displayed.
Complete the fields and scopes. We recommend a custom "username",
as you will need it later for the URL in the configuration.
Record the deploy token's values before leaving or refreshing the
page! After that, you cannot access it again.
After the deploy token is created, use the URL for the repository in the
following format:
The git provider clones the repository into a temporary directory. The
default path for the cloned repository is
/tmp/asab.library.git/ and it can be changed manually:
The libsreg provider downloads the content from the distribution URL.
The distribution URL points to HTTP(S) server where content archives are published.
*.tar.xz: This is the TAR/XZ archive of the actual content
*.tar.xz.sha256: SHA256 checksum of the archive
The structure of the distribution is as follows:
/{archname}/{archname}-{version}.tar.xz
archname: A name of the distribution archive, my-library in the example above
version: A version of the distribution archive, master, production are typically GIT branches, v43.41 is a GIT tag.
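As an illustration of the layout above, the following sketch builds the archive and checksum URLs from their parts. The helper name `archive_urls` and the base URL are hypothetical, and placing the `.sha256` file next to the archive is an assumption based on the two file types listed above.

```python
from typing import Tuple


def archive_urls(base_url: str, archname: str, version: str) -> Tuple[str, str]:
    """Build the archive URL and its checksum URL for one distribution."""
    base = base_url.rstrip("/")
    archive = "{}/{}/{}-{}.tar.xz".format(base, archname, archname, version)
    return archive, archive + ".sha256"


# Hypothetical distribution point used only for this example.
archive, checksum = archive_urls("https://libsreg.example.com/", "my-library", "v43.41")
```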
Tip
This provider is designed to use Microsoft Azure Storage as a distribution point.
It is assumed that the content archives are uploaded to the distribution point using CI/CD.
The order of providers is important, the priority (or layering) is top-down.
Each library provider is specified by URL/URI schema:
zk:// or zookeeper:// for ZooKeeper provider
file:// or local path for FileSystem provider
azure+https:// for Microsoft Azure Storage provider.
git+https:// for Git provider.
libsreg+https:// for Libraries provider.
The first provider is responsible for providing /.disabled.yaml.
A library is created in “not ready” state, each provider then informs the library when it is ready
(eg. Zookeeper provider needs to connect to Zookeeper servers). Only after all providers are ready, the library itself becomes ready.
The library indicates that by the PubSub event Library.ready!.
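The mapping from a configured path to a provider can be sketched as a simple prefix check. This is a simplified mirror of the checks in `_create_library`; the function `provider_kind`, the returned labels and the example paths are hypothetical and exist only for illustration.

```python
def provider_kind(path: str) -> str:
    """Return a label for the provider that would handle `path`."""
    if path.startswith(("zk://", "zookeeper://")):
        return "zookeeper"
    if path.startswith(("./", "/", "file://")):
        return "filesystem"
    if path.startswith("azure+https://"):
        return "azurestorage"
    if path.startswith("git+"):
        return "git"
    if path.startswith("libsreg+"):
        return "libsreg"
    raise ValueError("Incorrect/unknown provider for '{}'".format(path))


# Hypothetical provider list; the position in the list is the layer number,
# so the first entry is the topmost layer.
providers = [
    "zk://zookeeper-1:2181/library",
    "git+https://git.example.com/library.git",
    "/opt/library",
]
layers = [(layer, provider_kind(path)) for layer, path in enumerate(providers)]
```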
class LibraryService(Service):
    """
    Configuration:

    ```ini
    [library]
    providers:
        provider+1://
        provider+2://
        provider+3://
    ```

    The order of providers is important, the priority (or layering) is top-down.

    Each library provider is specified by URL/URI schema:

    * `zk://` or `zookeeper://` for ZooKeeper provider
    * `file://` or local path for FileSystem provider
    * `azure+https://` for Microsoft Azure Storage provider.
    * `git+https://` for Git provider.
    * `libsreg+https://` for Libraries provider.

    The first provider is responsible for providing `/.disabled.yaml`.

    A library is created in “not ready” state, each provider then informs the library when it is ready
    (eg. Zookeeper provider needs to connect to Zookeeper servers). Only after all providers are ready, the library itself becomes ready.
    The library indicates that by the PubSub event `Library.ready!`.
    """

    def __init__(self, app: Application, service_name: str, paths: typing.Union[str, typing.List[str], None] = None):
        """
        Initialize the LibraryService.

        The library service is designed to "exist" in multiple instances,
        with different `paths` setups.
        For that reason, you have to provide unique `service_name`
        and there is no _default_ value for that.

        If `paths` are not provided, they are fetched from `[library]providers` configuration.

        Args:
            app: The ASAB Application.
            service_name: A unique name of the service.
            paths (str | list[str] | None ): Either single path or list of paths with which LibraryService is connected.
        """
        super().__init__(app, service_name)
        self.Libraries: list[LibraryProviderABC] = []
        self.Disabled: dict = {}
        self.DisabledPaths: list = []
        self.Favorites: dict = {}
        self.FavoritePaths: list = []

        if paths is None:
            # load them from configuration
            try:
                paths = Config.getmultiline("library", "providers")
            except configparser.NoOptionError:
                L.critical("'providers' option is not present in configuration section 'library'.")
                raise SystemExit("Exit due to a critical configuration error.")

        # paths can be string if specified as argument
        if isinstance(paths, str):
            paths = re.split(r"\s+", paths)

        for layer, path in enumerate(paths):
            # Create library for each layer of paths
            self._create_library(path, layer)

        app.PubSub.subscribe("Application.tick/60!", self._on_tick60)

    async def finalize(self, app):
        while len(self.Libraries) > 0:
            lib = self.Libraries.pop(-1)
            await lib.finalize(self.App)

    async def _on_tick60(self, message_type):
        await self._read_disabled()
        await self._read_favorites()

    def _create_library(self, path, layer):
        library_provider = None

        if path.startswith('zk://') or path.startswith('zookeeper://'):
            from .providers.zookeeper import ZooKeeperLibraryProvider
            library_provider = ZooKeeperLibraryProvider(self, path, layer)

        elif path.startswith('./') or path.startswith('/') or path.startswith('file://'):
            from .providers.filesystem import FileSystemLibraryProvider
            library_provider = FileSystemLibraryProvider(self, path, layer)

        elif path.startswith('azure+https://'):
            from .providers.azurestorage import AzureStorageLibraryProvider
            library_provider = AzureStorageLibraryProvider(self, path, layer)

        elif path.startswith('git+'):
            from .providers.git import GitLibraryProvider
            library_provider = GitLibraryProvider(self, path, layer)

        elif path.startswith('libsreg+'):
            from .providers.libsreg import LibsRegLibraryProvider
            library_provider = LibsRegLibraryProvider(self, path, layer)

        elif path == '' or path.startswith("#") or path.startswith(";"):
            # This is empty or commented line
            return

        else:
            L.error("Incorrect/unknown provider for '{}'".format(path))
            raise SystemExit("Exit due to a critical configuration error.")

        self.Libraries.append(library_provider)

    def is_ready(self) -> bool:
        """
        Check if all the library providers are ready.

        Returns:
            True if every provider is ready; if even one provider is not, returns False.
        """
        if not self.Libraries:
            return False
        for provider in self.Libraries:
            if not provider.IsReady:
                return False
        return True

    async def _set_ready(self, provider):
        if len(self.Libraries) == 0:
            return

        if (provider == self.Libraries[0]) and provider.IsReady:
            await self._read_disabled()
            await self._read_favorites()

        if self.is_ready():
            L.log(LOG_NOTICE, "is ready.", struct_data={'name': self.Name})
            self.App.PubSub.publish("Library.ready!", self)
        elif not provider.IsReady:
            L.log(LOG_NOTICE, "is NOT ready.", struct_data={'name': self.Name})
            self.App.PubSub.publish("Library.not_ready!", self)

    def _ensure_ready(self):
        if not self.is_ready():
            raise LibraryNotReadyError("Library is not ready yet.")

    async def find(self, path: str) -> typing.List[str]:
        """
        Search for files with a specific name within a library, using the provided path.

        The method traverses the library directories, looking for files that match the given filename.
        It returns a list of paths leading to these files, empty if no items are found.

        Args:
            path (str): Location of the file in Library. It must start with a forward slash and include the filename.
                Example: '/Dashboards/Cisco/Overview.json'

        Returns:
            typing.List[str]: A list of paths to the found files. If no files are found, the list will be empty.
        """
        _validate_path_item(path)

        results = []
        for library in self.Libraries:
            found_files = await library.find(path)
            if found_files:
                results.extend(found_files)

        return results

    async def read(self, path: str) -> typing.Optional[typing.IO]:
        """
        THIS IS OBSOLETED METHOD, USE `open(...)` !!!

        Read the content of the library item specified by `path`.
        This method can be used only after the Library is ready.

        Args:
            path (str): Path to the file, `LibraryItem.name` can be used directly.
            tenant (str | None): The tenant to apply. If not specified, the global access is assumed.

        Returns:
            ( IO | None ): Readable stream with the content of the library item.
                `None` is returned if the item is not found or if it is disabled (either globally or for the specified tenant).

        Example:

        ```python
        itemio = await library.read('/path', 'tenant')
        if itemio is not None:
            with itemio:
                return itemio.read()
        ```
        """
        self._ensure_ready()
        LogObsolete.warning("Method 'LibraryService.read()' is obsolete. Use 'LibraryService.open()' method instead.")
        _validate_path_item(path)

        if self.check_disabled(path):
            return None

        for library in self.Libraries:
            itemio = await library.read(path)
            if itemio is None:
                continue
            return itemio

        return None

    @contextlib.asynccontextmanager
    async def open(self, path: str):
        """
        Read the content of the library item specified by `path` in a SAFE way, protected by a context manager/with statement.
        This method can be used only after the Library is ready.

        Example:

        ```python
        async with self.LibraryService.open(path) as io:
            if io is None:
                return None
            text = io.read().decode("utf-8")
        ```
        """
        self._ensure_ready()
        _validate_path_item(path)

        # Same functionality as in read() method
        itemio = None
        disabled = self.check_disabled(path)
        if not disabled:
            for library in self.Libraries:
                itemio = await library.read(path)
                if itemio is not None:
                    break

        if itemio is None:
            yield itemio
        else:
            try:
                yield itemio
            finally:
                itemio.close()

    async def list(self, path: str = "/", recursive: bool = False) -> typing.List[LibraryItem]:
        """
        List the directory of the library specified by the path that are enabled for the specified tenant.
        This method can be used only after the Library is ready.

        **WARNING:** Tenant must be set in the context variable!
        If it is not set automatically (e.g. from web request), it must be set manually.

        Example:

        ```python
        try:
            tenant_ctx = asab.contextvars.Tenant.set(tenant)
            items = await self.LibraryService.list(path)
            ...
        finally:
            asab.contextvars.Tenant.reset(tenant_ctx)
        ```

        Args:
            path (str): Path to the directory.
            recursive (bool): If `True`, return a list of items located at `path` and its subdirectories.

        Returns:
            List of items that are enabled for the tenant.
        """
        self._ensure_ready()
        _validate_path_directory(path)

        # List requested level using all available providers
        items = await self._list(path, providers=self.Libraries)

        if recursive:
            # If recursive scan is requested, then iterate thru list of items
            # find 'dir' types there and list them.
            # Output of this list is attached to the list for recursive scan
            # and also to the final output
            recitems = list(items[:])

            while len(recitems) > 0:
                item = recitems.pop(0)
                if item.type != 'dir':
                    continue

                child_items = await self._list(item.name, providers=item.providers)
                items.extend(child_items)
                recitems.extend(child_items)

        return items

    async def _list(self, path, providers):
        """
        Lists items from all providers, merging items with the same name,
        and simply adding every layer an item belongs to.

        Args:
            path (str): The path to list items from.
            providers (list): A list of providers to query.

        Returns:
            list: A list of unique LibraryItem objects with merged layers.
        """
        items: list[LibraryItem] = []
        unique_items: dict[str, LibraryItem] = {}

        # Launch tasks to list items from each provider.
        tasks = [
            (self.Libraries.index(provider), asyncio.create_task(provider.list(path)))
            for provider in providers
        ]

        for outer_layer, task in tasks:
            try:
                provider_items: list[LibraryItem] = await task
            except KeyError:
                # The path doesn't exist in this provider.
                continue
            except Exception:
                L.exception("Unexpected error when listing path '{}' on layer {}.".format(path, outer_layer))
                continue

            for item in provider_items:
                # Check if the item is disabled.
                item.disabled = self.check_disabled(item.name)
                item.favorite = self.check_favorite(item.name)

                # Use the layers as provided by the provider; if empty, fall back to outer_layer.
                provider_layers = item.layers if item.layers else [outer_layer]

                if item.name in unique_items:
                    existing_item = unique_items[item.name]
                    # Merge providers for directories (avoid duplicates).
                    if existing_item.type == "dir" and item.type == "dir":
                        for p in item.providers:
                            if p not in existing_item.providers:
                                existing_item.providers.append(p)
                    # Extend the layers list by adding each new layer (without sorting).
                    for layer_value in provider_layers:
                        if layer_value not in existing_item.layers:
                            existing_item.layers.append(layer_value)
                else:
                    # New item: use the provided layers.
                    item.layers = provider_layers
                    unique_items[item.name] = item
                    items.append(item)

        # Do not sort items; return them in the order they were merged.
        return items

    async def _read_favorites(self):
        """
        Load favorites from '/.favorites.yaml' into:
          - self.Favorites: { '/path/file.ext': ['tenant', '*', ...] }
          - self.FavoritePaths: [ ('/path/folder/', ['tenant', '*', ...]), ... ]

        Expected YAML shape:
            /path:
              tenants:
                - system
        """
        fav_data = None
        try:
            fav_file = await self.Libraries[0].read('/.favorites.yaml')
        except Exception as e:
            L.warning("Failed to read '/.favorites.yaml': {}.".format(e))
            self.Favorites = {}
            self.FavoritePaths = []
            return

        if fav_file is None:
            self.Favorites = {}
            self.FavoritePaths = []
            return

        try:
            fav_data = yaml.load(fav_file, Loader=yaml.CSafeLoader)
        except Exception:
            L.exception("Failed to parse '/.favorites.yaml'")
            self.Favorites = {}
            self.FavoritePaths = []
            return
        finally:
            if hasattr(fav_file, "close"):
                try:
                    fav_file.close()
                except Exception:
                    pass

        if fav_data is None:
            self.Favorites = {}
            self.FavoritePaths = []
            return

        if not isinstance(fav_data, dict):
            L.warning("Unexpected favorites format ({}). Resetting.".format(type(fav_data).__name__))
            self.Favorites = {}
            self.FavoritePaths = []
            return

        files = {}
        folders = []

        for k, v in fav_data.items():
            if not isinstance(k, str):
                L.warning("Ignoring non-string favorite key: {}".format(k))
                continue

            # normalize tenants list
            tenants = []
            if isinstance(v, dict) and 'tenants' in v:
                tv = v.get('tenants')
                if isinstance(tv, list):
                    tenants = [str(t) for t in tv]
                elif isinstance(tv, set):
                    tenants = [str(t) for t in list(tv)]
                elif tv is None:
                    tenants = []
                else:
                    tenants = [str(tv)]
            elif isinstance(v, list):
                # Back-compat: allow direct list
                tenants = [str(t) for t in v]
            else:
                tenants = [str(v)]

            if k.endswith('/'):
                folders.append((k, tenants))
            else:
                files[k] = tenants

        # Sort folders shortest→longest (consistent with DisabledPaths sort)
        folders.sort(key=lambda x: len(x[0]))

        self.Favorites = files
        self.FavoritePaths = folders

    async def _read_disabled(self, publish_changes=False):
        old_disabled = self.Disabled.copy()
        old_disabled_paths = list(self.DisabledPaths)

        # Read the file
        disabled_file = await self.Libraries[0].read('/.disabled.yaml')

        if disabled_file is None:
            self.Disabled = {}
            self.DisabledPaths = []
        else:
            try:
                disabled_data = yaml.load(disabled_file, Loader=yaml.CSafeLoader)
            except Exception:
                L.exception("Failed to parse '/.disabled.yaml'")
                self.Disabled = {}
                self.DisabledPaths = []
                return

            if disabled_data is None:
                self.Disabled = {}
                self.DisabledPaths = []
                return

            if isinstance(disabled_data, set):
                # Backward compatibility (August 2023)
                self.Disabled = {key: '*' for key in disabled_data}
                self.DisabledPaths = []
            else:
                self.Disabled = {}
                self.DisabledPaths = []
                for k, v in disabled_data.items():
                    if k.endswith('/'):
                        self.DisabledPaths.append((k, v))
                    else:
                        self.Disabled[k] = v

                self.DisabledPaths.sort(key=lambda x: len(x[0]))

        # If requested, compare old and new disables to notify subscribers via Library.change!
        if publish_changes:
            await self._publish_change_for_disabled_diff(old_disabled, old_disabled_paths)

    async def _publish_change_for_disabled_diff(self, old_disabled, old_disabled_paths):
        """
        Compare old and new disabled data and publish Library.change! if any subscribed path is affected
        for the specific subscription target.
        """
        if not self.Libraries:
            return

        # Only the first provider (layer 0) owns and manages '/.disabled.yaml',
        # so all disable-and-subscription logic is driven from that topmost layer.
        provider = self.Libraries[0]

        # Check if the provider has Subscriptions attribute
        subscriptions = getattr(provider, "Subscriptions", None)
        if subscriptions is None:
            return

        # For each subscription (path + target)
        for p_target, p_path in list(subscriptions):
            # Check if something disabled under this path and target changed
            changed = self._is_disabled_diff_affecting_path(p_path, old_disabled, old_disabled_paths, p_target)
            if changed:
                self.App.PubSub.publish("Library.change!", self, p_path)

    def _is_disabled_diff_affecting_path(self, sub_path, old_disabled, old_disabled_paths, target=None):
        """
        Check if disabling changes affect the subscribed path for a specific target.
        For target=="tenant" (wildcard), we fire whenever the set of tenants changes.
        """
        # Normalize path (ensure it ends with / for folders)
        if not sub_path.endswith('/'):
            sub_path = sub_path + '/'

        # 1) File-level disables
        for path in old_disabled.keys() | self.Disabled.keys():
            if not path.startswith(sub_path):
                continue

            old_entry = old_disabled.get(path) or []
            new_entry = self.Disabled.get(path) or []

            if target == "tenant":
                # Wildcard tenant subscription: any change in the list of tenants
                if set(old_entry) != set(new_entry):
                    return True
            else:
                old_flag = self._is_disabled_for_target(old_entry, target)
                new_flag = self._is_disabled_for_target(new_entry, target)
                if old_flag != new_flag:
                    return True

        # 2) Folder-level disables
        old_map = {p: v for p, v in old_disabled_paths}
        new_map = {p: v for p, v in self.DisabledPaths}
        for path in old_map.keys() | new_map.keys():
            if not path.startswith(sub_path):
                continue

            old_entry = old_map.get(path) or []
            new_entry = new_map.get(path) or []

            if target == "tenant":
                if set(old_entry) != set(new_entry):
                    return True
            else:
                old_flag = self._is_disabled_for_target(old_entry, target)
                new_flag = self._is_disabled_for_target(new_entry, target)
                if old_flag != new_flag:
                    return True

        return False

    def _is_disabled_for_target(self, disabled_entry, target):
        """
        Check if a disabled entry affects a specific target (global, tenant, or tenant ID).
        """
        if disabled_entry is None:
            return False

        if target is None or target == "global":
            return "*" in disabled_entry

        if target == "tenant":
            # Wildcard tenant subscription (all tenants)
            return bool(disabled_entry)

        if isinstance(target, tuple) and target[0] == "tenant":
            tenant_id = target[1]
            return "*" in disabled_entry or tenant_id in disabled_entry

        return False

    def check_disabled(self, path: str) -> bool:
        """
        Check if the item specified in path is disabled, either globally or for the specified tenant.

        **WARNING:** When checking for items disabled for a tenant,
        it must be set in context variable before using this function!
        If it is not set automatically (e.g. from web request), it must be set manually.

        Example:

        1. Is path disabled for a specific tenant?

        ```python
        try:
            tenant_ctx = asab.contextvars.Tenant.set(tenant)
            disabled = self.LibraryService.check_disabled(path)
            ...
        finally:
            asab.contextvars.Tenant.reset(tenant_ctx)
        ```

        2. Is path disabled globally?

        ```python
        disabled = self.LibraryService.check_disabled(path)
        ```

        Args:
            path (str): Path to the item to be checked.

        Returns:
            `True` if the item is disabled for the tenant.
        """
        if not isinstance(path, str) or not path:
            raise LibraryInvalidPathError(
                message="Argument 'path' must be a non-empty string.",
                path=path,
            )

        try:
            tenant = Tenant.get()
        except LookupError:
            tenant = None

        # First check disabled by path
        for dp, disabled in self.DisabledPaths:
            if path.startswith(dp):
                if '*' in disabled:
                    # Path is disabled for everybody
                    return True

                if tenant is not None and tenant in disabled:
                    # Path is disabled for a specified tenant
                    return True

        # Then check for a specific item entries
        disabled = self.Disabled.get(path)
        if disabled is None:
            return False

        if '*' in disabled:
            # Item is disabled for everybody
            return True

        if tenant is not None and tenant in disabled:
            # Item is disabled for a specified tenant
            return True

        return False

    def check_favorite(self, path: str, inherit: bool = False) -> bool:
        """
        Check if `path` is marked as favorite for the current tenant (or globally via '*').

        If `inherit` is True and a parent *folder* is favorited for the tenant/'*',
        any child path under that folder is considered favorited.

        **WARNING:** Tenant must be set in the context variable before using this function
        if you want tenant-aware behavior (same as check_disabled).

        Examples:

        1) Tenant-aware check:

        ```python
        try:
            tenant_ctx = asab.contextvars.Tenant.set(tenant)
            is_fav = self.LibraryService.check_favorite(path)
            ...
        finally:
            asab.contextvars.Tenant.reset(tenant_ctx)
        ```

        2) Global-only check (any tenant):

        ```python
        is_fav = self.LibraryService.check_favorite(path)
        ```

        Args:
            path (str): Path to the item or folder to be checked.
            inherit (bool): If True, a favorited folder marks all descendants as favorited.

        Returns:
            bool: True if favorited for current tenant or globally.
        """
        if not isinstance(path, str) or not path:
            raise LibraryInvalidPathError(
                message="Argument 'path' must be a non-empty string.",
                path=path,
            )

        try:
            tenant = Tenant.get()
        except LookupError:
            tenant = None

        # 1) Folder favorites
        #    - Exact match if not inheriting
        #    - Prefix match if inheriting (folder favorite applies to children)
        if inherit:
            for fp, fav_tenants in self.FavoritePaths:
                if path.startswith(fp):
                    if '*' in fav_tenants:
                        return True
                    if tenant is not None and tenant in fav_tenants:
                        return True
        else:
            # exact folder favorite (no inheritance)
            for fp, fav_tenants in self.FavoritePaths:
                if path == fp:
                    if '*' in fav_tenants:
                        return True
                    if tenant is not None and tenant in fav_tenants:
                        return True

        # 2) Exact item favorites
        fav_tenants = self.Favorites.get(path)
        if fav_tenants is None:
            return False
        if '*' in fav_tenants:
            return True
        if tenant is not None and tenant in fav_tenants:
            return True

        return False

    async def get_item_metadata(self, path: str) -> typing.Optional[dict]:
        """
        Retrieve metadata for a specific file in the library, including its `target`.

        Args:
            path (str): The absolute path of the file to retrieve metadata for.
                Must start with '/' and include a filename with an extension.

        Returns:
            dict: Metadata for the specified file, including `target`, or None if not found.
        """
        # Validate the path format
        _validate_path_item(path)

        # Split into directory and filename
        directory, filename = os.path.split(path)

        if not directory or not filename:
            L.warning("Invalid path '{}': missing directory or filename.".format(path))
            return None

        # Ensure directory ends with '/'
        if not directory.endswith('/'):
            directory += '/'

        try:
            # Fetch all items in the directory
            items = await self.list(directory)
        except Exception as e:
            L.warning("Failed to list items in directory '{}': {}".format(directory, e))
            return None

        # Use dictionary for faster lookup
        items_dict = {item.name: item for item in items}

        # Retrieve the item by path
        item = items_dict.get(path)
        if item and item.type == "item":
            # Match found; return metadata including `target`
            return {
                "name": item.name,
                "type": item.type,
                "layers": item.layers,
                "providers": item.providers,
                "disabled": item.disabled,
                "favorite": item.favorite,
                "override": item.override,
            }

        # Item not found
        L.info("Item '{}' not found in directory '{}'.".format(filename, directory))
        return None

    async def export(self, path: str = "/", remove_path: bool = False) -> typing.IO:
        """
        Return a file-like stream containing a gzipped tar archive of the library contents of the path.

        Args:
            path: The path to export.
            tenant (str | None ): The tenant to use for the operation.
            remove_path: If `True`, the path will be removed from the tar file.

        Returns:
            A file object containing a gzipped tar archive.
        """
        self._ensure_ready()
        _validate_path_directory(path)

        fileobj = tempfile.TemporaryFile()
        tarobj = tarfile.open(name=None, mode='w:gz', fileobj=fileobj)

        items = await self._list(path, providers=self.Libraries[:1])
        recitems = list(items[:])

        while len(recitems) > 0:
            item = recitems.pop(0)
            if item.type != 'dir':
                continue

            child_items = await self._list(item.name, providers=item.providers)
            items.extend(child_items)
            recitems.extend(child_items)

        for item in items:
            if item.type != 'item':
                continue

            my_data = await self.Libraries[0].read(item.name)
            if remove_path:
                assert item.name.startswith(path)
                tar_name = item.name[len(path):]
            else:
                tar_name = item.name

            info = tarfile.TarInfo(tar_name)
            my_data.seek(0, io.SEEK_END)
            info.size = my_data.tell()
            my_data.seek(0, io.SEEK_SET)
            info.mtime = time.time()
            tarobj.addfile(tarinfo=info, fileobj=my_data)

        tarobj.close()
        fileobj.seek(0)
        return fileobj

    async def subscribe(
        self,
        paths: typing.Union[str, typing.List[str]],
        target: typing.Union[str, tuple, None] = None,
    ) -> None:
        """
        Subscribe to changes for specified paths of the library.

        In order to notify on changes in the Library, this method must be used after the Library is ready.

        Args:
            paths (str | list[str]): Either single path or list of paths to be subscribed. All the paths must be absolute (start with '/').
            target: In which target to watch the changes. Possible values:
                - "global" to watch global path changes
                - "tenant" to watch path changes in tenants
                - ("tenant", TENANT_ID) to watch path changes in one specified tenant TENANT_ID

        Examples:

        ```python
        class MyApplication(asab.Application):

            async def initialize(self):
                self.PubSub.subscribe("Library.ready!", self.on_library_ready)
                self.PubSub.subscribe("Library.change!", self.on_library_change)

            async def on_library_ready(self, event_name, library=None):
                await self.LibraryService.subscribe(["/path1/","/path2/"])

            def on_library_change(self, message, provider, path):
                print("New changes in the library found by provider: '{}'".format(provider))
        ```
        """
        self._ensure_ready()

        if isinstance(paths, str):
            paths = [paths]

        for path in paths:
            if not path.startswith("/"):
                raise LibraryInvalidPathError(
                    message="Directory path must start with '/' when subscribing to Library changes.",
                    path=path,
                )

            for provider in self.Libraries:
                await provider.subscribe(path, target)
The library service is designed to "exist" in multiple instances,
with different paths setups.
For that reason, you have to provide unique service_name
and there is no default value for that.
If paths are not provided, they are fetched from [library]providers configuration.
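To illustrate how a multi-line `providers` option turns into an ordered list of layers, here is a sketch using only the standard library. ASAB reads the value through its own `Config.getmultiline`; the `configparser` call below is an approximation of that step, and the provider URLs are hypothetical.

```python
import configparser
import re

# Hypothetical configuration; the first listed provider becomes the top layer.
CONFIG_TEXT = """
[library]
providers:
    zk://zookeeper-1:2181/library
    git+https://git.example.com/library.git
    /opt/library
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

# The raw value is one string with a provider on each continuation line.
raw = parser.get("library", "providers")

# Split on whitespace, as the service does for string input, and drop blanks.
paths = [p for p in re.split(r"\s+", raw) if p]

# Pair each path with its layer number; layer 0 has the highest priority.
layers = list(enumerate(paths))
```

The position of a path in the list decides its layer, so reordering the lines in the configuration changes which provider wins when two of them contain the same item.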
Check whether the item specified by `path` is disabled, either globally or for the current tenant.
WARNING: When checking for items disabled for a tenant,
the tenant must be set in the context variable before calling this function!
If it is not set automatically (e.g. from a web request), it must be set manually.
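The interplay between the context variable and the disabled check can be sketched with the standard library alone. In the sketch below, `Tenant` is a hypothetical stand-in for `asab.contextvars.Tenant`, `DISABLED` is an assumed in-memory form of a parsed `/.disabled.yaml`, and the helper names are made up for illustration. Only exact item entries are modelled, not path prefixes.

```python
import contextvars

# Hypothetical stand-in for asab.contextvars.Tenant
Tenant = contextvars.ContextVar("Tenant")

# Assumed in-memory form of a parsed /.disabled.yaml
DISABLED = {
    "/file1.txt": ["tenant-1", "tenant-2"],
    "/file2.txt": ["tenant-1"],
    "/file3.txt": ["*"],
}


def is_disabled(path: str) -> bool:
    """Check `path` against DISABLED for the tenant currently in context."""
    try:
        tenant = Tenant.get()
    except LookupError:
        tenant = None  # No tenant in context: only global ('*') entries apply

    tenants = DISABLED.get(path)
    if tenants is None:
        return False
    if "*" in tenants:
        return True
    return tenant is not None and tenant in tenants


def is_disabled_for(path: str, tenant: str) -> bool:
    """Set the tenant manually, run the check, and always restore the context."""
    token = Tenant.set(tenant)
    try:
        return is_disabled(path)
    finally:
        Tenant.reset(token)
```

Without a tenant in the context, only entries disabled with a star match, which is why the warning above asks for the tenant to be set first.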
def check_disabled(self, path: str) -> bool:
    """
    Check if the item specified in path is disabled, either globally or for the specified tenant.

    **WARNING:** When checking for items disabled for a tenant,
    it must be set in context variable before using this function!
    If it is not set automatically (e.g. from web request), it must be set manually.

    Example:

    1. Is path disabled for a specific tenant?

    ```python
    try:
        tenant_ctx = asab.contextvars.Tenant.set(tenant)
        disabled = self.LibraryService.check_disabled(path)
        ...
    finally:
        asab.contextvars.Tenant.reset(tenant_ctx)
    ```

    2. Is path disabled globally?

    ```python
    disabled = self.LibraryService.check_disabled(path)
    ```

    Args:
        path (str): Path to the item to be checked.

    Returns:
        `True` if the item is disabled for the tenant.
    """
    if not isinstance(path, str) or not path:
        raise LibraryInvalidPathError(
            message="Argument 'path' must be a non-empty string.",
            path=path,
        )

    try:
        tenant = Tenant.get()
    except LookupError:
        tenant = None

    # First check disabled by path
    for dp, disabled in self.DisabledPaths:
        if path.startswith(dp):
            if '*' in disabled:
                # Path is disabled for everybody
                return True

            if tenant is not None and tenant in disabled:
                # Path is disabled for a specified tenant
                return True

    # Then check for a specific item entries
    disabled = self.Disabled.get(path)
    if disabled is None:
        return False

    if '*' in disabled:
        # Item is disabled for everybody
        return True

    if tenant is not None and tenant in disabled:
        # Item is disabled for a specified tenant
        return True

    return False
def check_favorite(self, path: str, inherit: bool = False) -> bool:
    """
    Check if `path` is marked as favorite for the current tenant (or globally via '*').

    If `inherit` is True and a parent *folder* is favorited for the tenant/'*',
    any child path under that folder is considered favorited.

    **WARNING:** Tenant must be set in the context variable before using this function
    if you want tenant-aware behavior (same as check_disabled).

    Examples:

    1) Tenant-aware check:

    ```python
    try:
        tenant_ctx = asab.contextvars.Tenant.set(tenant)
        is_fav = self.LibraryService.check_favorite(path)
        ...
    finally:
        asab.contextvars.Tenant.reset(tenant_ctx)
    ```

    2) Global-only check (any tenant):

    ```python
    is_fav = self.LibraryService.check_favorite(path)
    ```

    Args:
        path (str): Path to the item or folder to be checked.
        inherit (bool): If True, a favorited folder marks all descendants as favorited.

    Returns:
        bool: True if favorited for current tenant or globally.
    """
    if not isinstance(path, str) or not path:
        raise LibraryInvalidPathError(
            message="Argument 'path' must be a non-empty string.",
            path=path,
        )

    try:
        tenant = Tenant.get()
    except LookupError:
        tenant = None

    # 1) Folder favorites
    #    - Exact match if not inheriting
    #    - Prefix match if inheriting (folder favorite applies to children)
    if inherit:
        for fp, fav_tenants in self.FavoritePaths:
            if path.startswith(fp):
                if '*' in fav_tenants:
                    return True
                if tenant is not None and tenant in fav_tenants:
                    return True
    else:
        # exact folder favorite (no inheritance)
        for fp, fav_tenants in self.FavoritePaths:
            if path == fp:
                if '*' in fav_tenants:
                    return True
                if tenant is not None and tenant in fav_tenants:
                    return True

    # 2) Exact item favorites
    fav_tenants = self.Favorites.get(path)
    if fav_tenants is None:
        return False
    if '*' in fav_tenants:
        return True
    if tenant is not None and tenant in fav_tenants:
        return True
    return False
async def export(self, path: str = "/", remove_path: bool = False) -> typing.IO:
    """
    Return a file-like stream containing a gzipped tar archive of the library contents of the path.

    Args:
        path: The path to export.
        remove_path: If `True`, the path will be removed from the tar file.

    Returns:
        A file object containing a gzipped tar archive.
    """
    self._ensure_ready()
    _validate_path_directory(path)

    fileobj = tempfile.TemporaryFile()
    tarobj = tarfile.open(name=None, mode='w:gz', fileobj=fileobj)

    items = await self._list(path, providers=self.Libraries[:1])
    recitems = list(items[:])

    while len(recitems) > 0:
        item = recitems.pop(0)
        if item.type != 'dir':
            continue

        child_items = await self._list(item.name, providers=item.providers)
        items.extend(child_items)
        recitems.extend(child_items)

    for item in items:
        if item.type != 'item':
            continue
        my_data = await self.Libraries[0].read(item.name)
        if remove_path:
            assert item.name.startswith(path)
            tar_name = item.name[len(path):]
        else:
            tar_name = item.name
        info = tarfile.TarInfo(tar_name)
        my_data.seek(0, io.SEEK_END)
        info.size = my_data.tell()
        my_data.seek(0, io.SEEK_SET)
        info.mtime = time.time()
        tarobj.addfile(tarinfo=info, fileobj=my_data)

    tarobj.close()
    fileobj.seek(0)
    return fileobj
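The effect of `remove_path` on the names stored in the archive can be tried out in isolation. The sketch below is a simplified, synchronous stand-in that builds a gzipped tar archive in memory from a plain dictionary; `build_archive` and the sample file names are hypothetical and are not part of the library.

```python
import io
import tarfile
import time
import typing


def build_archive(
    files: typing.Dict[str, bytes],
    path: str = "/",
    remove_path: bool = False,
) -> typing.IO:
    """Pack every entry of `files` located under `path` into a gzipped tar stream."""
    fileobj = io.BytesIO()
    with tarfile.open(name=None, mode="w:gz", fileobj=fileobj) as tarobj:
        for name, data in files.items():
            if not name.startswith(path):
                continue
            # With remove_path, the exported prefix is stripped from the member name
            tar_name = name[len(path):] if remove_path else name
            info = tarfile.TarInfo(tar_name)
            info.size = len(data)
            info.mtime = time.time()
            tarobj.addfile(tarinfo=info, fileobj=io.BytesIO(data))
    fileobj.seek(0)  # Rewind so the caller can read the archive from the start
    return fileobj


archive = build_archive(
    {"/Dashboards/a.json": b"{}", "/Other/b.txt": b"x"},
    path="/Dashboards/",
    remove_path=True,
)
with tarfile.open(fileobj=archive, mode="r:gz") as tar:
    names = tar.getnames()
```

Stripping the prefix is convenient when the archive is unpacked into a directory that already represents the exported path.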
Search for files with a specific name within a library, using the provided path.
The method traverses the library directories, looking for files that match the given filename.
It returns a list of paths leading to these files, empty if no items are found.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | Location of the file in Library. It must start with a forward slash and include the filename. Example: '/Dashboards/Cisco/Overview.json' | required |

Returns:

| Type | Description |
| --- | --- |
| `List[str]` | A list of paths to the found files. If no files are found, the list will be empty. |
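How results from several layers end up in one list can be sketched as follows. `DictProvider` is a hypothetical in-memory provider, and matching on the filename part of `path` is an assumption of this sketch; real providers may search differently.

```python
import asyncio
import posixpath
import typing


class DictProvider:
    """Hypothetical provider holding a flat list of item paths in memory."""

    def __init__(self, paths: typing.List[str]):
        self.paths = paths

    async def find(self, path: str) -> typing.List[str]:
        # Assumption of this sketch: match on the filename part of `path`
        filename = posixpath.basename(path)
        return [p for p in self.paths if posixpath.basename(p) == filename]


async def find(providers: typing.List[DictProvider], path: str) -> typing.List[str]:
    """Ask every provider in layer order and concatenate what they return."""
    results = []
    for provider in providers:
        results.extend(await provider.find(path))
    return results


providers = [
    DictProvider(["/Dashboards/Cisco/Overview.json"]),
    DictProvider(["/Dashboards/Fortinet/Overview.json", "/Parsers/rules.yaml"]),
]
found = asyncio.run(find(providers, "/Dashboards/Cisco/Overview.json"))
```

Results keep the layer order, so paths found in upper layers appear before paths found in lower ones.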
async def find(self, path: str) -> typing.List[str]:
    """
    Search for files with a specific name within a library, using the provided path.

    The method traverses the library directories, looking for files that match the given filename.
    It returns a list of paths leading to these files, empty if no items are found.

    Args:
        path (str): Location of the file in Library. It must start with a forward slash and include the filename. Example: '/Dashboards/Cisco/Overview.json'

    Returns:
        typing.List[str]: A list of paths to the found files. If no files are found, the list will be empty.
    """
    _validate_path_item(path)

    results = []
    for library in self.Libraries:
        found_files = await library.find(path)
        if found_files:
            results.extend(found_files)

    return results
async def get_item_metadata(self, path: str) -> typing.Optional[dict]:
    """
    Retrieve metadata for a specific file in the library, including its `target`.

    Args:
        path (str): The absolute path of the file to retrieve metadata for.
            Must start with '/' and include a filename with an extension.

    Returns:
        dict: Metadata for the specified file, including `target`, or None if not found.
    """
    # Validate the path format
    _validate_path_item(path)

    # Split into directory and filename
    directory, filename = os.path.split(path)

    if not directory or not filename:
        L.warning("Invalid path '{}': missing directory or filename.".format(path))
        return None

    # Ensure directory ends with '/'
    if not directory.endswith('/'):
        directory += '/'

    try:
        # Fetch all items in the directory
        items = await self.list(directory)
    except Exception as e:
        L.warning("Failed to list items in directory '{}': {}".format(directory, e))
        return None

    # Use dictionary for faster lookup
    items_dict = {item.name: item for item in items}

    # Retrieve the item by path
    item = items_dict.get(path)
    if item and item.type == "item":
        # Match found; return metadata including `target`
        return {
            "name": item.name,
            "type": item.type,
            "layers": item.layers,
            "providers": item.providers,
            "disabled": item.disabled,
            "favorite": item.favorite,
            "override": item.override,
        }

    # Item not found
    L.info("Item '{}' not found in directory '{}'.".format(filename, directory))
    return None
def is_ready(self) -> bool:
    """
    Check if all the library providers are ready.

    Returns:
        True if every provider is ready; if even one provider is not, returns False.
    """
    if not self.Libraries:
        return False

    for provider in self.Libraries:
        if not provider.IsReady:
            return False

    return True
List the items in the library directory specified by `path` that are enabled for the current tenant.
This method can be used only after the Library is ready.
WARNING: The tenant must be set in the context variable!
If it is not set automatically (e.g. from a web request), it must be set manually.
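A recursive listing can be understood as a queue-driven expansion of directories. The sketch below mimics that idea on a hypothetical in-memory tree; `TREE`, `list_level`, and `list_items` are illustrative names, and the convention that directory names end with a slash is an assumption of the sketch.

```python
import asyncio
import typing

# Hypothetical in-memory library: directory path -> list of (name, type)
TREE = {
    "/": [("/Dashboards/", "dir"), ("/readme.txt", "item")],
    "/Dashboards/": [("/Dashboards/Cisco/", "dir")],
    "/Dashboards/Cisco/": [("/Dashboards/Cisco/Overview.json", "item")],
}


async def list_level(path: str) -> typing.List[typing.Tuple[str, str]]:
    """List a single directory level."""
    return list(TREE.get(path, []))


async def list_items(path: str = "/", recursive: bool = False):
    """List `path`; with `recursive`, also list every directory found on the way."""
    items = await list_level(path)
    if recursive:
        pending = list(items)  # Queue of entries still to be inspected
        while pending:
            name, item_type = pending.pop(0)
            if item_type != "dir":
                continue
            children = await list_level(name)
            items.extend(children)    # Children become part of the output
            pending.extend(children)  # and are inspected for nested directories
    return items


names = [name for name, _ in asyncio.run(list_items("/", recursive=True))]
```

The output is ordered level by level: the entries of the requested directory come first, followed by the contents of its subdirectories.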
async def list(self, path: str = "/", recursive: bool = False) -> typing.List[LibraryItem]:
    """
    List the directory of the library specified by the path that are enabled for the specified tenant.

    This method can be used only after the Library is ready.

    **WARNING:** Tenant must be set in the context variable!
    If it is not set automatically (e.g. from web request), it must be set manually.

    Example:

    ```python
    try:
        tenant_ctx = asab.contextvars.Tenant.set(tenant)
        items = await self.LibraryService.list(path)
        ...
    finally:
        asab.contextvars.Tenant.reset(tenant_ctx)
    ```

    Args:
        path (str): Path to the directory.
        recursive (bool): If `True`, return a list of items located at `path` and its subdirectories.

    Returns:
        List of items that are enabled for the tenant.
    """
    self._ensure_ready()
    _validate_path_directory(path)

    # List requested level using all available providers
    items = await self._list(path, providers=self.Libraries)

    if recursive:
        # If recursive scan is requested, then iterate thru list of items
        # find 'dir' types there and list them.
        # Output of this list is attached to the list for recursive scan
        # and also to the final output
        recitems = list(items[:])

        while len(recitems) > 0:
            item = recitems.pop(0)
            if item.type != 'dir':
                continue

            child_items = await self._list(item.name, providers=item.providers)
            items.extend(child_items)
            recitems.extend(child_items)

    return items
Read the content of the library item specified by path in a SAFE way, protected by a context manager/with statement.
This method can be used only after the Library is ready.
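The "safe" part of this pattern is that the stream is closed when the `async with` block exits, even on an exception, while a missing item is reported as `None`. A minimal standalone sketch of the same pattern follows; `ITEMS`, `open_item`, and `read_text` are hypothetical names used only here.

```python
import asyncio
import contextlib
import io
import typing

# Hypothetical in-memory content standing in for library items
ITEMS = {"/Templates/hello.txt": b"Hello"}


@contextlib.asynccontextmanager
async def open_item(path: str):
    """Yield a readable stream for `path`, or None if the item does not exist."""
    data = ITEMS.get(path)
    itemio = io.BytesIO(data) if data is not None else None
    if itemio is None:
        yield None
    else:
        try:
            yield itemio
        finally:
            itemio.close()  # Runs even if the caller's block raises


async def read_text(path: str) -> typing.Optional[str]:
    async with open_item(path) as itemio:
        if itemio is None:
            return None
        return itemio.read().decode("utf-8")


text = asyncio.run(read_text("/Templates/hello.txt"))
missing = asyncio.run(read_text("/Templates/missing.txt"))
```

Callers therefore need to handle only two cases, a stream or `None`, and never have to close the stream themselves.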
@contextlib.asynccontextmanager
async def open(self, path: str):
    """
    Read the content of the library item specified by `path` in a SAFE way, protected by a context manager/with statement.

    This method can be used only after the Library is ready.

    Example:

    ```python
    async with self.LibraryService.open(path) as io:
        if io is None:
            return None
        text = io.read().decode("utf-8")
    ```
    """
    self._ensure_ready()
    _validate_path_item(path)

    # Same functionality as in read() method
    itemio = None
    disabled = self.check_disabled(path)
    if not disabled:
        for library in self.Libraries:
            itemio = await library.read(path)
            if itemio is not None:
                break

    if itemio is None:
        yield itemio
    else:
        try:
            yield itemio
        finally:
            itemio.close()
Read the content of the library item specified by path. This method can be used only after the Library is ready.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str` | Path to the file, `LibraryItem.name` can be used directly. | required |

Returns:

| Type | Description |
| --- | --- |
| `IO \| None` | Readable stream with the content of the library item. `None` is returned if the item is not found or if it is disabled (either globally or for the current tenant). |
async def read(self, path: str) -> typing.Optional[typing.IO]:
    """
    THIS IS OBSOLETED METHOD, USE `open(...)` !!!

    Read the content of the library item specified by `path`. This method can be used only after the Library is ready.

    Args:
        path (str): Path to the file, `LibraryItem.name` can be used directly.

    Returns:
        ( IO | None ): Readable stream with the content of the library item.
        `None` is returned if the item is not found or if it is disabled (either globally or for the current tenant).

    Example:

    ```python
    itemio = await library.read('/path')
    if itemio is not None:
        with itemio:
            return itemio.read()
    ```
    """
    self._ensure_ready()
    LogObsolete.warning("Method 'LibraryService.read()' is obsolete. Use 'LibraryService.open()' method instead.")
    _validate_path_item(path)

    if self.check_disabled(path):
        return None

    for library in self.Libraries:
        itemio = await library.read(path)
        if itemio is None:
            continue
        return itemio

    return None
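Subscribe to changes for specified paths of the library.
In order to notify on changes in the Library, this method must be used after the Library is ready.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `paths` | `Union[str, List[str]]` | Either single path or list of paths to be subscribed. All the paths must be absolute (start with '/'). | required |
| `target` | `Union[str, tuple, None]` | In which target to watch the changes. Possible values: "global" to watch global path changes; "tenant" to watch path changes in tenants; ("tenant", TENANT_ID) to watch path changes in one specified tenant TENANT_ID. | `None` |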
Examples:

    class MyApplication(asab.Application):

        async def initialize(self):
            self.PubSub.subscribe("Library.ready!", self.on_library_ready)
            self.PubSub.subscribe("Library.change!", self.on_library_change)

        async def on_library_ready(self, event_name, library=None):
            await self.LibraryService.subscribe(["/path1/","/path2/"])

        def on_library_change(self, message, provider, path):
            print("New changes in the library found by provider: '{}'".format(provider))

async def subscribe(
    self,
    paths: typing.Union[str, typing.List[str]],
    target: typing.Union[str, tuple, None] = None,
) -> None:
    """
    Subscribe to changes for specified paths of the library.

    In order to notify on changes in the Library, this method must be used after the Library is ready.

    Args:
        paths (str | list[str]): Either single path or list of paths to be subscribed. All the paths must be absolute (start with '/').
        target: In which target to watch the changes.
            Possible values:
                - "global" to watch global path changes
                - "tenant" to watch path changes in tenants
                - ("tenant", TENANT_ID) to watch path changes in one specified tenant TENANT_ID

    Examples:

    ```python
    class MyApplication(asab.Application):

        async def initialize(self):
            self.PubSub.subscribe("Library.ready!", self.on_library_ready)
            self.PubSub.subscribe("Library.change!", self.on_library_change)

        async def on_library_ready(self, event_name, library=None):
            await self.LibraryService.subscribe(["/path1/","/path2/"])

        def on_library_change(self, message, provider, path):
            print("New changes in the library found by provider: '{}'".format(provider))
    ```
    """
    self._ensure_ready()

    if isinstance(paths, str):
        paths = [paths]

    for path in paths:
        if not path.startswith("/"):
            raise LibraryInvalidPathError(
                message="Directory path must start with '/' when subscribing to Library changes.",
                path=path,
            )

        for provider in self.Libraries:
            await provider.subscribe(path, target)
@dataclasses.dataclass
class LibraryItem:
    """
    The data class that contains the info about a specific item in the library.

    Attributes:
        name (str): The absolute path of the Item. It can be directly fed into `LibraryService.read(...)`.
        type (str): Can be either `dir` if the Item is a directory or `item` if Item is of any other type.
        layers (list[int]): The numbers of the layers in which this Item is found, in ascending order. The higher the number, the lower the layer is.
        providers (list): List of `LibraryProvider` objects containing this Item.
        disabled (bool): `True` if the Item is disabled, `False` otherwise. If the Item is disabled, `LibraryService.read(...)` will return `None`.
        favorite (bool): True if the Item is marked as a favorite.
        override (int): If `True`, this item is marked as an override for the providers with the same Item name.
        target (str): Specifies the target context, e.g., "tenant" or "global". Defaults to "global".
    """

    name: str
    type: str
    layers: typing.List[int]  # Now stores multiple layers in ascending order
    providers: list
    disabled: bool = False
    favorite: bool = False
    override: int = 0
    size: typing.Optional[int] = None  # Size is None by default, absent for directories