Storage Service¤
ASAB's Storage Service supports data storage in-memory or in dedicated document databases, including MongoDB and ElasticSearch.
Configuration¤
First, specify the storage type in the configuration. The options for the storage type are:
inmemory
: Collects data directly in memory.
mongodb
: Collects data using MongoDB database. Depends on pymongo and motor libraries.
elasticsearch
: Collects data using ElasticSearch database. Depends on aiohttp library.
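As a rough illustration, the storage type could be declared and checked as sketched below. The `[asab:storage]` section and the `type` option follow the ElasticSearch configuration shown later on this page. The `ALLOWED_TYPES` set and the use of Python's standard `configparser` (instead of ASAB's own configuration loader) are assumptions made only so that the snippet runs on its own.

```python
import configparser

# Example configuration text; in a real application this lives in a config file.
CONFIG_TEXT = """
[asab:storage]
type=mongodb
"""

# The three storage types listed above (the set name is hypothetical).
ALLOWED_TYPES = {"inmemory", "mongodb", "elasticsearch"}

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

storage_type = parser.get("asab:storage", "type")
if storage_type not in ALLOWED_TYPES:
    raise ValueError("Unknown storage type: {}".format(storage_type))
```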
The Storage Service provides a unified interface for accessing and manipulating collections across multiple database technologies.
To access the storage, add asab.storage.Module during application initialization and then obtain the registered service.
import asab
import asab.storage

class MyApplication(asab.Application):
    async def initialize(self):
        # Adding the module registers the storage service
        self.add_module(asab.storage.Module)

    async def main(self):
        storage = self.get_service("asab.StorageService")
Manipulation with databases¤
Upsertor¤
An upsertor is an object that works like a pointer to the specified collection and, optionally, to a specific object ID. It is used for inserting new objects, updating existing objects and deleting them.
The StorageService.upsertor() method creates an upsertor object associated with the specified collection.
It takes collection as a required argument and accepts two optional parameters, obj_id and version, which are used for getting an existing object by its ID and version.
Inserting an object¤
To insert an object into the collection, set its fields with the Upsertor.set() method.
To commit the changes, run the Upsertor.execute() coroutine method, which writes the upsertor data to the storage and returns the ID of the object.
Since it is a coroutine, it must be awaited.
The Upsertor.execute() method has optional parameters custom_data and event_type, which are used for webhook requests.
Getting a single object¤
To get a single object, use the StorageService.get() coroutine method, which takes two arguments, collection and obj_id, and finds an object by its ID in the collection.
When the requested object is not found in the collection, the method raises KeyError. Remember to handle this exception properly when using databases in your services to prevent them from crashing!
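One possible way to handle the missing-object case is a small wrapper that turns KeyError into a None result. The sketch below is illustrative only: `get_or_none` is a hypothetical helper, and `StubStorage` is a dict-backed stand-in (not ASAB's StorageService) included so that the example runs without ASAB installed.

```python
import asyncio


async def get_or_none(storage, collection, obj_id):
    """Return the object, or None when the storage raises KeyError."""
    try:
        return await storage.get(collection, obj_id)
    except KeyError:
        return None


class StubStorage:
    """Hypothetical stand-in that mimics only the get() coroutine."""

    def __init__(self, data):
        self.data = data

    async def get(self, collection, obj_id):
        # A missing collection or ID raises KeyError, as described above.
        return self.data[collection][obj_id]


stub = StubStorage({"test-collection": {"a1": {"name": "Alice"}}})
found = asyncio.run(get_or_none(stub, "test-collection", "a1"))
missing = asyncio.run(get_or_none(stub, "test-collection", "zz"))
```

With a real storage service, the same helper would be awaited from inside the application's own coroutines instead of being driven by asyncio.run().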
Note
The MongoDB storage service additionally provides a coroutine method get_by(), which finds an object by a key-value pair.
Updating an object¤
To update an object, first obtain an upsertor, specifying the object's obj_id and version.
We strongly recommend reading the version from the stored object before creating the upsertor. That creates a soft lock on the record: if the object is updated by another component in the meantime, your upsertor will fail, and you should retry the whole operation. New objects should have the version set to 0, which is the default.
After obtaining an upsertor, you can update the object via the Upsertor.set() method and commit the change with Upsertor.execute().
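The soft lock described above can be modelled with a toy in-memory store. This is a sketch of the idea only, not ASAB's implementation: `ToyStorage` and `ToyUpsertor` are hypothetical stand-ins, the `_v` field name for the stored version is an assumption, and the real service raises its own exception types rather than RuntimeError.

```python
import asyncio
import uuid


class ToyUpsertor:
    """Hypothetical stand-in: collects fields, then commits them on execute()."""

    def __init__(self, store, collection, obj_id, version):
        self.store = store
        self.collection = collection
        self.obj_id = obj_id
        self.version = version
        self.fields = {}

    def set(self, key, value):
        self.fields[key] = value

    async def execute(self):
        coll = self.store.setdefault(self.collection, {})
        if self.obj_id is None:
            self.obj_id = uuid.uuid4().hex
        current = coll.get(self.obj_id)
        stored_version = current["_v"] if current is not None else 0
        # Soft lock: the version we were given must match what is stored.
        if stored_version != self.version:
            raise RuntimeError("version mismatch, retry the whole operation")
        obj = dict(current or {}, **self.fields)
        obj["_id"] = self.obj_id
        obj["_v"] = stored_version + 1
        coll[self.obj_id] = obj
        return self.obj_id


class ToyStorage:
    """Hypothetical stand-in for a storage service."""

    def __init__(self):
        self.collections = {}

    def upsertor(self, collection, obj_id=None, version=0):
        return ToyUpsertor(self.collections, collection, obj_id, version)

    async def get(self, collection, obj_id):
        return dict(self.collections[collection][obj_id])


async def demo():
    storage = ToyStorage()

    # Insert a new object (version defaults to 0).
    u = storage.upsertor("test-collection")
    u.set("name", "Alice")
    obj_id = await u.execute()

    # Read the version from the stored object, then update.
    obj = await storage.get("test-collection", obj_id)
    u = storage.upsertor("test-collection", obj_id=obj_id, version=obj["_v"])
    u.set("name", "Bob")
    await u.execute()

    # A second writer still holding the old version is rejected.
    stale = storage.upsertor("test-collection", obj_id=obj_id, version=obj["_v"])
    stale.set("name", "Carol")
    try:
        await stale.execute()
        stale_failed = False
    except RuntimeError:
        stale_failed = True

    final = await storage.get("test-collection", obj_id)
    return final["name"], final["_v"], stale_failed


result = asyncio.run(demo())
```

The stale writer fails because the stored version moved on after the first update, which is why the text recommends retrying the whole read-modify-write cycle.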
Deleting an object¤
To delete an object from the database, use the StorageService.delete() coroutine method, which takes the arguments collection and obj_id, deletes the object and returns its ID.
Storing data in memory¤
If the option inmemory is set, ASAB will store data in its own memory.
In particular, asab.StorageService is initialized with an attribute InMemoryCollections, which is a dictionary in which all the collections are stored.
Note
You can go through all the collections directly by accessing the InMemoryCollections attribute, although we do not recommend that.
Storing data in MongoDB¤
If the option mongodb is set, ASAB will store data in a MongoDB database.
ASAB uses the motor library, which provides a non-blocking MongoDB driver for asyncio.
You can specify the database name and URL for MongoDB in the config file.
You can use all the methods from the abstract class. The MongoDB Storage class additionally provides two methods, StorageService.get_by() and StorageService.collection().
The method StorageService.get_by() is used in the same way as StorageService.get(), except that it looks the object up by a key and value instead of by its ID.
The method collection() is used for accessing the database directly.
It takes collection as the argument and returns a motor.motor_asyncio.AsyncIOMotorCollection object, which can be used for calling MongoDB operations.
import pprint

# Obtain the underlying motor collection and print every document in it
collection = await storage.collection("test-collection")
cursor = collection.find({})
while await cursor.fetch_next:
    data = cursor.next_object()
    pprint.pprint(data)
The full list of methods suitable for this object is described in the official documentation.
Storing data in ElasticSearch¤
When using ElasticSearch, configure the URL, username and password (an API key is also supported via the api_key parameter):
[asab:storage]
type=elasticsearch
[elasticsearch]
url=http://localhost:9200/
username=JohnDoe
password=lorem_ipsum_dolor?sit_amet!2023
You can also specify the refreshing parameter and scroll timeout for ElasticSearch Scroll API.
ElasticSearch Storage additionally provides methods for creating index templates, mappings, etc. (see the Reference section).
Encryption and decryption¤
Data stored in the database can be encrypted using an algorithm that adheres to the Advanced Encryption Standard (AES).
AES Key settings¤
In order to use encryption, first make sure you have the cryptography package installed. Then specify the AES Key in the config file.
Note
The AES Key is used as both an encryption and decryption key. It is recommended to keep it in a separate configuration file that is not exposed anywhere publicly.
The actual binary AES Key is obtained from the aes_key specified in the config file by encoding and hashing it using the standard hashlib algorithms, so do not worry about the length and type of the key.
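The derivation described above might look roughly like the following. This is a sketch under assumptions: the text only says that the configured value is encoded and hashed with hashlib, so the choice of UTF-8 and SHA-256 here, as well as the helper name `derive_aes_key`, are illustrative guesses rather than confirmed details.

```python
import hashlib


def derive_aes_key(aes_key: str) -> bytes:
    """Turn a passphrase of any length into a fixed-size binary key.

    Assumption: UTF-8 encoding and SHA-256, which yields 32 bytes (AES-256).
    """
    return hashlib.sha256(aes_key.encode("utf-8")).digest()


short_key = derive_aes_key("abc")
long_key = derive_aes_key("a much longer passphrase, any length works")
```

Whatever the input length, the digest has a fixed size, which is why the length of the configured key does not matter.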
Encrypting data¤
The Upsertor.set() method has an optional boolean parameter encrypt for encrypting the data before it is stored.
Only values of the type bytes can be encrypted. If you want to encrypt other values, encode them first.
message = "This is a super secret message!"
number = 2023

# Only bytes can be encrypted, so convert the values first
message_binary = message.encode("ascii")
number_binary = str(number).encode("ascii")  # int has no encode(); go through str

u.set("message", message_binary, encrypt=True)
u.set("number", number_binary, encrypt=True)
object_id = await u.execute()
Decrypting data¤
The StorageService.get() coroutine method has an optional parameter decrypt, which takes an iterable object (e.g. a list, tuple or set) with the names of keys whose values are to be decrypted.
data = await storage.get(
    collection="test-collection",
    obj_id=object_id,
    decrypt=["message", "number"]
)
If some of the keys to be decrypted are missing in the requested document, the method will ignore them and continue.
Note
Encrypted data can be identified by the prefix "$aes-cbc$" and is stored in a binary format.
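A stored value can be checked for this prefix without decrypting it. The helpers below are hypothetical (they are not part of ASAB). They rely on the layout given in the Reference section for aes_decrypt(): the prefix is followed by a one-block (16-byte) initialization vector. Treating everything after the IV as ciphertext is an assumption.

```python
AES_PREFIX = b"$aes-cbc$"
IV_LENGTH = 16  # one AES block, in bytes


def is_encrypted(value) -> bool:
    """Return True if the value carries the encrypted-data prefix."""
    return isinstance(value, bytes) and value.startswith(AES_PREFIX)


def split_encrypted(value: bytes):
    """Split a prefixed value into (iv, ciphertext) without decrypting it."""
    if not is_encrypted(value):
        raise ValueError("value does not start with the $aes-cbc$ prefix")
    body = value[len(AES_PREFIX):]
    return body[:IV_LENGTH], body[IV_LENGTH:]


# Made-up sample: prefix + all-zero IV + placeholder ciphertext bytes
sample = AES_PREFIX + bytes(IV_LENGTH) + b"ciphertext-bytes"
iv, ciphertext = split_encrypted(sample)
```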
Under the hood¤
For encrypting data, we use the symmetric AES-CBC algorithm.
The abstract base class StorageServiceABC provides two methods, aes_encrypt() and aes_decrypt(), which are called automatically in the Upsertor.set() and StorageService.get() methods when the parameter encrypt or decrypt is specified.
AES-CBC is a mode of operation for the Advanced Encryption Standard (AES) algorithm that provides confidentiality for data; on its own, it does not authenticate the ciphertext. In AES-CBC, the plaintext is divided into blocks of fixed size (128 bits for AES), and each block is encrypted using the AES algorithm with a secret key.
CBC stands for "Cipher Block Chaining". It XORs each plaintext block with the previous ciphertext block (or with the initialization vector for the first block) before encryption, so that each ciphertext block depends on all the blocks before it. As a result, identical plaintext blocks produce different ciphertext blocks.
AES is a symmetric cipher: it requires much less computation than asymmetric ciphers, which makes it well suited for bulk encryption of large amounts of data.
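The chaining step can be illustrated without any cryptographic library. The sketch below deliberately replaces AES with a trivial, insecure toy block function (adding 1 to every byte), so it shows only how CBC links blocks together. It is not how ASAB encrypts data, and all function names are hypothetical.

```python
BLOCK = 16  # AES block size in bytes


def toy_encrypt_block(block: bytes) -> bytes:
    # Toy stand-in for AES (NOT secure): add 1 to every byte.
    return bytes((b + 1) % 256 for b in block)


def toy_decrypt_block(block: bytes) -> bytes:
    # Inverse of the toy block function.
    return bytes((b - 1) % 256 for b in block)


def xor(a: bytes, b: bytes) -> bytes:
    return bytes(x ^ y for x, y in zip(a, b))


def cbc_encrypt(plaintext: bytes, iv: bytes) -> bytes:
    if len(plaintext) % BLOCK != 0:
        raise ValueError("plaintext must be a multiple of the block size")
    out, prev = [], iv
    for i in range(0, len(plaintext), BLOCK):
        # XOR with the previous ciphertext block, then encrypt.
        prev = toy_encrypt_block(xor(plaintext[i:i + BLOCK], prev))
        out.append(prev)
    return b"".join(out)


def cbc_decrypt(ciphertext: bytes, iv: bytes) -> bytes:
    out, prev = [], iv
    for i in range(0, len(ciphertext), BLOCK):
        block = ciphertext[i:i + BLOCK]
        # Decrypt, then XOR with the previous ciphertext block.
        out.append(xor(toy_decrypt_block(block), prev))
        prev = block
    return b"".join(out)


plaintext = b"A" * 32   # two identical plaintext blocks
iv = bytes(BLOCK)       # all-zero IV, for demonstration only
ciphertext = cbc_encrypt(plaintext, iv)
first_block, second_block = ciphertext[:BLOCK], ciphertext[BLOCK:]
recovered = cbc_decrypt(ciphertext, iv)
```

Although both plaintext blocks are identical, the two ciphertext blocks differ, because the second one is mixed with the first before encryption. A real deployment would use a random IV, as aes_encrypt() does when iv is left empty.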
Reference¤
asab.storage.Module
¤
Bases: Module
Source code in asab/storage/__init__.py
asab.storage.service.StorageServiceABC
¤
Bases: Service
An abstract class for the Storage Service.
Source code in asab/storage/service.py
aes_decrypt(encrypted, _obsolete_padding=False)
¤
Decrypt encrypted data using AES-CBC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
encrypted | bytes | The encrypted data to decrypt. It must start with b"$aes-cbc$" prefix, followed by one-block-long initialization vector. | required |
_obsolete_padding | bool | Back-compat option: Use incorrect old padding method | False |
Returns:
Type | Description |
---|---|
bytes | The decrypted data. |
Source code in asab/storage/service.py
aes_encrypt(raw, iv=None)
¤
Take an array of bytes and encrypt it using AES-CBC.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
raw | bytes | The data to be encrypted. | required |
iv | bytes | AES-CBC initialization vector, 16 bytes long. If left empty, a random 16-byte array will be used. | None |
Returns:
Type | Description |
---|---|
bytes | The encrypted data. |
Raises:
Type | Description |
---|---|
TypeError | The data are not in binary format. |
Source code in asab/storage/service.py
delete(collection, obj_id)
abstractmethod
async
¤
Delete object from collection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection | str | Collection to get from | required |
obj_id | | Object identification | required |
Returns:
Type | Description |
---|---|
ID of the deleted object. |
Raises:
Type | Description |
---|---|
KeyError | Raised when obj_id cannot be found in collection. |
Source code in asab/storage/service.py
encryption_enabled()
¤
Check if AESKey is not empty.
Returns:
Type | Description |
---|---|
bool | True if AESKey is not empty. |
get(collection, obj_id, decrypt=None)
abstractmethod
async
¤
Get object from collection by its ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection | str | Collection to get from. | required |
obj_id | | Object identification. | required |
decrypt | bool | Set of fields to decrypt. | None |
Returns:
Type | Description |
---|---|
dict | The object retrieved from a storage. |
Raises:
Type | Description |
---|---|
KeyError | Raised if |
Source code in asab/storage/service.py
get_by(collection, key, value, decrypt=None)
abstractmethod
async
¤
Get object from collection by its key and value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection | str | Collection to get from | required |
key | str | Key to filter on | required |
value | | Value to filter on | required |
decrypt | | Set of fields to decrypt | None |
Returns:
Type | Description |
---|---|
dict | The object retrieved from a storage. |
Raises:
Type | Description |
---|---|
KeyError | If object {key: value} not found in |
Source code in asab/storage/service.py
upsertor(collection, obj_id=None, version=0)
abstractmethod
¤
Create an upsertor object for the specified collection.
If updating an existing object, please specify its obj_id and also the version that you need to read from the storage upfront.
If obj_id is None, we assume that you want to insert a new object and generate its new obj_id; version should be set to 0 (default) in that case.
If you want to insert a new object with a specific obj_id, specify obj_id and set version to 0.
- If a colliding object is already stored in the storage, the execute() method will fail with DuplicateError.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection | str | Name of collection to work with | required |
obj_id | | Primary identification of an object in the storage (e.g. primary key) | None |
version | int | Specify a current version of the object and hence prevent byzantine faults. You should always read the version from the storage upfront, prior using an upsertor. That creates a soft lock on the record. It means that if the object is updated by other component in meanwhile, your upsertor will fail and you should retry the whole operation. The new objects should have a | 0 |
Source code in asab/storage/service.py
asab.storage.inmemory.StorageService
¤
Bases: StorageServiceABC
Source code in asab/storage/inmemory.py
delete(collection, obj_id)
async
¤
Delete a document from an in-memory collection.
:param collection: Collection to delete from
:param obj_id: Object identification
Raises:
Type | Description |
---|---|
KeyError: If |
Source code in asab/storage/inmemory.py
get(collection, obj_id, decrypt=None)
async
¤
Retrieve a document from an in-memory collection by its ID.
:collection (str): The name of the collection to retrieve the document from.
:obj_id (str | bytes): The ID of the document to retrieve.
:decrypt (type, optional): A list of field names to decrypt. Defaults to None.
Returns:
Type | Description |
---|---|
dict | dict: A dictionary representing the retrieved document. |
dict | If |
Source code in asab/storage/inmemory.py
get_by(collection, key, value, decrypt=None)
async
¤
Retrieve a document from an in-memory collection by key and value. Not implemented yet.
Raises:
Type | Description |
---|---|
NotImplementedError: Not implemented on InMemoryStorage |
Source code in asab/storage/inmemory.py
upsertor(collection, obj_id=None, version=0)
¤
Obtain an in-memory upsertor for given collection and possibly for the specified object.
:collection (str): The name of the collection.
:obj_id (type, optional): The ID of the document to retrieve. Defaults to None.
:version (int, optional): The version of the collection. Defaults to 0.
Returns:
Type | Description |
---|---|
InMemoryUpsertor | InMemoryUpsertor: Upsertor for given collection. |
Source code in asab/storage/inmemory.py
asab.storage.mongodb.StorageService
¤
Bases: StorageServiceABC
StorageService for MongoDB. Depends on pymongo and motor.
Source code in asab/storage/mongodb.py
collection(collection)
async
¤
Get collection. Useful for custom operations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
collection | str | Collection to get. | required |
Returns:
Type | Description |
---|---|
AsyncIOMotorCollection | |
Examples:
>>> coll = await storage.collection("test-collection")
>>> cursor = coll.find({})
>>> while await cursor.fetch_next:
...     obj = cursor.next_object()
...     pprint.pprint(obj)
Source code in asab/storage/mongodb.py
asab.storage.elasticsearch.StorageService
¤
Bases: StorageServiceABC
StorageService for ElasticSearch. Depends on the aiohttp library.
Source code in asab/storage/elasticsearch.py
count(index)
async
¤
Get the number of matches for a given index.
:param index: The specified index.
:return: The number of matches for a given index.
:raise Exception: Connection failed.
Source code in asab/storage/elasticsearch.py
delete(index, _id=None)
async
¤
Delete an entire index or document from that index.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
index | str | Index to delete. | required |
_id | | If specified, only document with the ID is deleted. | None |
Raises:
Type | Description |
---|---|
ConnectionRefusedError | Authorization required (status 401) |
KeyError | No existing object with ID |
ConnectionError | Unexpected status code |
Exception | ClientConnectorError |
Returns:
Type | Description |
---|---|
dict | The deleted document or message that the entire index was deleted. |
Source code in asab/storage/elasticsearch.py
empty_index(index, settings=None)
async
¤
Create an empty ECS index.
Source code in asab/storage/elasticsearch.py
get(index, obj_id, decrypt=None)
async
¤
Get object by its index and object ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
index | str | Index for the query. | required |
obj_id | str | ID of the object. | required |
decrypt | None | Not implemented yet. Defaults to None. | None |
Raises:
Type | Description |
---|---|
NotImplementedError | Encryption and decryption has not yet been implemented for ECS. |
ConnectionError | Connection failed. |
ConnectionRefusedError | Authorization required. |
KeyError | Object with the ID does not exist. |
Returns:
Type | Description |
---|---|
dict | The query result. |
Source code in asab/storage/elasticsearch.py
get_index_template(template_name)
async
¤
Retrieve ECS Index template for the given template name.
:param template_name: The name of the ECS template to retrieve.
:type template_name: str
:raise Exception: Raised if connection to all server URLs fails.
:return: ElasticSearch Index template.
Source code in asab/storage/elasticsearch.py
indices(search_string=None)
async
¤
Return high-level information about indices in a cluster, including backing indices for data streams.
:param search_string: A search string. Default to None.
Source code in asab/storage/elasticsearch.py
is_connected()
async
¤
Check if the service is connected to ElasticSearch cluster.
Raises:
Type | Description |
---|---|
ConnectionError | Connection failed. |
Returns:
Name | Type | Description |
---|---|---|
bool | bool | True if the service is connected. |
Source code in asab/storage/elasticsearch.py
list(index, _from=0, size=10000, body=None)
async
¤
List data matching the index.
:param index: Specified index.
:param _from: Starting document offset. Defaults to 0.
:type _from: int
:param size: The number of hits to return. Defaults to 10000.
:type size: int
:param body: An optional request body. Defaults to None.
:type body: dict
:return: The query search result.
:raise Exception: Raised if connection to all server URLs fails.
Source code in asab/storage/elasticsearch.py
mapping(index)
async
¤
Retrieve mapping definitions for one index.
:param index: Specified index.
:type index: str
:raise Exception: Connection failed.
Returns:
Name | Type | Description |
---|---|---|
dict | dict | Mapping definitions for the index. |
Source code in asab/storage/elasticsearch.py
policies()
async
¤
Return high-level information about ILM policies in a cluster, including backing indices for data streams.
:param search_string: A search string. Default to None.
Source code in asab/storage/elasticsearch.py
put_index_template(template_name, template)
async
¤
Create a new ECS index template.
:param template_name: The name of ECS template.
:param template: Body for the request.
:return: JSON response.
:raise Exception: Raised if connection to all server URLs fails.
Source code in asab/storage/elasticsearch.py
put_policy(policy_name, settings=None)
async
¤
Create a lifecycle policy.
Source code in asab/storage/elasticsearch.py
request(method, path, data=None, json=None)
async
¤
This method can be used to do a custom call to ElasticSearch like so:
async with self.request("GET", "cluster/_health") as resp: ...