Storage Service
ASAB's Storage Service supports data storage in-memory or in dedicated document databases, including
MongoDB and ElasticSearch.
Configuration
First, specify the storage type in the configuration. The options for the storage type are:
inmemory
: Stores data directly in memory.
mongodb
: Stores data in a MongoDB database. Depends on the pymongo and motor libraries.
elasticsearch
: Stores data in an ElasticSearch database. Depends on the aiohttp library.
The Storage Service provides a unified interface for accessing and
manipulating collections across multiple database technologies.
[asab:storage]
type=mongodb
To access the storage, add asab.storage.Module when initializing your application and then obtain the registered service:
class MyApplication(asab.Application):

    async def initialize(self):
        self.add_module(asab.storage.Module)

    async def main(self):
        storage = self.get_service("asab.StorageService")
Working with databases
Upsertor
The upsertor is an object that works like a pointer to the specified collection and, optionally, to an object ID. It is used for inserting new objects, updating existing objects, and deleting them.
u = storage.upsertor("test-collection")
The StorageService.upsertor() method creates an upsertor object associated with the specified collection.
It takes the collection name as an argument and accepts two optional parameters, obj_id and version, which identify an existing object by its ID and version.
Inserting an object
To insert an object into the collection, set its fields using the Upsertor.set() method.
Then run the Upsertor.execute() coroutine method, which commits the upsertor data to the storage and returns the ID of the object.
Since it is a coroutine, it must be awaited.
object_id = await u.execute()
The Upsertor.execute() method has optional parameters custom_data and event_type, which are used for webhook requests.
object_id = await u.execute(
    custom_data={"foo": "bar"},
    event_type="object_created"
)
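Webhook requests are sent to the URIs configured in the [asab:storage:changestream] section. A minimal sketch, based on the option names read by the service source in the Reference section (the URI itself is illustrative):

[asab:storage:changestream]
webhook_uri=https://example.com/webhook

A webhook_auth option can be set in the same section to authenticate these requests.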
Getting a single object
For getting a single object, use the StorageService.get() coroutine method, which takes two arguments, collection and obj_id, and finds an object by its ID in the given collection.
obj = await storage.get(collection="test-collection", obj_id=object_id)
print(obj)
When the requested object is not found in the collection, the method raises KeyError.
Remember to handle this exception properly when using databases in your services, so that they do not crash!
Note
The MongoDB storage service additionally provides the get_by() coroutine method, which accesses an object by finding a key-value pair:
obj = await storage.get_by(collection="test-collection", key="key", value="value")
Updating an object
For updating an object, first obtain an upsertor, specifying the object's obj_id and version.
u = storage.upsertor(
    collection="test-collection",
    obj_id=object_id,
    version=obj['_v']
)
We strongly recommend reading the version from the object, as shown above.
This creates a soft lock on the record: if the object is updated by another
component in the meantime, your upsertor will fail and you should retry the
whole operation. New objects should have the version set to 0, which is the default.
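The version check behaves like classic optimistic concurrency control. A minimal stdlib sketch of the idea, with a plain dict standing in for the collection and "_v" for the version field (commit() and ConflictError are illustrative names, not ASAB API):

```python
class ConflictError(Exception):
    """Raised when the stored version no longer matches the one we read."""

def commit(collection: dict, obj_id, fields: dict, expected_version: int):
    # Read the current record; a missing record behaves like version 0.
    current = collection.get(obj_id, {"_v": 0})
    if current["_v"] != expected_version:
        # Someone else updated the object in the meantime:
        # the caller should re-read the object and retry.
        raise ConflictError(obj_id)
    # Apply the changes and bump the version atomically with the write.
    collection[obj_id] = {**current, **fields, "_v": expected_version + 1}
    return obj_id

coll = {}
commit(coll, "doc1", {"key": "value"}, expected_version=0)  # insert, _v becomes 1
commit(coll, "doc1", {"key": "new"}, expected_version=1)    # update succeeds, _v becomes 2
```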
After obtaining an upsertor, you can update the object via the Upsertor.set() method.
u.set("key", "new_value")
object_id = await u.execute()
Deleting an object
For deleting an object from the database, use the StorageService.delete() coroutine method, which takes the arguments collection and obj_id, deletes the object, and returns its ID.
deleted_id = await storage.delete("test-collection", object_id)
Storing data in memory
If the option inmemory is set, ASAB will store data in its own memory.
In particular, asab.StorageService is initialized with an InMemoryCollections attribute: a dictionary in which all the collections are stored.
Note
You can inspect all the collections directly through the InMemoryCollections attribute, although we do not recommend that.
import pprint
storage = self.get_service("asab.StorageService")
pprint.pprint(storage.InMemoryCollections, indent=2)
Storing data in MongoDB
If the option mongodb is set, ASAB will store data in a MongoDB database.
ASAB uses the motor library, which provides a non-blocking MongoDB driver for asyncio.
You can specify the database name and URL for MongoDB in the config file:
[asab:storage]
type=mongodb
[mongo]
uri=mongodb://localhost:27017
database=asabdb
You can use all the methods from the abstract class. The MongoDB Storage class additionally provides two methods, StorageService.get_by() and StorageService.collection().
The method StorageService.get_by() is used in the same way as StorageService.get(), except that it finds the object by a key-value pair:
obj = await storage.get_by(collection="test-collection", key="key", value="value")
The collection() method is used for accessing the database directly.
It takes the collection name as an argument and returns a motor.motor_asyncio.AsyncIOMotorCollection object, which can be used for running MongoDB operations.
collection = await storage.collection("test-collection")
cursor = collection.find({})
while await cursor.fetch_next:
    data = cursor.next_object()
    pprint.pprint(data)
The full list of methods suitable for this object is described in the
official documentation.
Storing data in ElasticSearch
When using ElasticSearch, add configuration options for the URL, username and password (an API key is also supported via the api_key parameter):
[asab:storage]
type=elasticsearch
[elasticsearch]
url=http://localhost:9200/
username=JohnDoe
password=lorem_ipsum_dolor?sit_amet!2023
You can also specify the refresh parameter and the scroll timeout for the ElasticSearch Scroll API:
[asab:storage]
refresh=true
scroll_timeout=1m
ElasticSearch Storage additionally provides methods for creating index templates, mappings, etc. (see the Reference section).
Encryption and decryption
Data stored in the database can be encrypted using an algorithm that adheres to the Advanced Encryption Standard (AES).
AES Key settings
In order to use encryption, first make sure you have the cryptography package installed. Then specify
the AES Key in the config file.
[asab:storage]
aes_key=random_key_string
Note
The AES Key is used as both an encryption and decryption key.
It is recommended to keep it in a separate configuration file
that is not exposed anywhere publicly.
The actual binary AES Key is obtained from the aes_key
specified in the config file by encoding and hashing it using the
standard hashlib
algorithms, so do not worry about the length and type of the key.
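Concretely, the binary key is the SHA-256 digest of the UTF-8 encoded configuration value (see the StorageServiceABC source in the Reference section), so any string yields a valid 256-bit key:

```python
import hashlib

def derive_aes_key(aes_key_setting: str) -> bytes:
    # The configured string is UTF-8 encoded and hashed with SHA-256,
    # yielding a fixed 32-byte (256-bit) binary key.
    return hashlib.sha256(aes_key_setting.encode("utf-8")).digest()

key = derive_aes_key("random_key_string")
print(len(key))  # 32, regardless of the length of the configured string
```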
Encrypting data
The Upsertor.set() method has an optional boolean parameter encrypt for encrypting the data before they are stored.
Only values of type bytes can be encrypted. If you want to encrypt other values, encode them first.
message = "This is a super secret message!"
number = 2023
message_binary = message.encode("ascii")
number_binary = str(number).encode("ascii")
u.set("message", message_binary, encrypt=True)
u.set("number", number_binary, encrypt=True)
object_id = await u.execute()
Decrypting data
The StorageService.get() coroutine method has an optional parameter decrypt, which takes an iterable object (e.g. a list, tuple, or set) with the names of the keys whose values are to be decrypted.
data = await storage.get(
    collection="test-collection",
    obj_id=object_id,
    decrypt=["message", "number"]
)
If some of the keys to be decrypted are missing in the required document, the method will ignore them and continue.
Note
Data that has been encrypted can be identified by the "$aes-cbc$" prefix and is stored in a binary format.
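Based on the aes_decrypt() source in the Reference section, the stored payload is the prefix, followed by a 16-byte initialization vector, followed by the ciphertext. A minimal sketch of splitting such a payload (split_encrypted() is an illustrative helper, not ASAB API):

```python
ENCRYPTED_PREFIX = b"$aes-cbc$"
IV_LENGTH = 16  # one AES block (128 bits)

def split_encrypted(payload: bytes):
    # Stored layout: b"$aes-cbc$" + 16-byte initialization vector + ciphertext
    if not payload.startswith(ENCRYPTED_PREFIX):
        raise ValueError("Not AES-CBC encrypted data")
    body = payload[len(ENCRYPTED_PREFIX):]
    return body[:IV_LENGTH], body[IV_LENGTH:]  # (iv, ciphertext)
```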
Under the hood
For encrypting data, we use the standardized symmetric AES-CBC algorithm.
In fact, the abstract base class StorageServiceABC provides two methods, aes_encrypt() and aes_decrypt(), that are called automatically by the Upsertor.set() and StorageService.get() methods when the encrypt or decrypt parameter is specified.
AES-CBC is a mode of operation for the Advanced Encryption Standard (AES) algorithm that provides confidentiality for data.
In AES-CBC, the plaintext is divided into fixed-size blocks of 128 bits, and each block is encrypted using the AES algorithm with a secret key.
CBC stands for "Cipher Block Chaining": each plaintext block is combined with the previous ciphertext block before encryption, so every ciphertext block depends on all the blocks before it.
As a consequence, modifying a single ciphertext block garbles the decryption of that block and alters the next one.
AES is a symmetric cipher: it requires much less computational power than asymmetric ciphers, which makes it well suited for bulk encryption of large amounts of data.
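To illustrate the chaining step only, here is a toy sketch in which a simple XOR with the key stands in for the AES block cipher. It is deliberately insecure; it shows just how each ciphertext block feeds into the next:

```python
def xor_bytes(a: bytes, b: bytes) -> bytes:
    return bytes(x ^ y for x, y in zip(a, b))

def toy_cbc_encrypt(blocks, key: bytes, iv: bytes):
    # CBC chaining: each plaintext block is XORed with the previous
    # ciphertext block (the IV for the first block) before being
    # "encrypted". A plain XOR with the key stands in for the AES block
    # cipher here, so this is NOT secure - it only demonstrates chaining.
    out, prev = [], iv
    for block in blocks:
        ciphertext = xor_bytes(xor_bytes(block, prev), key)
        out.append(ciphertext)
        prev = ciphertext
    return out

# Thanks to chaining, two identical plaintext blocks
# produce different ciphertext blocks:
c1, c2 = toy_cbc_encrypt([b"AAAA", b"AAAA"], key=b"key!", iv=b"\x01\x02\x03\x04")
print(c1 != c2)  # True
```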
Reference
asab.storage.Module
Bases: Module
Source code in asab/storage/__init__.py
class Module(asab.Module):

    def __init__(self, app):
        super().__init__(app)

        sttype = asab.Config.get('asab:storage', 'type', fallback=None)

        if sttype == 'inmemory':
            from .inmemory import StorageService
            self.Service = StorageService(app, "asab.StorageService")

        elif sttype == 'mongodb':
            from .mongodb import StorageService
            self.Service = StorageService(app, "asab.StorageService")

        elif sttype == "elasticsearch":
            from .elasticsearch import StorageService
            self.Service = StorageService(app, "asab.StorageService")

        elif sttype is None:
            L.critical("Missing configuration for [asab:storage] type.")
            raise SystemExit("Exit due to a critical configuration error.")

        else:
            L.critical("Unknown configuration type '{}' in [asab:storage].".format(sttype))
            raise SystemExit("Exit due to a critical configuration error.")
asab.storage.service.StorageServiceABC
Bases: Service
An abstract class for the Storage Service.
Source code in asab/storage/service.py
class StorageServiceABC(asab.Service):
    """
    An abstract class for the Storage Service.
    """

    def __init__(self, app, service_name):
        super().__init__(app, service_name)
        self.WebhookURIs = asab.Config.get("asab:storage:changestream", "webhook_uri", fallback="") or None
        if self.WebhookURIs is not None:
            self.WebhookURIs = [uri for uri in re.split(r"\s+", self.WebhookURIs) if len(uri) > 0]
            try:
                self.ProactorService = app.get_service("asab.ProactorService")
            except KeyError as e:
                raise Exception("Storage webhooks require ProactorService") from e
        self.WebhookAuth = asab.Config.get("asab:storage:changestream", "webhook_auth", fallback="") or None

        # Specify a non-empty AES key to enable AES encryption of selected fields
        self._AESKey = asab.Config.get("asab:storage", "aes_key", fallback="")
        if len(self._AESKey) > 0:
            if cryptography is None:
                raise ModuleNotFoundError(
                    "You are using storage encryption without 'cryptography' installed. "
                    "Please run 'pip install cryptography' "
                    "or install asab with 'storage_encryption' optional dependency.")
            self._AESKey = hashlib.sha256(self._AESKey.encode("utf-8")).digest()
        else:
            self._AESKey = None

    @abc.abstractmethod
    def upsertor(self, collection: str, obj_id=None, version: int = 0) -> None:
        """
        Create an upsertor object for the specified collection.

        If updating an existing object, please specify its `obj_id` and also `version` that you need to read from a storage upfront.
        If `obj_id` is None, we assume that you want to insert a new object and generate its new `obj_id`, `version` should be set to 0 (default) in that case.
        If you want to insert a new object with a specific `obj_id`, specify `obj_id` and set a version to 0.
        - If there will be a colliding object already stored in a storage, `execute()` method will fail on `DuplicateError`.

        Args:
            collection: Name of collection to work with
            obj_id: Primary identification of an object in the storage (e.g. primary key)
            version: Specify a current version of the object and hence prevent byzantine faults. \
                You should always read the version from the storage upfront, prior using an upsertor. \
                That creates a soft lock on the record. It means that if the object is updated by other \
                component in meanwhile, your upsertor will fail and you should retry the whole operation. \
                The new objects should have a `version` set to 0.
        """
        pass

    @abc.abstractmethod
    async def get(self, collection: str, obj_id, decrypt: bool = None) -> dict:
        """
        Get object from collection by its ID.

        Args:
            collection: Collection to get from.
            obj_id: Object identification.
            decrypt: Set of fields to decrypt.

        Returns:
            The object retrieved from a storage.

        Raises:
            KeyError: Raised if `obj_id` is not found in `collection`.
        """
        pass

    @abc.abstractmethod
    async def get_by(self, collection: str, key: str, value, decrypt=None) -> dict:
        """
        Get object from collection by its key and value.

        Args:
            collection: Collection to get from
            key: Key to filter on
            value: Value to filter on
            decrypt: Set of fields to decrypt

        Returns:
            The object retrieved from a storage.

        Raises:
            KeyError: If object {key: value} not found in `collection`
        """
        pass

    @abc.abstractmethod
    async def delete(self, collection: str, obj_id):
        """
        Delete object from collection.

        Args:
            collection: Collection to get from
            obj_id: Object identification

        Returns:
            ID of the deleted object.

        Raises:
            KeyError: Raised when obj_id cannot be found in collection.
        """
        pass

    def aes_encrypt(self, raw: bytes, iv: bytes = None) -> bytes:
        """
        Take an array of bytes and encrypt it using AES-CBC.

        Args:
            raw: The data to be encrypted.
            iv: AES-CBC initialization vector, 16 bytes long. If left empty, a random 16-byte array will be used.

        Returns:
            The encrypted data.

        Raises:
            TypeError: The data are not in binary format.
        """
        block_size = cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size
        if self._AESKey is None:
            raise RuntimeError(
                "No aes_key specified in asab:storage configuration. "
                "If you want to use encryption, specify a non-empty aes_key."
            )

        if not isinstance(raw, bytes):
            if isinstance(raw, str):
                raise TypeError("String objects must be encoded before encryption")
            else:
                raise TypeError("Only 'bytes' objects can be encrypted")

        # Pad the value to fit the block size
        padder = cryptography.hazmat.primitives.padding.PKCS7(block_size).padder()
        padded = padder.update(raw)
        padded += padder.finalize()

        if iv is None:
            iv = secrets.token_bytes(block_size // 8)

        algorithm = cryptography.hazmat.primitives.ciphers.algorithms.AES(self._AESKey)
        mode = cryptography.hazmat.primitives.ciphers.modes.CBC(iv)
        cipher = cryptography.hazmat.primitives.ciphers.Cipher(algorithm, mode)
        encryptor = cipher.encryptor()
        encrypted = ENCRYPTED_PREFIX + iv + (encryptor.update(padded) + encryptor.finalize())
        return encrypted

    def aes_decrypt(self, encrypted: bytes, _obsolete_padding: bool = False) -> bytes:
        """
        Decrypt encrypted data using AES-CBC.

        Args:
            encrypted: The encrypted data to decrypt. It must start with b"$aes-cbc$" prefix, followed by one-block-long initialization vector.
            _obsolete_padding: Back-compat option: Use incorrect old padding method

        Returns:
            The decrypted data.
        """
        block_size = cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size
        if self._AESKey is None:
            raise RuntimeError(
                "No aes_key specified in asab:storage configuration. "
                "If you want to use encryption, specify a non-empty aes_key."
            )

        if not isinstance(encrypted, bytes):
            raise TypeError("Only values of type 'bytes' can be decrypted")

        # Strip the prefix
        if not encrypted.startswith(ENCRYPTED_PREFIX):
            raise ValueError("Encrypted data must start with {!r} prefix".format(ENCRYPTED_PREFIX))
        encrypted = encrypted[len(ENCRYPTED_PREFIX):]

        # Separate the initialization vector
        iv, encrypted = encrypted[:block_size // 8], encrypted[block_size // 8:]

        algorithm = cryptography.hazmat.primitives.ciphers.algorithms.AES(self._AESKey)
        mode = cryptography.hazmat.primitives.ciphers.modes.CBC(iv)
        cipher = cryptography.hazmat.primitives.ciphers.Cipher(algorithm, mode)
        decryptor = cipher.decryptor()
        padded = decryptor.update(encrypted) + decryptor.finalize()

        # Strip padding
        if _obsolete_padding:
            # Back-compat: Incorrect old padding method
            raw = padded.rstrip(b"\x00")
        else:
            unpadder = cryptography.hazmat.primitives.padding.PKCS7(block_size).unpadder()
            raw = unpadder.update(padded)
            raw += unpadder.finalize()

        return raw

    def encryption_enabled(self) -> bool:
        """
        Check if AESKey is not empty.

        Returns:
            True if AESKey is not empty.
        """
        return self._AESKey is not None
aes_decrypt(encrypted, _obsolete_padding=False)
Decrypt encrypted data using AES-CBC.
Parameters:

Name | Type | Description | Default
encrypted | bytes | The encrypted data to decrypt. It must start with the b"$aes-cbc$" prefix, followed by a one-block-long initialization vector. | required
_obsolete_padding | bool | Back-compat option: use the incorrect old padding method | False

Returns:

Type | Description
bytes | The decrypted data.
aes_encrypt(raw, iv=None)
Take an array of bytes and encrypt it using AES-CBC.
Parameters:

Name | Type | Description | Default
raw | bytes | The data to be encrypted. | required
iv | bytes | AES-CBC initialization vector, 16 bytes long. If left empty, a random 16-byte array will be used. | None

Returns:

Type | Description
bytes | The encrypted data.

Raises:

Type | Description
TypeError | The data are not in binary format.
delete(collection, obj_id)
abstractmethod
async
Delete object from collection.
Parameters:

Name | Type | Description | Default
collection | str | Collection to get from | required
obj_id | | Object identification | required

Returns:

Type | Description
| ID of the deleted object.

Raises:

Type | Description
KeyError | Raised when obj_id cannot be found in collection.
encryption_enabled()
Check if AESKey is not empty.
Returns:

Type | Description
bool | True if AESKey is not empty.
get(collection, obj_id, decrypt=None)
abstractmethod
async
Get object from collection by its ID.
Parameters:

Name | Type | Description | Default
collection | str | Collection to get from. | required
obj_id | | Object identification. | required
decrypt | bool | Set of fields to decrypt. | None

Returns:

Type | Description
dict | The object retrieved from a storage.

Raises:

Type | Description
KeyError | Raised if obj_id is not found in collection.
get_by(collection, key, value, decrypt=None)
abstractmethod
async
Get object from collection by its key and value.
Parameters:

Name | Type | Description | Default
collection | str | Collection to get from | required
key | str | Key to filter on | required
value | | Value to filter on | required
decrypt | | Set of fields to decrypt | None

Returns:

Type | Description
dict | The object retrieved from a storage.

Raises:

Type | Description
KeyError | If object {key: value} not found in collection
upsertor(collection, obj_id=None, version=0)
abstractmethod
Create an upsertor object for the specified collection.
If updating an existing object, please specify its obj_id and also the version that you need to read from the storage upfront.
If obj_id is None, we assume that you want to insert a new object and generate its new obj_id; version should be set to 0 (the default) in that case.
If you want to insert a new object with a specific obj_id, specify obj_id and set the version to 0. If a colliding object is already stored in the storage, the execute() method will fail with DuplicateError.

Parameters:

Name | Type | Description | Default
collection | str | Name of collection to work with | required
obj_id | | Primary identification of an object in the storage (e.g. primary key) | None
version | int | Specify a current version of the object and hence prevent byzantine faults. You should always read the version from the storage upfront, prior to using an upsertor. That creates a soft lock on the record: if the object is updated by another component in the meantime, your upsertor will fail and you should retry the whole operation. New objects should have a version set to 0. | 0
asab.storage.inmemory.StorageService
Bases: StorageServiceABC
Source code in asab/storage/inmemory.py
class StorageService(StorageServiceABC):

    def __init__(self, app, service_name):
        super().__init__(app, service_name)
        self.InMemoryCollections = {}

    def upsertor(self, collection: str, obj_id=None, version=0) -> InMemoryUpsertor:
        """Obtain an in-memory upsertor for given collection and possibly for the specified object.

        :collection (str): The name of the collection.
        :obj_id (_type_, optional): The ID of the document to retrieve. Defaults to None.
        :version (int, optional): The version of the collection. Defaults to 0.

        Returns:
            :InMemoryUpsertor: Upsertor for given collection.
        """
        return InMemoryUpsertor(self, collection, obj_id, version)

    async def get(self, collection: str, obj_id: typing.Union[str, bytes], decrypt=None) -> dict:
        """Retrieve a document from an in-memory collection by its ID.

        :collection (str): The name of the collection to retrieve the document from.
        :obj_id (str | bytes): The ID of the document to retrieve.
        :decrypt (_type_, optional): A list of field names to decrypt. Defaults to None.

        Returns:
            :dict: A dictionary representing the retrieved document.
            If `decrypt` is not None, the specified fields in the document are decrypted using AES decryption algorithm.
        """
        coll = self.InMemoryCollections[collection]
        data = coll[obj_id]
        if decrypt is not None:
            for field in decrypt:
                if field in data:
                    data[field] = self.aes_decrypt(data[field])
        return data

    async def get_by(self, collection: str, key: str, value, decrypt=None) -> dict:
        """
        Retrieve a document from an in-memory collection by key and value. Not implemented yet.

        Raises:
            :NotImplementedError: Not implemented on InMemoryStorage
        """
        raise NotImplementedError()

    async def delete(self, collection: str, obj_id):
        """
        Delete a document from an in-memory collection.

        :param collection: Collection to delete from
        :param obj_id: Object identification

        Raises:
            :KeyError: If `obj_id` not found in `collection`
        """
        coll = self.InMemoryCollections[collection]
        del coll[obj_id]

    def _set(self, collection: str, obj_id, obj):
        try:
            coll = self.InMemoryCollections[collection]
        except KeyError:
            coll = {}
            self.InMemoryCollections[collection] = coll

        nobj = coll.setdefault(obj_id, obj)
        if nobj != obj:
            raise DuplicateError("Already exists", obj_id)
delete(collection, obj_id)
async
Delete a document from an in-memory collection.
Parameters:

Name | Type | Description | Default
collection | str | Collection to delete from | required
obj_id | | Object identification | required

Raises:

Type | Description
KeyError | If obj_id not found in collection
get(collection, obj_id, decrypt=None)
async
Retrieve a document from an in-memory collection by its ID.
Parameters:

Name | Type | Description | Default
collection | str | The name of the collection to retrieve the document from. | required
obj_id | str or bytes | The ID of the document to retrieve. | required
decrypt | | A list of field names to decrypt. | None

Returns:

Type | Description
dict | A dictionary representing the retrieved document. If decrypt is not None, the specified fields in the document are decrypted using the AES decryption algorithm.
get_by(collection, key, value, decrypt=None)
async
Retrieve a document from an in-memory collection by key and value. Not implemented yet.
Raises:

Type | Description
NotImplementedError | Not implemented on InMemoryStorage
upsertor(collection, obj_id=None, version=0)
Obtain an in-memory upsertor for given collection and possibly for the specified object.
Parameters:

Name | Type | Description | Default
collection | str | The name of the collection. | required
obj_id | | The ID of the document to retrieve. | None
version | int | The version of the collection. | 0

Returns:

Type | Description
InMemoryUpsertor | Upsertor for given collection.
asab.storage.mongodb.StorageService
Bases: StorageServiceABC
StorageService for MongoDB. Depends on pymongo and motor.
Source code in asab/storage/mongodb.py
	class StorageService(StorageServiceABC):
		'''
		StorageService for MongoDB. Depends on `pymongo` and `motor`.
		'''

		def __init__(self, app, service_name, config_section_name='asab:storage'):
			super().__init__(app, service_name)

			# Check the old section and then the new section for uri
			uri = asab.Config.get(config_section_name, 'mongodb_uri', fallback='')
			if len(uri) == 0:
				uri = asab.Config.get("mongo", 'uri', fallback='')
			if len(uri) == 0:
				raise RuntimeError("No MongoDB URI has been provided.")

			self.Client = motor.motor_asyncio.AsyncIOMotorClient(uri)

			# Check the old section and then the new section for database name
			db_name = asab.Config.get(config_section_name, 'mongodb_database', fallback='')
			if len(db_name) == 0:
				db_name = asab.Config.get('mongo', 'database', fallback='')

			self.Database = self.Client.get_database(
				db_name,
				codec_options=bson.codec_options.CodecOptions(tz_aware=True, tzinfo=datetime.timezone.utc),
			)
			assert self.Database is not None

		def upsertor(self, collection: str, obj_id=None, version=0):
			return MongoDBUpsertor(self, collection, obj_id, version)

		async def get(self, collection: str, obj_id, decrypt=None) -> dict:
			coll = self.Database[collection]
			ret = await coll.find_one({'_id': obj_id})
			if ret is None:
				raise KeyError("NOT-FOUND")
			if decrypt is not None:
				await self._decrypt(ret, fields=decrypt, collection=collection)
			return ret

		async def get_by(self, collection: str, key: str, value, decrypt=None) -> dict:
			coll = self.Database[collection]
			ret = await coll.find_one({key: value})
			if ret is None:
				raise KeyError("NOT-FOUND")
			if decrypt is not None:
				await self._decrypt(ret, fields=decrypt, collection=collection)
			return ret

		async def collection(self, collection: str) -> motor.motor_asyncio.AsyncIOMotorCollection:
			"""
			Get collection. Useful for custom operations.

			Args:
				collection: Collection to get.

			Returns:
				`AsyncIOMotorCollection` object connected to the queried database.

			Examples:
				>>> coll = await storage.collection("test-collection")
				>>> cursor = coll.find({})
				>>> while await cursor.fetch_next:
				...     obj = cursor.next_object()
				...     pprint.pprint(obj)
			"""
			return self.Database[collection]

		async def delete(self, collection: str, obj_id):
			coll = self.Database[collection]
			ret = await coll.find_one_and_delete({'_id': obj_id})
			if ret is None:
				raise KeyError("NOT-FOUND")
			return ret['_id']

		async def _decrypt(self, db_obj: dict, fields: typing.Iterable, collection: str):
			"""
			Decrypt object fields in-place
			"""
			re_encrypt_fields = {}
			for field in fields:
				if field in db_obj:
					try:
						db_obj[field] = self.aes_decrypt(db_obj[field])
					except ValueError:
						db_obj[field] = self.aes_decrypt(db_obj[field], _obsolete_padding=True)
						re_encrypt_fields[field] = db_obj[field]
			# Update fields encrypted with flawed padding in previous versions (before #587)
			if re_encrypt_fields:
				upsertor = self.upsertor(collection, db_obj["_id"], db_obj["_v"])
				for k, v in re_encrypt_fields.items():
					upsertor.set(k, v, encrypt=True)
				L.debug("Object encryption updated.", struct_data={
					"coll": collection, "_id": db_obj["_id"], "fields": list(re_encrypt_fields)})
				await upsertor.execute()
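As the constructor above shows, the MongoDB connection is read either from the `[asab:storage]` section (`mongodb_uri`, `mongodb_database`, checked first) or from a `[mongo]` section (`uri`, `database`). A minimal configuration sketch; the URI and database name below are placeholders:

```ini
[asab:storage]
type=mongodb
mongodb_uri=mongodb://localhost:27017
mongodb_database=my-database
```

If no URI is found in either section, the service raises `RuntimeError("No MongoDB URI has been provided.")` at startup.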
collection(collection)
async
Get collection. Useful for custom operations.
Parameters:

| Name | Type | Description | Default |
| collection | str | Collection to get. | required |

Returns:

| Type | Description |
| AsyncIOMotorCollection | AsyncIOMotorCollection object connected to the queried database. |
Examples:
>>> coll = await storage.collection("test-collection")
>>> cursor = coll.find({})
>>> while await cursor.fetch_next:
... obj = cursor.next_object()
... pprint.pprint(obj)
asab.storage.elasticsearch.StorageService
Bases: StorageServiceABC
StorageService for ElasticSearch. Depends on the aiohttp library.
Source code in asab/storage/elasticsearch.py
	class StorageService(StorageServiceABC):
		"""
		StorageService for ElasticSearch. Depends on `aiohttp` library.
		"""

		def __init__(self, app, service_name, config_section_name='asab:storage'):
			super().__init__(app, service_name)
			self.Refresh = Config.get(config_section_name, 'refresh', fallback='true')
			self.ScrollTimeout = Config.get(config_section_name, 'scroll_timeout', fallback='1m')

			# always check if there is a url in the old config section first
			url = Config.getmultiline(config_section_name, 'elasticsearch_url', fallback='')
			if len(url) > 0:
				asab.LogObsolete.warning(
					"Do not configure elasticsearch connection in [asab:storage]. Please use [elasticsearch] section with url, username and password parameters."
				)
			elif len(url) == 0:
				url = asab.Config.getmultiline('elasticsearch', 'url', fallback='')

			self.ServerUrls = get_url_list(url)

			if len(self.ServerUrls) == 0:
				raise RuntimeError("No ElasticSearch URL has been provided.")

			# Authorization: username or API-key
			username = Config.get(config_section_name, 'elasticsearch_username')
			if len(username) == 0:
				username = Config.get('elasticsearch', 'username', fallback='')

			password = Config.get(config_section_name, 'elasticsearch_password')
			if len(password) == 0:
				password = Config.get('elasticsearch', 'password', fallback='')

			api_key = Config.get(config_section_name, 'elasticsearch_api_key')
			if len(api_key) == 0:
				api_key = Config.get('elasticsearch', 'api_key', fallback='')

			# Create headers for requests
			self.Headers = build_headers(username, password, api_key)

			# Build ssl context
			if self.ServerUrls[0].startswith('https://'):
				# check if [asab:storage] section has data for SSL or default to the [elasticsearch] section
				if section_has_ssl_option(config_section_name):
					self.SSLContextBuilder = SSLContextBuilder(config_section_name=config_section_name)
				else:
					self.SSLContextBuilder = SSLContextBuilder(config_section_name='elasticsearch')
				self.SSLContext = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT)
			else:
				self.SSLContext = None

		@contextlib.asynccontextmanager
		async def request(self, method, path, data=None, json=None):
			'''
			This method can be used to do a custom call to ElasticSearch like so:

			async with self.request("GET", "cluster/_health") as resp:
				...
			'''
			async with aiohttp.ClientSession() as session:
				for n, url in enumerate(self.ServerUrls, 1):
					try:
						async with session.request(
							method=method,
							url=url + path,
							ssl=self.SSLContext,
							headers=self.Headers,
							data=data,
							json=json,
						) as resp:
							if resp.status == 401:
								raise ConnectionRefusedError("Response code 401: Unauthorized. Provide authorization by specifying either user name and password or api key.")
							yield resp
							return
					except aiohttp.client_exceptions.ClientConnectorError:
						if n == len(self.ServerUrls):
							raise ConnectionError("Failed to connect to '{}'.".format(url))
						else:
							L.warning("Failed to connect to '{}', iterating to another node".format(url))

		async def is_connected(self) -> bool:
			"""
			Check if the service is connected to ElasticSearch cluster.

			Raises:
				ConnectionError: Connection failed.

			Returns:
				bool: True if the service is connected.
			"""
			async with self.request("GET", "") as resp:
				if resp.status not in {200, 201}:
					resp = await resp.json()
					L.error("Failed to connect to ElasticSearch.", struct_data={
						"code": resp.get("status"),
						"reason": resp.get("error", {}).get("reason")
					})
					return False
				else:
					L.info("Connected to ElasticSearch.", struct_data={"urls": self.ServerUrls})
					return True

		async def get(self, index: str, obj_id: str, decrypt=None) -> dict:
			"""
			Get object by its index and object ID.

			Args:
				index (str): Index for the query.
				obj_id (str): ID of the object.
				decrypt (None): Not implemented yet. Defaults to None.

			Raises:
				NotImplementedError: Encryption and decryption has not yet been implemented for ECS.
				ConnectionError: Connection failed.
				ConnectionRefusedError: Authorization required.
				KeyError: Object with the ID does not exist.

			Returns:
				The query result.
			"""
			if decrypt is not None:
				raise NotImplementedError("AES encryption for ElasticSearch not implemented")
			async with self.request("GET", "{}/_doc/{}".format(index, obj_id)) as resp:
				if resp.status not in {200, 201}:
					resp = await resp.json()
					raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
						resp.get("status"),
						resp.get("error", {}).get("reason")
					))
				else:
					obj = await resp.json()
					if not obj.get("found"):
						raise KeyError("No existing object with ID {}".format(obj_id))
					ret = obj['_source']
					ret['_v'] = obj['_version']
					ret['_id'] = obj['_id']
					return ret

		async def get_by(self, collection: str, key: str, value, decrypt=None):
			raise NotImplementedError("get_by")

		async def delete(self, index: str, _id=None) -> dict:
			"""
			Delete an entire index or document from that index.

			Args:
				index: Index to delete.
				_id: If specified, only document with the ID is deleted.

			Raises:
				ConnectionRefusedError: Authorization required (status 401)
				KeyError: No existing object with ID
				ConnectionError: Unexpected status code
				Exception: ClientConnectorError

			Returns:
				The deleted document or message that the entire index was deleted.
			"""
			if _id:
				path = "{}/_doc/{}?refresh={}".format(index, urllib.parse.quote_plus(_id), self.Refresh)
			else:
				path = "{}".format(index)
			async with self.request("DELETE", path) as resp:
				if resp.status == 404:
					raise KeyError("No existing object with ID {}".format(_id))
				elif resp.status not in {200, 201}:
					resp = await resp.json()
					raise ConnectionError("Failed to retrieve data from ElasticSearch. Got {}: {}".format(
						resp.get("status"),
						resp.get("error", {})
					))
				else:
					json_response = await resp.json()
					if json_response.get("acknowledged", False):
						return json_response
					assert json_response["result"] == "deleted", "Document was not deleted"
					return json_response

		async def mapping(self, index: str) -> dict:
			"""
			Retrieve mapping definitions for one index.

			:param index: Specified index.
			:type index: str

			:raise Exception: Connection failed.

			Returns:
				dict: Mapping definitions for the index.
			"""
			async with self.request("GET", "{}/_mapping".format(index)) as resp:
				if resp.status != 200:
					raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
				return await resp.json()

		async def get_index_template(self, template_name: str) -> dict:
			"""
			Retrieve ECS Index template for the given template name.

			:param template_name: The name of the ECS template to retrieve.
			:type template_name: str

			:raise Exception: Raised if connection to all server URLs fails.
			:return: ElasticSearch Index template.
			"""
			async with self.request("GET", "_index_template/{}?format=json".format(template_name)) as resp:
				if resp.status != 200:
					raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
				return await resp.json()

		async def put_index_template(self, template_name: str, template: dict) -> dict:
			"""
			Create a new ECS index template.

			:param template_name: The name of ECS template.
			:param template: Body for the request.
			:return: JSON response.

			:raise Exception: Raised if connection to all server URLs fails.
			"""
			async with self.request("PUT", "_index_template/{}".format(template_name), json=template) as resp:
				if resp.status != 200:
					raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
				return await resp.json()

		async def reindex(self, previous_index, new_index):
			data = {
				"source": {
					"index": previous_index,
				},
				"dest": {
					"index": new_index,
				}
			}
			async with self.request("POST", "_reindex", json=data) as resp:
				if resp.status != 200:
					raise AssertionError(
						"Unexpected response code when reindexing: {}, {}".format(
							resp.status, await resp.text()
						)
					)
				return await resp.json()

		def upsertor(self, index: str, obj_id=None, version: int = 0):
			return ElasticSearchUpsertor(self, index, obj_id, version)

		async def list(self, index: str, _from: int = 0, size: int = 10000, body: typing.Optional[dict] = None) -> dict:
			"""List data matching the index.

			:param index: Specified index.
			:param _from: Starting document offset. Defaults to 0.
			:type _from: int
			:param size: The number of hits to return. Defaults to 10000.
			:type size: int
			:param body: An optional request body. Defaults to None.
			:type body: dict

			:return: The query search result.
			:raise Exception: Raised if connection to all server URLs fails.
			"""
			if body is None:
				body = {
					'query': {
						'bool': {
							'must': {
								'match_all': {}
							}
						}
					}
				}
			async with self.request("GET", "{}/_search?size={}&from={}&version=true".format(index, size, _from)) as resp:
				if resp.status != 200:
					raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
				return await resp.json()

		async def count(self, index) -> int:
			"""
			Get the number of matches for a given index.

			:param index: The specified index.
			:return: The number of matches for a given index.
			:raise Exception: Connection failed.
			"""
			async with self.request("GET", "{}/_count".format(index)) as resp:
				if resp.status != 200:
					raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
				return await resp.json()

		async def indices(self, search_string=None):
			"""
			Return high-level information about indices in a cluster, including backing indices for data streams.

			:param search_string: A search string. Defaults to None.
			"""
			async with self.request("GET", "_cat/indices/{}?format=json".format(search_string if search_string is not None else "*")) as resp:
				if resp.status != 200:
					raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
				return await resp.json()

		async def empty_index(self, index, settings=None):
			'''
			Create an empty ECS index.
			'''
			# TODO: There is an option here to specify settings (e.g. shard number, replica number etc) and mappings here
			if settings is None:
				settings = {}

			async with self.request("PUT", index, json=settings) as resp:
				if resp.status != 200:
					raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
				return await resp.json()

		async def put_policy(self, policy_name, settings=None):
			'''
			Create a lifecycle policy.
			'''
			if settings is None:
				settings = {}

			async with self.request("PUT", "_ilm/policy/{}".format(policy_name), json=settings) as resp:
				if resp.status != 200:
					raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
				return await resp.json()

		async def policies(self):
			"""
			Return high-level information about ILM policies in a cluster.
			"""
			async with self.request("GET", "_ilm/policy") as resp:
				if resp.status != 200:
					raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
				return await resp.json()
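Matching the constructor above, the ElasticSearch connection is preferably configured in a dedicated `[elasticsearch]` section with `url`, `username` and `password` (or `api_key`); configuring it in `[asab:storage]` is obsolete and logs a warning. A minimal sketch; the URL and credentials below are placeholders:

```ini
[asab:storage]
type=elasticsearch

[elasticsearch]
url=http://localhost:9200
username=my-user
password=my-password
```

Several URLs may be listed (one per line) and `request()` will fail over between them; if no URL is provided at all, the service raises `RuntimeError("No ElasticSearch URL has been provided.")`.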
count(index)
async
Get the number of matches for a given index.
Parameters:

| Name | Type | Description | Default |
| index | | The specified index. | required |

Returns:

| Type | Description |
| int | The number of matches for the given index. |

Raises:

| Type | Description |
| Exception | Connection failed. |
delete(index, _id=None)
async
Delete an entire index or document from that index.
Parameters:

| Name | Type | Description | Default |
| index | str | Index to delete. | required |
| _id | | If specified, only the document with this ID is deleted. | None |

Raises:

| Type | Description |
| ConnectionRefusedError | Authorization required (status 401). |
| KeyError | No existing object with the ID. |
| ConnectionError | Unexpected status code. |
| Exception | ClientConnectorError. |

Returns:

| Type | Description |
| dict | The deleted document, or a message that the entire index was deleted. |
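Note that `delete()` URL-escapes the `_id` with `urllib.parse.quote_plus`, so document IDs containing spaces or slashes are safe in the request path. A standalone sketch of the path construction used above; the index name and document ID are made up for illustration:

```python
import urllib.parse


def build_delete_path(index: str, _id=None, refresh: str = "true") -> str:
	# Mirrors the path logic of StorageService.delete():
	# with an _id, target a single document; otherwise target the whole index
	if _id:
		return "{}/_doc/{}?refresh={}".format(index, urllib.parse.quote_plus(_id), refresh)
	return index


print(build_delete_path("test-collection", "my index/1"))
print(build_delete_path("test-collection"))
```

The `refresh` query parameter comes from the service's `Refresh` setting and controls whether the deletion is immediately visible to searches.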
empty_index(index, settings=None)
async
Create an empty ECS index.
get(index, obj_id, decrypt=None)
async
Get object by its index and object ID.
Parameters:

| Name | Type | Description | Default |
| index | str | Index for the query. | required |
| obj_id | str | ID of the object. | required |
| decrypt | None | Not implemented yet. | None |

Raises:

| Type | Description |
| NotImplementedError | Encryption and decryption have not yet been implemented for ECS. |
| ConnectionError | Connection failed. |
| ConnectionRefusedError | Authorization required. |
| KeyError | Object with the ID does not exist. |

Returns:

| Type | Description |
| dict | The query result. |
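The tail of `get()` reshapes the raw ElasticSearch `_doc` response: the document body comes from `_source`, while the version and ID are copied into the `_v` and `_id` keys of the returned dictionary. A standalone sketch of that reshaping, using a made-up response payload:

```python
def shape_es_response(obj: dict) -> dict:
	# Mirrors the response handling at the end of StorageService.get()
	if not obj.get("found"):
		raise KeyError("No existing object with ID {}".format(obj.get("_id")))
	ret = obj["_source"]
	ret["_v"] = obj["_version"]
	ret["_id"] = obj["_id"]
	return ret


# Hypothetical ElasticSearch response for a document "doc-1" at version 3
raw = {"found": True, "_id": "doc-1", "_version": 3, "_source": {"foo": "bar"}}
print(shape_es_response(raw))
```

This is why objects read back through the Storage Service always carry `_id` and `_v` alongside their own fields.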
get_index_template(template_name)
async
Retrieve ECS Index template for the given template name.
Parameters:

| Name | Type | Description | Default |
| template_name | str | The name of the ECS template to retrieve. | required |

Returns:

| Type | Description |
| dict | ElasticSearch index template. |

Raises:

| Type | Description |
| Exception | Raised if connection to all server URLs fails. |
indices(search_string=None)
async
Return high-level information about indices in a cluster, including backing indices for data streams.
Parameters:

| Name | Type | Description | Default |
| search_string | | A search string. | None |
is_connected()
async
Check if the service is connected to ElasticSearch cluster.
Raises:

| Type | Description |
| ConnectionError | Connection failed. |

Returns:

| Type | Description |
| bool | True if the service is connected. |
list(index, _from=0, size=10000, body=None)
async
List data matching the index.
Parameters:

| Name | Type | Description | Default |
| index | str | Specified index. | required |
| _from | int | Starting document offset. | 0 |
| size | int | The number of hits to return. | 10000 |
| body | dict | An optional request body. | None |

Returns:

| Type | Description |
| dict | The query search result. |

Raises:

| Type | Description |
| Exception | Raised if connection to all server URLs fails. |
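When no `body` is given, `list()` falls back to a match-all query. A custom `body` should follow the same ElasticSearch query DSL shape; a sketch of the default body and a hypothetical filtered variant (the `status` field is made up for illustration):

```python
# Default body used by list() when body is None: match every document
default_body = {
	"query": {
		"bool": {
			"must": {
				"match_all": {}
			}
		}
	}
}

# A hypothetical custom body that matches only documents with status == "active"
custom_body = {
	"query": {
		"bool": {
			"must": {
				"match": {"status": "active"}
			}
		}
	}
}
```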
mapping(index)
async
Retrieve mapping definitions for one index.
Parameters:

| Name | Type | Description | Default |
| index | str | Specified index. | required |

Returns:

| Type | Description |
| dict | Mapping definitions for the index. |

Raises:

| Type | Description |
| Exception | Connection failed. |
policies()
async
Return high-level information about ILM policies in a cluster, including backing indices for data streams.
put_index_template(template_name, template)
async
Create a new ECS index template.
Parameters:

| Name | Type | Description | Default |
| template_name | str | The name of the ECS template. | required |
| template | dict | Body for the request. | required |

Returns:

| Type | Description |
| dict | JSON response. |

Raises:

| Type | Description |
| Exception | Raised if connection to all server URLs fails. |
put_policy(policy_name, settings=None)
async
Create a lifecycle policy.
request(method, path, data=None, json=None)
async
This method can be used to do a custom call to ElasticSearch like so:
async with self.request("GET", "cluster/_health") as resp:
...
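`request()` tries each configured node URL in order and raises `ConnectionError` only after the last one fails. That failover pattern can be sketched in isolation; here the `reachable` predicate is a stand-in for the actual HTTP call, and the URLs are hypothetical:

```python
def pick_node(server_urls, reachable):
	# Mirrors the retry loop in StorageService.request():
	# try nodes in order, warn-and-continue on failure,
	# and raise only when the last node also fails
	for n, url in enumerate(server_urls, 1):
		if reachable(url):
			return url
		if n == len(server_urls):
			raise ConnectionError("Failed to connect to '{}'.".format(url))


print(pick_node(
	["http://es1:9200/", "http://es2:9200/"],
	lambda url: url.startswith("http://es2"),
))
```

In the real service the "unreachable" case is an `aiohttp` `ClientConnectorError`, and intermediate failures are logged as warnings before the next node is tried.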