Storage Service¤

ASAB's Storage Service supports data storage in-memory or in dedicated document databases, including MongoDB and ElasticSearch.

Configuration¤

First, specify the storage type in the configuration. The options for the storage type are:

  • inmemory: Stores data directly in memory
  • mongodb: Stores data in a MongoDB database. Depends on the pymongo and motor libraries.
  • elasticsearch: Stores data in an ElasticSearch database. Depends on the aiohttp library.

The Storage Service provides a unified interface for accessing and manipulating collections across multiple database technologies.

[asab:storage]
type=mongodb

To access the storage, add asab.storage.Module when initializing the application and obtain the registered service.

class MyApplication(asab.Application):

    async def initialize(self):

        self.add_module(asab.storage.Module)

    async def main(self):
        storage = self.get_service("asab.StorageService")

Manipulation with databases¤

Upsertor¤

Upsertor is an object that works like a pointer to the specified collection and optionally to a specific object ID. It is used for inserting new objects, updating existing objects and deleting them.

u = storage.upsertor("test-collection")

The StorageService.upsertor() method creates an upsertor object associated with the specified collection. It takes the collection name as an argument and accepts two optional parameters, obj_id and version, which are used when updating an existing object identified by its ID and version.

Inserting an object¤

To insert an object into the collection, use the Upsertor.set() method.

u.set("key", "value")

To commit the changes, run the Upsertor.execute() coroutine method, which stores the upsertor data and returns the ID of the object. Since it is a coroutine, it must be awaited.

object_id = await u.execute()

The Upsertor.execute() method has optional parameters custom_data and event_type, which are used for webhook requests.

object_id = await u.execute(
    custom_data={"foo": "bar"},
    event_type="object_created"
)

Getting a single object¤

To get a single object, use the StorageService.get() coroutine method. It takes two arguments, collection and obj_id, and finds an object in the collection by its ID.

obj = await storage.get(collection="test-collection", obj_id=object_id)
print(obj)

When the requested object is not found in the collection, the method raises KeyError. Remember to handle this exception properly when using databases in your services, to prevent them from crashing!
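
A small wrapper can turn the missing-object case into a default value instead of an unhandled exception. This is a sketch only; the helper name load_or_default and the default value are illustrative, not part of the ASAB API:

```python
async def load_or_default(storage, collection, obj_id, default=None):
    # StorageService.get() raises KeyError when the object does not exist,
    # so translate that into a default value instead of crashing the service.
    try:
        return await storage.get(collection=collection, obj_id=obj_id)
    except KeyError:
        return default
```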

Note

The MongoDB storage service additionally provides a coroutine method get_by(), which accesses an object by a key-value pair.

obj = await storage.get_by(collection="test-collection", key="key", value="value")

Updating an object¤

For updating an object, first obtain the upsertor specifying its obj_id and version.

u = storage.upsertor(
    collection="test-collection", 
    obj_id=object_id, 
    version=obj['_v']
)

We strongly recommend reading the version from the object, as shown above. That creates a soft lock on the record: if the object is updated by another component in the meantime, your upsertor will fail, and you should retry the whole operation. New objects should have a version set to 0, which is the default.
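
The read-update-retry pattern can be sketched as follows. This is an illustration under assumptions: the helper name update_with_retry is hypothetical, and the exact exception type raised on a version conflict depends on the storage backend, so it is caught broadly here:

```python
async def update_with_retry(storage, collection, obj_id, key, value, attempts=3):
    # Optimistic concurrency: re-read the object (and its current version)
    # before every attempt; on a conflict, retry the whole operation.
    last_error = None
    for _ in range(attempts):
        obj = await storage.get(collection=collection, obj_id=obj_id)
        u = storage.upsertor(collection, obj_id=obj_id, version=obj["_v"])
        u.set(key, value)
        try:
            return await u.execute()
        except Exception as e:  # backend-specific conflict error (assumption)
            last_error = e
    raise RuntimeError("Update failed after {} attempts".format(attempts)) from last_error
```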

After obtaining an upsertor, you can update the object via the Upsertor.set() method.

u.set("key", "new_value")
object_id = await u.execute()

Deleting an object¤

To delete an object from the database, use the StorageService.delete() coroutine method. It takes the arguments collection and obj_id, deletes the object and returns its ID.

deleted_id = await storage.delete("test-collection", object_id)

Storing data in memory¤

If the option inmemory is set, ASAB will store data in its own memory. Specifically, asab.StorageService is initialized with an InMemoryCollections attribute, a dictionary in which all the collections are stored.

Note

You can inspect all the collections directly by accessing the InMemoryCollections attribute, although we do not recommend that.

import pprint
storage = self.get_service("asab.StorageService")
pprint.pprint(storage.InMemoryCollections, indent=2)

Storing data in MongoDB¤

If the option mongodb is set, ASAB will store data in a MongoDB database.

ASAB uses the motor library, which provides a non-blocking MongoDB driver for asyncio.

You can specify the database name and URL for MongoDB in the config file:

[asab:storage]
type=mongodb

[mongo]
uri=mongodb://localhost:27017
database=asabdb

You can use all the methods from the abstract class. In addition, the MongoDB Storage class provides two methods, StorageService.get_by() and StorageService.collection().

The method StorageService.get_by() is used in the same way as StorageService.get().

obj = await storage.get_by(collection="test-collection", key="key", value="value")

The collection() method is used for accessing the database directly. It takes the collection name as an argument and returns a motor.motor_asyncio.AsyncIOMotorCollection object, which can be used for calling MongoDB directives.

collection = await storage.collection("test-collection")
cursor = collection.find({})
async for data in cursor:
    pprint.pprint(data)

The full list of methods available on this object is described in the official motor documentation.

Storing data in ElasticSearch¤

When using ElasticSearch, add configuration for the URL, username and password (an API key is also supported via the api_key parameter):

[asab:storage]
type=elasticsearch

[elasticsearch]
url=http://localhost:9200/
username=JohnDoe
password=lorem_ipsum_dolor?sit_amet!2023

You can also specify the refresh parameter and the scroll timeout for the ElasticSearch Scroll API.

[asab:storage]
refresh=true
scroll_timeout=1m

ElasticSearch Storage additionally provides methods for creating index templates, mappings etc. (see the Reference section).

Encryption and decryption¤

Data stored in the database can be encrypted using an algorithm that adheres to the Advanced Encryption Standard (AES).

AES Key settings¤

In order to use encryption, first make sure you have the cryptography package installed. Then specify the AES Key in the config file.

[asab:storage]
aes_key=random_key_string

Note

The AES Key is used as both an encryption and decryption key. It is recommended to keep it in a separate configuration file that is not exposed anywhere publicly.

The actual binary AES key is derived from the aes_key specified in the config file by encoding and hashing it with standard hashlib algorithms, so you need not worry about the length or type of the key.
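
The key derivation can be illustrated directly; the SHA-256 step below mirrors what the service does internally, and the configured string is just the example value from the config above:

```python
import hashlib

# The configured aes_key string is encoded and hashed into a fixed-length
# 256-bit binary key, so the string itself can have any length or content.
configured_key = "random_key_string"
binary_key = hashlib.sha256(configured_key.encode("utf-8")).digest()
print(len(binary_key))  # 32 bytes, i.e. a 256-bit AES key
```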

Encrypting data¤

The Upsertor.set() method has an optional boolean parameter encrypt for encrypting the data before they are stored. Only values of the type bytes can be encrypted. If you want to encrypt other values, encode them first.

message = "This is a super secret message!"
number = 2023
message_binary = message.encode("ascii")
number_binary = str(number).encode("ascii")  # integers must be converted to str (or bytes) first

u.set("message", message_binary, encrypt=True)
u.set("number", number_binary, encrypt=True)
object_id = await u.execute()

Decrypting data¤

The StorageService.get() coroutine method has an optional parameter decrypt which takes an iterable object (i.e. a list, tuple, set, ...) with the names of keys whose values are to be decrypted.

data = await storage.get(
    collection="test-collection",
    obj_id=object_id,
    decrypt=["message", "number"]
)

If some of the keys to be decrypted are missing from the retrieved document, the method ignores them and continues.

Note

Encrypted data can be identified by the "$aes-cbc$" prefix and is stored in binary format.
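
The stored layout can be inspected by splitting a value by hand. The helper below is purely illustrative (split_encrypted is not part of the ASAB API); it only parses the layout (prefix, then one 16-byte AES block holding the IV, then the ciphertext) and does not decrypt anything:

```python
ENCRYPTED_PREFIX = b"$aes-cbc$"

def split_encrypted(value: bytes):
    # Stored layout: b"$aes-cbc$" + 16-byte initialization vector + ciphertext
    if not value.startswith(ENCRYPTED_PREFIX):
        raise ValueError("not an encrypted value")
    body = value[len(ENCRYPTED_PREFIX):]
    return body[:16], body[16:]  # (iv, ciphertext)
```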

Under the hood¤

For encrypting data, the symmetric AES-CBC algorithm is used. The abstract base class StorageServiceABC provides two methods, aes_encrypt() and aes_decrypt(), that are called automatically in the Upsertor.set() and StorageService.get() methods when the encrypt or decrypt parameter is specified.

AES-CBC is a mode of operation for the Advanced Encryption Standard (AES) algorithm that provides confidentiality for data. In AES-CBC, the plaintext is divided into blocks of fixed size (128 bits for AES), and each block is encrypted using the AES algorithm with a secret key.

CBC stands for "Cipher Block Chaining": an extra step in the encryption process ensures that each ciphertext block depends on the previous one. This means that any modification to the ciphertext will produce a completely different plaintext after decryption.

The algorithm is a symmetric cipher, which makes it suitable for bulk encryption of large amounts of data: it requires much less computing power than asymmetric ciphers.

Reference¤

asab.storage.Module ¤

Bases: Module

Source code in asab/storage/__init__.py
class Module(asab.Module):

	def __init__(self, app):
		super().__init__(app)
		sttype = asab.Config.get('asab:storage', 'type', fallback=None)

		if sttype == 'inmemory':
			from .inmemory import StorageService
			self.Service = StorageService(app, "asab.StorageService")

		elif sttype == 'mongodb':
			from .mongodb import StorageService
			self.Service = StorageService(app, "asab.StorageService")

		elif sttype == "elasticsearch":
			from .elasticsearch import StorageService
			self.Service = StorageService(app, "asab.StorageService")

		elif sttype is None:
			L.critical("Missing configuration for [asab:storage] type.")
			raise SystemExit("Exit due to a critical configuration error.")

		else:
			L.critical("Unknown configuration type '{}' in [asab:storage].".format(sttype))
			raise SystemExit("Exit due to a critical configuration error.")

asab.storage.service.StorageServiceABC ¤

Bases: Service

An abstract class for the Storage Service.

Source code in asab/storage/service.py
class StorageServiceABC(asab.Service):
	"""
	An abstract class for the Storage Service.

	"""

	def __init__(self, app, service_name):
		super().__init__(app, service_name)
		self.WebhookURIs = asab.Config.get("asab:storage:changestream", "webhook_uri", fallback="") or None
		if self.WebhookURIs is not None:
			self.WebhookURIs = [uri for uri in re.split(r"\s+", self.WebhookURIs) if len(uri) > 0]
			try:
				self.ProactorService = app.get_service("asab.ProactorService")
			except KeyError as e:
				raise Exception("Storage webhooks require ProactorService") from e
		self.WebhookAuth = asab.Config.get("asab:storage:changestream", "webhook_auth", fallback="") or None

		# Specify a non-empty AES key to enable AES encryption of selected fields
		self._AESKey = asab.Config.get("asab:storage", "aes_key", fallback="")
		if len(self._AESKey) > 0:
			if cryptography is None:
				raise ModuleNotFoundError(
					"You are using storage encryption without 'cryptography' installed. "
					"Please run 'pip install cryptography' "
					"or install asab with 'storage_encryption' optional dependency.")
			self._AESKey = hashlib.sha256(self._AESKey.encode("utf-8")).digest()
		else:
			self._AESKey = None


	@abc.abstractmethod
	def upsertor(self, collection: str, obj_id=None, version: int = 0) -> None:
		"""
		Create an upsertor object for the specified collection.

		If updating an existing object, please specify its `obj_id` and also `version` that you need to read from a storage upfront.
		If `obj_id` is None, we assume that you want to insert a new object and generate its new `obj_id`, `version` should be set to 0 (default) in that case.
		If you want to insert a new object with a specific `obj_id`, specify `obj_id` and set a version to 0.
			- If there will be a colliding object already stored in a storage, `execute()` method will fail on `DuplicateError`.

		Args:
			collection: Name of collection to work with
			obj_id: Primary identification of an object in the storage (e.g. primary key)
			version: Specify a current version of the object and hence prevent byzantine faults. \
			You should always read the version from the storage upfront, prior using an upsertor. \
			That creates a soft lock on the record. It means that if the object is updated by other \
			component in meanwhile, your upsertor will fail and you should retry the whole operation. \
			The new objects should have a `version` set to 0.
		"""
		pass


	@abc.abstractmethod
	async def get(self, collection: str, obj_id, decrypt: bool = None) -> dict:
		"""
		Get object from collection by its ID.

		Args:
			collection: Collection to get from.
			obj_id: Object identification.
			decrypt: Set of fields to decrypt.

		Returns:
			The object retrieved from a storage.

		Raises:
			KeyError: Raised if `obj_id` is not found in `collection`.
		"""
		pass


	@abc.abstractmethod
	async def get_by(self, collection: str, key: str, value, decrypt=None) -> dict:
		"""
		Get object from collection by its key and value.

		Args:
			collection: Collection to get from
			key: Key to filter on
			value: Value to filter on
			decrypt: Set of fields to decrypt

		Returns:
			The object retrieved from a storage.

		Raises:
			KeyError: If object {key: value} not found in `collection`
		"""
		pass


	@abc.abstractmethod
	async def delete(self, collection: str, obj_id):
		"""
		Delete object from collection.

		Args:
			collection: Collection to get from
			obj_id: Object identification

		Returns:
			ID of the deleted object.

		Raises:
			KeyError: Raised when obj_id cannot be found in collection.
		"""
		pass


	def aes_encrypt(self, raw: bytes, iv: bytes = None) -> bytes:
		"""
		Take an array of bytes and encrypt it using AES-CBC.

		Args:
			raw: The data to be encrypted.
			iv: AES-CBC initialization vector, 16 bytes long. If left empty, a random 16-byte array will be used.

		Returns:
			The encrypted data.

		Raises:
			TypeError: The data are not in binary format.
		"""
		block_size = cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size

		if self._AESKey is None:
			raise RuntimeError(
				"No aes_key specified in asab:storage configuration. "
				"If you want to use encryption, specify a non-empty aes_key."
			)

		if not isinstance(raw, bytes):
			if isinstance(raw, str):
				raise TypeError("String objects must be encoded before encryption")
			else:
				raise TypeError("Only 'bytes' objects can be encrypted")

		# Pad the value to fit the block size
		padder = cryptography.hazmat.primitives.padding.PKCS7(block_size).padder()
		padded = padder.update(raw)
		padded += padder.finalize()

		if iv is None:
			iv = secrets.token_bytes(block_size // 8)

		algorithm = cryptography.hazmat.primitives.ciphers.algorithms.AES(self._AESKey)
		mode = cryptography.hazmat.primitives.ciphers.modes.CBC(iv)
		cipher = cryptography.hazmat.primitives.ciphers.Cipher(algorithm, mode)
		encryptor = cipher.encryptor()
		encrypted = ENCRYPTED_PREFIX + iv + (encryptor.update(padded) + encryptor.finalize())
		return encrypted


	def aes_decrypt(self, encrypted: bytes, _obsolete_padding: bool = False) -> bytes:
		"""
		Decrypt encrypted data using AES-CBC.

		Args:
			encrypted: The encrypted data to decrypt. It must start with b"$aes-cbc$" prefix, followed by one-block-long initialization vector.
			_obsolete_padding: Back-compat option: Use incorrect old padding method

		Returns:
			The decrypted data.
		"""
		block_size = cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size

		if self._AESKey is None:
			raise RuntimeError(
				"No aes_key specified in asab:storage configuration. "
				"If you want to use encryption, specify a non-empty aes_key."
			)

		if not isinstance(encrypted, bytes):
			raise TypeError("Only values of type 'bytes' can be decrypted")

		# Strip the prefix
		if not encrypted.startswith(ENCRYPTED_PREFIX):
			raise ValueError("Encrypted data must start with {!r} prefix".format(ENCRYPTED_PREFIX))
		encrypted = encrypted[len(ENCRYPTED_PREFIX):]

		# Separate the initialization vector
		iv, encrypted = encrypted[:block_size // 8], encrypted[block_size // 8:]

		algorithm = cryptography.hazmat.primitives.ciphers.algorithms.AES(self._AESKey)
		mode = cryptography.hazmat.primitives.ciphers.modes.CBC(iv)
		cipher = cryptography.hazmat.primitives.ciphers.Cipher(algorithm, mode)
		decryptor = cipher.decryptor()
		padded = decryptor.update(encrypted) + decryptor.finalize()

		# Strip padding
		if _obsolete_padding:
			# Back-compat: Incorrect old padding method
			raw = padded.rstrip(b"\x00")
		else:
			unpadder = cryptography.hazmat.primitives.padding.PKCS7(block_size).unpadder()
			raw = unpadder.update(padded)
			raw += unpadder.finalize()

		return raw


	def encryption_enabled(self) -> bool:
		"""
		Check if AESKey is not empty.

		Returns:
			True if AESKey is not empty.
		"""
		return self._AESKey is not None

aes_decrypt(encrypted, _obsolete_padding=False) ¤

Decrypt encrypted data using AES-CBC.

Parameters:

  • encrypted (bytes, required): The encrypted data to decrypt. It must start with the b"$aes-cbc$" prefix, followed by a one-block-long initialization vector.
  • _obsolete_padding (bool, default False): Back-compat option: use the incorrect old padding method.

Returns:

  • bytes: The decrypted data.


aes_encrypt(raw, iv=None) ¤

Take an array of bytes and encrypt it using AES-CBC.

Parameters:

  • raw (bytes, required): The data to be encrypted.
  • iv (bytes, default None): AES-CBC initialization vector, 16 bytes long. If left empty, a random 16-byte array will be used.

Returns:

  • bytes: The encrypted data.

Raises:

  • TypeError: The data are not in binary format.


delete(collection, obj_id) abstractmethod async ¤

Delete object from collection.

Parameters:

  • collection (str, required): Collection to get from.
  • obj_id (required): Object identification.

Returns:

  • ID of the deleted object.

Raises:

  • KeyError: Raised when obj_id cannot be found in collection.


encryption_enabled() ¤

Check if AESKey is not empty.

Returns:

  • bool: True if AESKey is not empty.


get(collection, obj_id, decrypt=None) abstractmethod async ¤

Get object from collection by its ID.

Parameters:

  • collection (str, required): Collection to get from.
  • obj_id (required): Object identification.
  • decrypt (bool, default None): Set of fields to decrypt.

Returns:

  • dict: The object retrieved from a storage.

Raises:

  • KeyError: Raised if obj_id is not found in collection.


get_by(collection, key, value, decrypt=None) abstractmethod async ¤

Get object from collection by its key and value.

Parameters:

  • collection (str, required): Collection to get from.
  • key (str, required): Key to filter on.
  • value (required): Value to filter on.
  • decrypt (default None): Set of fields to decrypt.

Returns:

  • dict: The object retrieved from a storage.

Raises:

  • KeyError: If object {key: value} not found in collection.


upsertor(collection, obj_id=None, version=0) abstractmethod ¤

Create an upsertor object for the specified collection.

If updating an existing object, specify its obj_id and also the version that you read from the storage upfront. If obj_id is None, a new object is inserted with a newly generated obj_id; version should be set to 0 (the default) in that case. If you want to insert a new object with a specific obj_id, specify obj_id and set version to 0. If a colliding object is already stored, the execute() method will fail with DuplicateError.

Parameters:

  • collection (str, required): Name of collection to work with.
  • obj_id (default None): Primary identification of an object in the storage (e.g. primary key).
  • version (int, default 0): The current version of the object, used to prevent byzantine faults. You should always read the version from the storage upfront, prior to using an upsertor. That creates a soft lock on the record: if the object is updated by another component in the meantime, your upsertor will fail and you should retry the whole operation. New objects should have a version set to 0.

asab.storage.inmemory.StorageService ¤

Bases: StorageServiceABC

Source code in asab/storage/inmemory.py
class StorageService(StorageServiceABC):


	def __init__(self, app, service_name):
		super().__init__(app, service_name)
		self.InMemoryCollections = {}


	def upsertor(self, collection: str, obj_id=None, version=0) -> InMemoryUpsertor:
		"""Obtain an in-memory upsertor for given collection and possibly for the specified object.

		:collection (str): The name of the collection.
		:obj_id (_type_, optional): The ID of the document to retrieve. Defaults to None.
		:version (int, optional): The version of the collection. Defaults to 0.

		Returns:
			:InMemoryUpsertor: Upsertor for given collection.

		"""
		return InMemoryUpsertor(self, collection, obj_id, version)


	async def get(self, collection: str, obj_id: typing.Union[str, bytes], decrypt=None) -> dict:
		"""Retrieve a document from an in-memory collection by its ID.

		:collection (str): The name of the collection to retrieve the document from.
		:obj_id (str | bytes): The ID of the document to retrieve.
		:decrypt (_type_, optional): A list of field names to decrypt. Defaults to None.

		Returns:
			:dict: A dictionary representing the retrieved document.
			If `decrypt` is not None, the specified fields in the document are decrypted using AES decryption algorithm.

		"""
		coll = self.InMemoryCollections[collection]
		data = coll[obj_id]
		if decrypt is not None:
			for field in decrypt:
				if field in data:
					data[field] = self.aes_decrypt(data[field])
		return data


	async def get_by(self, collection: str, key: str, value, decrypt=None) -> dict:
		"""
		Retrieve a document from an in-memory collection by key and value. Not implemented yet.

		Raises:
			:NotImplementedError: Not implemented on InMemoryStorage
		"""
		raise NotImplementedError()


	async def delete(self, collection: str, obj_id):
		"""
		Delete a document from an in-memory collection.

		:param collection: Collection to delete from
		:param obj_id: Object identification

		Raises:
			:KeyError: If `obj_id` not found in `collection`
		"""
		coll = self.InMemoryCollections[collection]
		del coll[obj_id]


	def _set(self, collection: str, obj_id, obj):
		try:
			coll = self.InMemoryCollections[collection]
		except KeyError:
			coll = {}
			self.InMemoryCollections[collection] = coll

		nobj = coll.setdefault(obj_id, obj)
		if nobj != obj:
			raise DuplicateError("Already exists", obj_id)

delete(collection, obj_id) async ¤

Delete a document from an in-memory collection.

Parameters:

  • collection: Collection to delete from.
  • obj_id: Object identification.

Raises:

  • KeyError: If obj_id not found in collection.


get(collection, obj_id, decrypt=None) async ¤

Retrieve a document from an in-memory collection by its ID.

Parameters:

  • collection (str): The name of the collection to retrieve the document from.
  • obj_id (str | bytes): The ID of the document to retrieve.
  • decrypt (optional): A list of field names to decrypt. Defaults to None.

Returns:

  • dict: A dictionary representing the retrieved document. If decrypt is not None, the specified fields in the document are decrypted using the AES decryption algorithm.


get_by(collection, key, value, decrypt=None) async ¤

Retrieve a document from an in-memory collection by key and value. Not implemented yet.

Raises:

  • NotImplementedError: Not implemented on InMemoryStorage.


upsertor(collection, obj_id=None, version=0) ¤

Obtain an in-memory upsertor for given collection and possibly for the specified object.

Parameters:

  • collection (str): The name of the collection.
  • obj_id (optional): The ID of the document to retrieve. Defaults to None.
  • version (int, optional): The version of the collection. Defaults to 0.

Returns:

  • InMemoryUpsertor: Upsertor for the given collection.


asab.storage.mongodb.StorageService ¤

Bases: StorageServiceABC

StorageService for MongoDB. Depends on pymongo and motor.

Source code in asab/storage/mongodb.py
class StorageService(StorageServiceABC):
	'''
	StorageService for MongoDB. Depends on `pymongo` and `motor`.
	'''


	def __init__(self, app, service_name, config_section_name='asab:storage'):
		super().__init__(app, service_name)

		# Check the old section and then the new section for uri
		uri = asab.Config.get(config_section_name, 'mongodb_uri', fallback='')
		if len(uri) == 0:
			uri = asab.Config.get("mongo", 'uri', fallback='')

		if len(uri) == 0:
			raise RuntimeError("No MongoDB URI has been provided.")

		self.Client = motor.motor_asyncio.AsyncIOMotorClient(uri)

		# Check the old section and then the new section for database name
		db_name = asab.Config.get(config_section_name, 'mongodb_database', fallback='')
		if len(db_name) == 0:
			db_name = asab.Config.get('mongo', 'database', fallback='')

		self.Database = self.Client.get_database(
			db_name,
			codec_options=bson.codec_options.CodecOptions(tz_aware=True, tzinfo=datetime.timezone.utc),
		)

		assert self.Database is not None


	def upsertor(self, collection: str, obj_id=None, version=0):
		return MongoDBUpsertor(self, collection, obj_id, version)


	async def get(self, collection: str, obj_id, decrypt=None) -> dict:
		coll = self.Database[collection]
		ret = await coll.find_one({'_id': obj_id})
		if ret is None:
			raise KeyError("NOT-FOUND")

		if decrypt is not None:
			await self._decrypt(ret, fields=decrypt, collection=collection)

		return ret


	async def get_by(self, collection: str, key: str, value, decrypt=None) -> dict:
		coll = self.Database[collection]
		ret = await coll.find_one({key: value})
		if ret is None:
			raise KeyError("NOT-FOUND")

		if decrypt is not None:
			await self._decrypt(ret, fields=decrypt, collection=collection)

		return ret


	async def collection(self, collection: str) -> motor.motor_asyncio.AsyncIOMotorCollection:
		"""
		Get collection. Useful for custom operations.

		Args:
			collection: Collection to get.

		Returns:
			`AsyncIOMotorCollection` object connected to the queried database.

		Examples:

			>>> coll = await storage.collection("test-collection")
			>>> cursor = coll.find({})
			>>> while await cursor.fetch_next:
			... 	obj = cursor.next_object()
			... 	pprint.pprint(obj)

		"""

		return self.Database[collection]


	async def delete(self, collection: str, obj_id):
		coll = self.Database[collection]
		ret = await coll.find_one_and_delete({'_id': obj_id})
		if ret is None:
			raise KeyError("NOT-FOUND")
		return ret['_id']


	async def _decrypt(self, db_obj: dict, fields: typing.Iterable, collection: str):
		"""
		Decrypt object fields in-place
		"""
		re_encrypt_fields = {}
		for field in fields:
			if field in db_obj:
				try:
					db_obj[field] = self.aes_decrypt(db_obj[field])
				except ValueError:
					db_obj[field] = self.aes_decrypt(db_obj[field], _obsolete_padding=True)
					re_encrypt_fields[field] = db_obj[field]

		# Update fields encrypted with flawed padding in previous versions (before #587)
		if re_encrypt_fields:
			upsertor = self.upsertor(collection, db_obj["_id"], db_obj["_v"])
			for k, v in re_encrypt_fields.items():
				upsertor.set(k, v, encrypt=True)
			L.debug("Object encryption updated.", struct_data={
				"coll": collection, "_id": db_obj["_id"], "fields": list(re_encrypt_fields)})
			await upsertor.execute()

collection(collection) async ¤

Get collection. Useful for custom operations.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `collection` | str | Collection to get. | required |

Returns:

| Type | Description |
| --- | --- |
| `AsyncIOMotorCollection` | `AsyncIOMotorCollection` object connected to the queried database. |

Examples:

    >>> coll = await storage.collection("test-collection")
    >>> cursor = coll.find({})
    >>> while await cursor.fetch_next:
    ...     obj = cursor.next_object()
    ...     pprint.pprint(obj)

Source code in asab/storage/mongodb.py
async def collection(self, collection: str) -> motor.motor_asyncio.AsyncIOMotorCollection:
	"""
	Get collection. Useful for custom operations.

	Args:
		collection: Collection to get.

	Returns:
		`AsyncIOMotorCollection` object connected to the queried database.

	Examples:

		>>> coll = await storage.collection("test-collection")
		>>> cursor = coll.find({})
		>>> while await cursor.fetch_next:
		... 	obj = cursor.next_object()
		... 	pprint.pprint(obj)

	"""

	return self.Database[collection]
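
The MongoDB service's `get()`, `get_by()` and `delete()` coroutines can be combined as follows; a minimal sketch, assuming `storage` is the registered `asab.StorageService` backed by MongoDB, and with the collection name, object ID and `name` field as illustrative values:

```python
# Hedged usage sketch for the MongoDB StorageService.
# "test-collection", "my-object-id" and the "name" field are illustrative.
async def fetch_and_remove(storage):
    # Look up a document by its _id; raises KeyError("NOT-FOUND") if missing.
    obj = await storage.get("test-collection", "my-object-id")

    # Look up the first document matching an arbitrary key/value pair.
    by_name = await storage.get_by("test-collection", "name", "example")

    # Delete by _id; returns the _id of the removed document.
    removed_id = await storage.delete("test-collection", obj["_id"])
    return obj, by_name, removed_id
```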

asab.storage.elasticsearch.StorageService ¤

Bases: StorageServiceABC

StorageService for ElasticSearch. Depends on the aiohttp library.

Source code in asab/storage/elasticsearch.py
class StorageService(StorageServiceABC):
	"""
	StorageService for ElasticSearch. Depends on the `aiohttp` library.
	"""

	def __init__(self, app, service_name, config_section_name='asab:storage'):
		super().__init__(app, service_name)

		self.Refresh = Config.get(config_section_name, 'refresh', fallback='true')
		self.ScrollTimeout = Config.get(config_section_name, 'scroll_timeout', fallback='1m')

		# always check if there is a url in the old config section first
		url = Config.getmultiline(config_section_name, 'elasticsearch_url', fallback='')
		if len(url) > 0:
			asab.LogObsolete.warning(
				"Do not configure elasticsearch connection in [asab:storage]. Please use [elasticsearch] section with url, username and password parameters."
			)
		elif len(url) == 0:
			url = asab.Config.getmultiline('elasticsearch', 'url', fallback='')

		self.ServerUrls = get_url_list(url)

		if len(self.ServerUrls) == 0:
			raise RuntimeError("No ElasticSearch URL has been provided.")

		# Authorization: username or API-key
		username = Config.get(config_section_name, 'elasticsearch_username')
		if len(username) == 0:
			username = Config.get('elasticsearch', 'username', fallback='')

		password = Config.get(config_section_name, 'elasticsearch_password')
		if len(password) == 0:
			password = Config.get('elasticsearch', 'password', fallback='')

		api_key = Config.get(config_section_name, 'elasticsearch_api_key')
		if len(api_key) == 0:
			api_key = Config.get('elasticsearch', 'api_key', fallback='')

		# Create headers for requests
		self.Headers = build_headers(username, password, api_key)

		# Build ssl context
		if self.ServerUrls[0].startswith('https://'):
			# check if [asab:storage] section has data for SSL or default to the [elasticsearch] section
			if section_has_ssl_option(config_section_name):
				self.SSLContextBuilder = SSLContextBuilder(config_section_name=config_section_name)
			else:
				self.SSLContextBuilder = SSLContextBuilder(config_section_name='elasticsearch')
			self.SSLContext = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT)
		else:
			self.SSLContext = None


	@contextlib.asynccontextmanager
	async def request(self, method, path, data=None, json=None):
		'''
		This method can be used to do a custom call to ElasticSearch like so:

	async with self.request("GET", "_cluster/health") as resp:
			...

		'''
		async with aiohttp.ClientSession() as session:
			for n, url in enumerate(self.ServerUrls, 1):
				try:
					async with session.request(
						method=method,
						url=url + path,
						ssl=self.SSLContext,
						headers=self.Headers,
						data=data,
						json=json,
					) as resp:

						if resp.status == 401:
							raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.")

						yield resp
						return

				except aiohttp.client_exceptions.ClientConnectorError:
					if n == len(self.ServerUrls):
						raise ConnectionError("Failed to connect to '{}'.".format(url))
					else:
						L.warning("Failed to connect to '{}', iterating to another node".format(url))


	async def is_connected(self) -> bool:
		"""
		Check if the service is connected to ElasticSearch cluster.

		Raises:
			ConnectionError: Connection failed.

		Returns:
			bool: True if the service is connected.
		"""
		async with self.request("GET", "") as resp:
			if resp.status not in {200, 201}:
				resp = await resp.json()
				L.error("Failed to connect to ElasticSearch.", struct_data={
					"code": resp.get("status"),
					"reason": resp.get("error", {}).get("reason")
				})
				return False

			else:
				L.info("Connected to ElasticSearch.", struct_data={"urls": self.ServerUrls})
				return True


	async def get(self, index: str, obj_id: str, decrypt=None) -> dict:
		"""
		Get object by its index and object ID.

		Args:
			index (str): Index for the query.
			obj_id (str): ID of the object.
			decrypt (None): Not implemented yet. Defaults to None.

		Raises:
			NotImplementedError: Encryption and decryption has not yet been implemented for ECS.
			ConnectionError: Connection failed.
			ConnectionRefusedError: Authorization required.
			KeyError: Object with the ID does not exist.

		Returns:
			The query result.
		"""
		if decrypt is not None:
			raise NotImplementedError("AES encryption for ElasticSearch not implemented")

		async with self.request("GET", "{}/_doc/{}".format(index, obj_id)) as resp:

			if resp.status not in {200, 201}:
				resp = await resp.json()
				raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
					resp.get("status"),
					resp.get("error", {}).get("reason")
				))

			else:
				obj = await resp.json()
				if not obj.get("found"):
					raise KeyError("No existing object with ID {}".format(obj_id))
				ret = obj['_source']
				ret['_v'] = obj['_version']
				ret['_id'] = obj['_id']
				return ret


	async def get_by(self, collection: str, key: str, value, decrypt=None):
		raise NotImplementedError("get_by")

	async def delete(self, index: str, _id=None) -> dict:
		"""
		Delete an entire index or document from that index.

		Args:
			index: Index to delete.
			_id: If specified, only document with the ID is deleted.

		Raises:
			ConnectionRefusedError: Authorization required (status 401)
			KeyError: No existing object with ID
			ConnectionError: Unexpected status code
			Exception: ClientConnectorError

		Returns:
			The deleted document or message that the entire index was deleted.
		"""

		if _id:
			path = "{}/_doc/{}?refresh={}".format(index, urllib.parse.quote_plus(_id), self.Refresh)
		else:
			path = "{}".format(index)

		async with self.request("DELETE", path) as resp:
			if resp.status == 404:
				raise KeyError("No existing object with ID {}".format(_id))

			elif resp.status not in {200, 201}:
				resp = await resp.json()
				raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
					resp.get("status"),
					resp.get("error", {})
				))

			else:
				json_response = await resp.json()

				if json_response.get("acknowledged", False):
					return json_response
				assert json_response["result"] == "deleted", "Document was not deleted"
				return json_response


	async def mapping(self, index: str) -> dict:
		"""
		Retrieve mapping definitions for one index.

		:param index: Specified index.
		:type index: str
		:raise Exception: Connection failed.

		Returns:
			dict: Mapping definitions for the index.
		"""
		async with self.request("GET", "{}/_mapping".format(index)) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
			return await resp.json()


	async def get_index_template(self, template_name: str) -> dict:
		"""
		Retrieve ECS Index template for the given template name.

		:param template_name: The name of the ECS template to retrieve.
		:type template_name: str
		:raise Exception: Raised if connection to all server URLs fails.
		:return: ElasticSearch Index template.
		"""
		async with self.request("GET", "_index_template/{}?format=json".format(template_name)) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
			return await resp.json()


	async def put_index_template(self, template_name: str, template: dict) -> dict:
		"""
		Create a new ECS index template.

			:param template_name: The name of ECS template.
			:param template: Body for the request.
			:return: JSON response.
			:raise Exception: Raised if connection to all server URLs fails.
		"""
		async with self.request("PUT", "_index_template/{}?master_timeout=120s".format(template_name), json=template) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

			return await resp.json()


	async def reindex(self, previous_index, new_index):

		data = {
			"source": {
				"index": previous_index,
			},
			"dest": {
				"index": new_index,
			}
		}
		async with self.request("POST", "_reindex", json=data) as resp:
			if resp.status != 200:
				raise AssertionError(
					"Unexpected response code when reindexing: {}, {}".format(
						resp.status, await resp.text()
					)
				)

			return await resp.json()


	def upsertor(self, index: str, obj_id=None, version: int = 0):
		return ElasticSearchUpsertor(self, index, obj_id, version)


	async def list(self, index: str, _from: int = 0, size: int = 10000, body: typing.Optional[dict] = None) -> dict:
		"""List data matching the index.

		:param index: Specified index.
		:param _from:  Starting document offset. Defaults to 0.
		:type _from: int
		:param size: The number of hits to return. Defaults to 10000.
		:type size: int
		:param body: An optional request body. Defaults to None.
		:type body: dict

		:return: The query search result.
		:raise Exception: Raised if connection to all server URLs fails.

		"""
		if body is None:
			body = {
				'query': {
					'bool': {
						'must': {
							'match_all': {}
						}
					}
				}
			}

		async with self.request("GET", "{}/_search?size={}&from={}&version=true".format(index, size, _from), json=body) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

			return await resp.json()


	async def count(self, index) -> int:
		"""
		Get the number of matches for a given index.

		:param index: The specified index.
		:return: The number of matches for a given index.
		:raise Exception: Connection failed.
		"""

		async with self.request("GET", "{}/_count".format(index)) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

			return await resp.json()


	async def indices(self, search_string=None):
		"""
		Return high-level information about indices in a cluster, including backing indices for data streams.

		:param search_string: A search string. Defaults to None.
		"""

		async with self.request("GET", "_cat/indices/{}?format=json".format(search_string if search_string is not None else "*")) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
			return await resp.json()


	async def empty_index(self, index, settings=None):
		'''
		Create an empty ECS index.
		'''
		# TODO: There is an option here to specify settings (e.g. shard number, replica number etc) and mappings here

		if settings is None:
			settings = {}

		async with self.request("PUT", index, json=settings) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
			return await resp.json()


	async def put_policy(self, policy_name, settings=None):
		'''
		Create a lifecycle policy.
		'''

		if settings is None:
			settings = {}

		async with self.request("PUT", "_ilm/policy/{}?master_timeout=120s".format(policy_name), json=settings) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

			return await resp.json()


	async def policies(self):
		"""
	Return high-level information about ILM policies in a cluster.
		"""
		async with self.request("GET", "_ilm/policy") as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

			return await resp.json()

count(index) async ¤

Get the number of matches for a given index.

:param index: The specified index.
:return: The number of matches for a given index.
:raise Exception: Connection failed.

Source code in asab/storage/elasticsearch.py
async def count(self, index) -> int:
	"""
	Get the number of matches for a given index.

	:param index: The specified index.
	:return: The number of matches for a given index.
	:raise Exception: Connection failed.
	"""

	async with self.request("GET", "{}/_count".format(index)) as resp:
		if resp.status != 200:
			raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

		return await resp.json()

delete(index, _id=None) async ¤

Delete an entire index or document from that index.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `index` | str | Index to delete. | required |
| `_id` | | If specified, only the document with the ID is deleted. | None |

Raises:

| Type | Description |
| --- | --- |
| `ConnectionRefusedError` | Authorization required (status 401). |
| `KeyError` | No existing object with ID. |
| `ConnectionError` | Unexpected status code. |
| `Exception` | ClientConnectorError. |

Returns:

| Type | Description |
| --- | --- |
| dict | The deleted document or a message that the entire index was deleted. |

Source code in asab/storage/elasticsearch.py
async def delete(self, index: str, _id=None) -> dict:
	"""
	Delete an entire index or document from that index.

	Args:
		index: Index to delete.
		_id: If specified, only document with the ID is deleted.

	Raises:
		ConnectionRefusedError: Authorization required (status 401)
		KeyError: No existing object with ID
		ConnectionError: Unexpected status code
		Exception: ClientConnectorError

	Returns:
		The deleted document or message that the entire index was deleted.
	"""

	if _id:
		path = "{}/_doc/{}?refresh={}".format(index, urllib.parse.quote_plus(_id), self.Refresh)
	else:
		path = "{}".format(index)

	async with self.request("DELETE", path) as resp:
		if resp.status == 404:
			raise KeyError("No existing object with ID {}".format(_id))

		elif resp.status not in {200, 201}:
			resp = await resp.json()
			raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
				resp.get("status"),
				resp.get("error", {})
			))

		else:
			json_response = await resp.json()

			if json_response.get("acknowledged", False):
				return json_response
			assert json_response["result"] == "deleted", "Document was not deleted"
			return json_response
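
As the source shows, the document ID is URL-escaped with `urllib.parse.quote_plus()` before it is placed into the request path. The path construction can be sketched standalone; the index name and `refresh` value here are illustrative:

```python
import urllib.parse

def build_delete_path(index, _id=None, refresh="true"):
    # Mirrors the path logic in StorageService.delete():
    # a document path when _id is given, the bare index otherwise.
    if _id:
        return "{}/_doc/{}?refresh={}".format(index, urllib.parse.quote_plus(_id), refresh)
    return index

# IDs containing special characters are escaped safely:
print(build_delete_path("test-index", "user/42"))  # test-index/_doc/user%2F42?refresh=true
print(build_delete_path("test-index"))             # test-index (deletes the whole index)
```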

empty_index(index, settings=None) async ¤

Create an empty ECS index.

Source code in asab/storage/elasticsearch.py
async def empty_index(self, index, settings=None):
	'''
	Create an empty ECS index.
	'''
	# TODO: There is an option here to specify settings (e.g. shard number, replica number etc) and mappings here

	if settings is None:
		settings = {}

	async with self.request("PUT", index, json=settings) as resp:
		if resp.status != 200:
			raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
		return await resp.json()

get(index, obj_id, decrypt=None) async ¤

Get object by its index and object ID.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `index` | str | Index for the query. | required |
| `obj_id` | str | ID of the object. | required |
| `decrypt` | None | Not implemented yet. | None |

Raises:

| Type | Description |
| --- | --- |
| `NotImplementedError` | Encryption and decryption have not yet been implemented for ECS. |
| `ConnectionError` | Connection failed. |
| `ConnectionRefusedError` | Authorization required. |
| `KeyError` | Object with the ID does not exist. |

Returns:

| Type | Description |
| --- | --- |
| dict | The query result. |

Source code in asab/storage/elasticsearch.py
async def get(self, index: str, obj_id: str, decrypt=None) -> dict:
	"""
	Get object by its index and object ID.

	Args:
		index (str): Index for the query.
		obj_id (str): ID of the object.
		decrypt (None): Not implemented yet. Defaults to None.

	Raises:
		NotImplementedError: Encryption and decryption has not yet been implemented for ECS.
		ConnectionError: Connection failed.
		ConnectionRefusedError: Authorization required.
		KeyError: Object with the ID does not exist.

	Returns:
		The query result.
	"""
	if decrypt is not None:
		raise NotImplementedError("AES encryption for ElasticSearch not implemented")

	async with self.request("GET", "{}/_doc/{}".format(index, obj_id)) as resp:

		if resp.status not in {200, 201}:
			resp = await resp.json()
			raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
				resp.get("status"),
				resp.get("error", {}).get("reason")
			))

		else:
			obj = await resp.json()
			if not obj.get("found"):
				raise KeyError("No existing object with ID {}".format(obj_id))
			ret = obj['_source']
			ret['_v'] = obj['_version']
			ret['_id'] = obj['_id']
			return ret

get_index_template(template_name) async ¤

Retrieve ECS Index template for the given template name.

:param template_name: The name of the ECS template to retrieve.
:type template_name: str
:raise Exception: Raised if connection to all server URLs fails.
:return: ElasticSearch Index template.

Source code in asab/storage/elasticsearch.py
async def get_index_template(self, template_name: str) -> dict:
	"""
	Retrieve ECS Index template for the given template name.

	:param template_name: The name of the ECS template to retrieve.
	:type template_name: str
	:raise Exception: Raised if connection to all server URLs fails.
	:return: ElasticSearch Index template.
	"""
	async with self.request("GET", "_index_template/{}?format=json".format(template_name)) as resp:
		if resp.status != 200:
			raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
		return await resp.json()

indices(search_string=None) async ¤

Return high-level information about indices in a cluster, including backing indices for data streams.

:param search_string: A search string. Defaults to None.

Source code in asab/storage/elasticsearch.py
async def indices(self, search_string=None):
	"""
	Return high-level information about indices in a cluster, including backing indices for data streams.

	:param search_string: A search string. Defaults to None.
	"""

	async with self.request("GET", "_cat/indices/{}?format=json".format(search_string if search_string is not None else "*")) as resp:
		if resp.status != 200:
			raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
		return await resp.json()

is_connected() async ¤

Check if the service is connected to ElasticSearch cluster.

Raises:

| Type | Description |
| --- | --- |
| `ConnectionError` | Connection failed. |

Returns:

| Type | Description |
| --- | --- |
| bool | True if the service is connected. |

Source code in asab/storage/elasticsearch.py
async def is_connected(self) -> bool:
	"""
	Check if the service is connected to ElasticSearch cluster.

	Raises:
		ConnectionError: Connection failed.

	Returns:
		bool: True if the service is connected.
	"""
	async with self.request("GET", "") as resp:
		if resp.status not in {200, 201}:
			resp = await resp.json()
			L.error("Failed to connect to ElasticSearch.", struct_data={
				"code": resp.get("status"),
				"reason": resp.get("error", {}).get("reason")
			})
			return False

		else:
			L.info("Connected to ElasticSearch.", struct_data={"urls": self.ServerUrls})
			return True

list(index, _from=0, size=10000, body=None) async ¤

List data matching the index.

:param index: Specified index.
:param _from: Starting document offset. Defaults to 0.
:type _from: int
:param size: The number of hits to return. Defaults to 10000.
:type size: int
:param body: An optional request body. Defaults to None.
:type body: dict

:return: The query search result.
:raise Exception: Raised if connection to all server URLs fails.

Source code in asab/storage/elasticsearch.py
async def list(self, index: str, _from: int = 0, size: int = 10000, body: typing.Optional[dict] = None) -> dict:
	"""List data matching the index.

	:param index: Specified index.
	:param _from:  Starting document offset. Defaults to 0.
	:type _from: int
	:param size: The number of hits to return. Defaults to 10000.
	:type size: int
	:param body: An optional request body. Defaults to None.
	:type body: dict

	:return: The query search result.
	:raise Exception: Raised if connection to all server URLs fails.

	"""
	if body is None:
		body = {
			'query': {
				'bool': {
					'must': {
						'match_all': {}
					}
				}
			}
		}

	async with self.request("GET", "{}/_search?size={}&from={}&version=true".format(index, size, _from), json=body) as resp:
		if resp.status != 200:
			raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

		return await resp.json()
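
When no `body` is supplied, the method falls back to a `match_all` query. The default body it builds is shown standalone below, together with a hypothetical custom body (the `name` field is an illustrative example, not part of ASAB):

```python
# The default ElasticSearch query body constructed by list()
# when the caller passes body=None: match every document.
default_body = {
    "query": {
        "bool": {
            "must": {
                "match_all": {}
            }
        }
    }
}

# A custom body can restrict the search instead; the "name" field
# here is illustrative:
custom_body = {"query": {"term": {"name": "example"}}}
```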

mapping(index) async ¤

Retrieve mapping definitions for one index.

:param index: Specified index.
:type index: str
:raise Exception: Connection failed.

Returns:

| Type | Description |
| --- | --- |
| dict | Mapping definitions for the index. |

Source code in asab/storage/elasticsearch.py
async def mapping(self, index: str) -> dict:
	"""
	Retrieve mapping definitions for one index.

	:param index: Specified index.
	:type index: str
	:raise Exception: Connection failed.

	Returns:
		dict: Mapping definitions for the index.
	"""
	async with self.request("GET", "{}/_mapping".format(index)) as resp:
		if resp.status != 200:
			raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
		return await resp.json()

policies() async ¤

Return high-level information about ILM policies in a cluster.

Source code in asab/storage/elasticsearch.py
async def policies(self):
	"""
	Return high-level information about ILM policies in a cluster.
	"""
	async with self.request("GET", "_ilm/policy") as resp:
		if resp.status != 200:
			raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

		return await resp.json()

put_index_template(template_name, template) async ¤

Create a new ECS index template.

:param template_name: The name of the ECS template.
:param template: Body for the request.
:return: JSON response.
:raise Exception: Raised if connection to all server URLs fails.

Source code in asab/storage/elasticsearch.py
async def put_index_template(self, template_name: str, template: dict) -> dict:
	"""
	Create a new ECS index template.

		:param template_name: The name of ECS template.
		:param template: Body for the request.
		:return: JSON response.
		:raise Exception: Raised if connection to all server URLs fails.
	"""
	async with self.request("PUT", "_index_template/{}?master_timeout=120s".format(template_name), json=template) as resp:
		if resp.status != 200:
			raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

		return await resp.json()
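
A template body passed to `put_index_template()` follows the standard ElasticSearch index-template format; a minimal illustrative sketch, where the index pattern and shard count are example values rather than ASAB defaults:

```python
# Illustrative index template body for put_index_template().
# The "example-*" pattern and shard setting are example values only.
template = {
    "index_patterns": ["example-*"],
    "template": {
        "settings": {
            "number_of_shards": 1
        }
    }
}
```

It could then be applied with `await storage.put_index_template("example-template", template)` and read back with `get_index_template()`.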

put_policy(policy_name, settings=None) async ¤

Create a lifecycle policy.

Source code in asab/storage/elasticsearch.py
async def put_policy(self, policy_name, settings=None):
	'''
	Create a lifecycle policy.
	'''

	if settings is None:
		settings = {}

	async with self.request("PUT", "_ilm/policy/{}?master_timeout=120s".format(policy_name), json=settings) as resp:
		if resp.status != 200:
			raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))

		return await resp.json()
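
The `settings` argument of `put_policy()` is a standard ILM policy document; a minimal illustrative sketch, where the delete-after-30-days rule is an example choice, not an ASAB default:

```python
# Illustrative ILM policy body for put_policy(); the 30-day delete
# phase is an example value only.
policy = {
    "policy": {
        "phases": {
            "delete": {
                "min_age": "30d",
                "actions": {
                    "delete": {}
                }
            }
        }
    }
}
```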

request(method, path, data=None, json=None) async ¤

This method can be used to make a custom call to ElasticSearch, for example:

    async with self.request("GET", "_cluster/health") as resp:
        ...

Source code in asab/storage/elasticsearch.py
@contextlib.asynccontextmanager
async def request(self, method, path, data=None, json=None):
	'''
	This method can be used to do a custom call to ElasticSearch like so:

	async with self.request("GET", "_cluster/health") as resp:
		...

	'''
	async with aiohttp.ClientSession() as session:
		for n, url in enumerate(self.ServerUrls, 1):
			try:
				async with session.request(
					method=method,
					url=url + path,
					ssl=self.SSLContext,
					headers=self.Headers,
					data=data,
					json=json,
				) as resp:

					if resp.status == 401:
						raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.")

					yield resp
					return

			except aiohttp.client_exceptions.ClientConnectorError:
				if n == len(self.ServerUrls):
					raise ConnectionError("Failed to connect to '{}'.".format(url))
				else:
					L.warning("Failed to connect to '{}', iterating to another node".format(url))
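
A custom call built on `request()` might look like the following; a sketch assuming `storage` is the ElasticSearch `StorageService` instance (the `_cluster/health` endpoint is standard ElasticSearch, not ASAB-specific):

```python
# Hedged sketch: querying cluster health through StorageService.request().
# `storage` is assumed to be the ElasticSearch StorageService instance.
async def cluster_health(storage):
    async with storage.request("GET", "_cluster/health") as resp:
        if resp.status != 200:
            raise ConnectionError("Unexpected response code: {}".format(resp.status))
        return await resp.json()
```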