An upsertor is an object that works like a pointer to the specified database and, optionally, to an object ID. It is used for inserting new objects, updating existing ones, and deleting them.
```python
u = storage.upsertor("test-collection")
```
The StorageService.upsertor() method creates an upsertor object associated with the specified collection.
It takes the collection name as an argument and accepts two optional
parameters, obj_id and version, which are used when updating an existing object identified by its ID and version.
To insert an object into the collection, use the Upsertor.set() method.
```python
u.set("key", "value")
```
To execute these procedures, run the Upsertor.execute() coroutine method,
which commits the upsertor data to the storage and returns the ID of the object.
Since it is a coroutine, it must be awaited.
```python
object_id = await u.execute()
```
The Upsertor.execute() method has the optional parameters
custom_data and event_type, which are used for webhook requests.
To retrieve a single object, use the StorageService.get() coroutine method,
which takes two arguments, collection and obj_id, and finds the object with the given ID in the collection.
When the requested object is not found in the collection, the method
raises KeyError. Remember to handle this exception properly when using
databases in your services, to prevent them from crashing!
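The pattern can be sketched as follows. Note that `StubStorage`, the collection layout, and `load_user` are illustrative stand-ins for this example, not the ASAB API; only the KeyError behavior mirrors the real StorageService.get():

```python
import asyncio


class StubStorage:
    # Illustrative stand-in for a storage service: get() raises KeyError
    # when the object is missing, just like the real coroutine method.
    def __init__(self):
        self.collections = {"users": {"u1": {"_id": "u1", "name": "Alice"}}}

    async def get(self, collection, obj_id):
        return self.collections[collection][obj_id]  # missing key -> KeyError


async def load_user(storage, user_id):
    try:
        return await storage.get("users", user_id)
    except KeyError:
        # Object not found: return a default instead of crashing the service
        return None


storage = StubStorage()
found = asyncio.run(load_user(storage, "u1"))
missing = asyncio.run(load_user(storage, "u2"))
```

The same try/except wrapper applies unchanged when `storage` is a real MongoDB or in-memory StorageService.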
Note
The MongoDB storage service additionally provides a coroutine method get_by(), which accesses an object by matching a key-value pair.
We strongly recommend reading the version from the object, as shown above.
That creates a soft lock on the record. This means that if the object is
updated by another component in the meantime, your upsertor will fail, and you
should retry the whole operation. New objects should have a version
set to 0, which is the default.
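A dict-based sketch of how this optimistic-locking loop behaves; the `commit` and `increment_with_retry` helpers are hypothetical illustrations of the idea, not ASAB functions (real code would pass the version read upfront to `storage.upsertor()` and retry when `execute()` fails):

```python
# A dict-based sketch of the version-based soft lock: an update only
# succeeds when the caller read the latest version; otherwise it must
# re-read the object and retry the whole operation.
store = {"doc": {"_v": 3, "count": 10}}


def commit(obj_id, read_version, changes):
    current = store[obj_id]
    if current["_v"] != read_version:  # updated by another component meanwhile
        raise RuntimeError("version conflict")
    current.update(changes)
    current["_v"] += 1  # new version after a successful commit


def increment_with_retry(obj_id, attempts=5):
    for _ in range(attempts):
        snapshot = dict(store[obj_id])  # read the object *and* its version upfront
        try:
            commit(obj_id, snapshot["_v"], {"count": snapshot["count"] + 1})
            return
        except RuntimeError:
            continue  # conflicting write happened; retry the whole operation
    raise RuntimeError("gave up after retries")


increment_with_retry("doc")
```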
After obtaining an upsertor, you can update the object via the
Upsertor.set() method.
To delete an object from the database, use the StorageService.delete() coroutine method, which takes the arguments collection and obj_id,
deletes the object, and returns its ID.
If the inmemory type is set, ASAB will store data in its own memory.
In particular, asab.StorageService is initialized with an InMemoryCollections attribute, which is
a dictionary in which all the collections are stored.
Note
You can go through all the collections directly by accessing the InMemoryCollections attribute, although we do not
recommend that.
You can use all the methods from the abstract class. The MongoDB storage class additionally provides two methods,
StorageService.get_by() and StorageService.collection().
The StorageService.get_by() method is used in the same way as StorageService.get().
The collection() method is used for accessing the database directly.
It takes the collection name as an argument and returns a motor.motor_asyncio.AsyncIOMotorCollection object, which can be used for calling MongoDB commands.
To use encryption, first make sure you have the cryptography package installed. Then specify
the AES key in the config file.
```ini
[asab:storage]
aes_key=random_key_string
```
Note
The AES Key is used as both an encryption and decryption key.
It is recommended to keep it in a separate configuration file
that is not exposed anywhere publicly.
The actual binary AES key is derived from the aes_key
specified in the config file by encoding it and hashing it with
SHA-256 from the standard hashlib
module, so you do not have to worry about the length or type of the key.
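For illustration, the derivation can be reproduced with the standard library alone; the SHA-256 step below mirrors the hashlib call in StorageServiceABC.__init__ (the key string is just the example value from the config above):

```python
import hashlib

# Any string from the config is normalized to a 32-byte AES-256 key:
# encode to UTF-8, then hash with SHA-256.
aes_key = "random_key_string"
binary_key = hashlib.sha256(aes_key.encode("utf-8")).digest()

print(len(binary_key))  # always 32 bytes, regardless of the input length
```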
The Upsertor.set() method has an
optional boolean parameter encrypt for encrypting the data before they are stored.
Only values of type bytes can be encrypted. If you want to encrypt other values, encode them first.
```python
message = "This is a super secret message!"
number = 2023

message_binary = message.encode("ascii")
number_binary = str(number).encode("ascii")  # numbers must be converted to bytes first

u.set("message", message_binary, encrypt=True)
u.set("number", number_binary, encrypt=True)
object_id = await u.execute()
```
The StorageService.get() coroutine method has an optional parameter decrypt, which takes an iterable object (e.g. a list, tuple, or set)
with the names of keys whose values are to be decrypted.
For encrypting data, ASAB uses the symmetric AES-CBC algorithm.
In fact, the abstract base class StorageServiceABC provides two methods, aes_encrypt() and aes_decrypt(),
that are called automatically in the Upsertor.set() and StorageService.get() methods when
the parameter encrypt or decrypt is specified.
AES-CBC is a mode of operation for the Advanced Encryption Standard
(AES) algorithm that provides confidentiality for data.
In AES-CBC, the plaintext is divided into blocks of fixed size (128 bits),
and each block is encrypted using the AES algorithm with a secret key.
CBC stands for "Cipher Block Chaining": an extra step in the encryption
process ensures that each ciphertext block depends on the previous one.
This means that any modification to the ciphertext will produce a different plaintext after decryption.
AES is a symmetric cipher, which makes it suitable for encrypting large amounts of data,
as it requires much less computational power than asymmetric ciphers.
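To make the block-and-padding step concrete, here is a pure-Python sketch of PKCS#7 padding for a 16-byte AES block. This is illustrative only: the real service delegates padding to the cryptography package's PKCS7 padder, as shown in the reference below.

```python
# PKCS#7 padding for a 128-bit (16-byte) AES block: append N bytes,
# each of value N, so the total length becomes a multiple of 16.
BLOCK = 16


def pkcs7_pad(data: bytes) -> bytes:
    n = BLOCK - (len(data) % BLOCK)  # 1..16; a full block is added if already aligned
    return data + bytes([n]) * n


def pkcs7_unpad(padded: bytes) -> bytes:
    n = padded[-1]  # the last byte tells how many padding bytes to strip
    return padded[:-n]


padded = pkcs7_pad(b"This is a super secret message!")  # 31 bytes -> 32 bytes
```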
```python
class Module(asab.Module):
	def __init__(self, app):
		super().__init__(app)
		sttype = asab.Config.get('asab:storage', 'type', fallback=None)

		if sttype == 'inmemory':
			from .inmemory import StorageService
			self.Service = StorageService(app, "asab.StorageService")

		elif sttype == 'mongodb':
			from .mongodb import StorageService
			self.Service = StorageService(app, "asab.StorageService")

		elif sttype == "elasticsearch":
			from .elasticsearch import StorageService
			self.Service = StorageService(app, "asab.StorageService")

		elif sttype is None:
			L.critical("Missing configuration for [asab:storage] type.")
			raise SystemExit("Exit due to a critical configuration error.")

		else:
			L.critical("Unknown configuration type '{}' in [asab:storage].".format(sttype))
			raise SystemExit("Exit due to a critical configuration error.")
```
```python
class StorageServiceABC(asab.Service):
	"""
	An abstract class for the Storage Service.
	"""

	def __init__(self, app, service_name):
		super().__init__(app, service_name)
		self.WebhookURIs = asab.Config.get("asab:storage:changestream", "webhook_uri", fallback="") or None
		if self.WebhookURIs is not None:
			self.WebhookURIs = [uri for uri in re.split(r"\s+", self.WebhookURIs) if len(uri) > 0]
			try:
				self.ProactorService = app.get_service("asab.ProactorService")
			except KeyError as e:
				raise Exception("Storage webhooks require ProactorService") from e
		self.WebhookAuth = asab.Config.get("asab:storage:changestream", "webhook_auth", fallback="") or None

		# Specify a non-empty AES key to enable AES encryption of selected fields
		self._AESKey = asab.Config.get("asab:storage", "aes_key", fallback="")
		if len(self._AESKey) > 0:
			if cryptography is None:
				raise ModuleNotFoundError(
					"You are using storage encryption without 'cryptography' installed. "
					"Please run 'pip install cryptography' "
					"or install asab with 'storage_encryption' optional dependency.")
			self._AESKey = hashlib.sha256(self._AESKey.encode("utf-8")).digest()
		else:
			self._AESKey = None

	@abc.abstractmethod
	def upsertor(self, collection: str, obj_id=None, version: int = 0) -> None:
		"""
		Create an upsertor object for the specified collection.

		If updating an existing object, please specify its `obj_id` and also `version` that you need to read from a storage upfront.
		If `obj_id` is None, we assume that you want to insert a new object and generate its new `obj_id`, `version` should be set to 0 (default) in that case.
		If you want to insert a new object with a specific `obj_id`, specify `obj_id` and set a version to 0.
		- If there will be a colliding object already stored in a storage, `execute()` method will fail on `DuplicateError`.

		Args:
			collection: Name of collection to work with
			obj_id: Primary identification of an object in the storage (e.g. primary key)
			version: Specify a current version of the object and hence prevent byzantine faults. \
				You should always read the version from the storage upfront, prior using an upsertor. \
				That creates a soft lock on the record. It means that if the object is updated by other \
				component in meanwhile, your upsertor will fail and you should retry the whole operation. \
				The new objects should have a `version` set to 0.
		"""
		pass

	@abc.abstractmethod
	async def get(self, collection: str, obj_id, decrypt: bool = None) -> dict:
		"""
		Get object from collection by its ID.

		Args:
			collection: Collection to get from.
			obj_id: Object identification.
			decrypt: Set of fields to decrypt.

		Returns:
			The object retrieved from a storage.

		Raises:
			KeyError: Raised if `obj_id` is not found in `collection`.
		"""
		pass

	@abc.abstractmethod
	async def get_by(self, collection: str, key: str, value, decrypt=None) -> dict:
		"""
		Get object from collection by its key and value.

		Args:
			collection: Collection to get from
			key: Key to filter on
			value: Value to filter on
			decrypt: Set of fields to decrypt

		Returns:
			The object retrieved from a storage.

		Raises:
			KeyError: If object {key: value} not found in `collection`
		"""
		pass

	@abc.abstractmethod
	async def delete(self, collection: str, obj_id):
		"""
		Delete object from collection.

		Args:
			collection: Collection to get from
			obj_id: Object identification

		Returns:
			ID of the deleted object.

		Raises:
			KeyError: Raised when obj_id cannot be found in collection.
		"""
		pass

	def aes_encrypt(self, raw: bytes, iv: bytes = None) -> bytes:
		"""
		Take an array of bytes and encrypt it using AES-CBC.

		Args:
			raw: The data to be encrypted.
			iv: AES-CBC initialization vector, 16 bytes long. If left empty, a random 16-byte array will be used.

		Returns:
			The encrypted data.

		Raises:
			TypeError: The data are not in binary format.
		"""
		block_size = cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size

		if self._AESKey is None:
			raise RuntimeError(
				"No aes_key specified in asab:storage configuration. "
				"If you want to use encryption, specify a non-empty aes_key.")

		if not isinstance(raw, bytes):
			if isinstance(raw, str):
				raise TypeError("String objects must be encoded before encryption")
			else:
				raise TypeError("Only 'bytes' objects can be encrypted")

		# Pad the value to fit the block size
		padder = cryptography.hazmat.primitives.padding.PKCS7(block_size).padder()
		padded = padder.update(raw)
		padded += padder.finalize()

		if iv is None:
			iv = secrets.token_bytes(block_size // 8)

		algorithm = cryptography.hazmat.primitives.ciphers.algorithms.AES(self._AESKey)
		mode = cryptography.hazmat.primitives.ciphers.modes.CBC(iv)
		cipher = cryptography.hazmat.primitives.ciphers.Cipher(algorithm, mode)
		encryptor = cipher.encryptor()
		encrypted = ENCRYPTED_PREFIX + iv + (encryptor.update(padded) + encryptor.finalize())
		return encrypted

	def aes_decrypt(self, encrypted: bytes, _obsolete_padding: bool = False) -> bytes:
		"""
		Decrypt encrypted data using AES-CBC.

		Args:
			encrypted: The encrypted data to decrypt.
				It must start with b"$aes-cbc$" prefix, followed by one-block-long initialization vector.
			_obsolete_padding: Back-compat option: Use incorrect old padding method

		Returns:
			The decrypted data.
		"""
		block_size = cryptography.hazmat.primitives.ciphers.algorithms.AES.block_size

		if self._AESKey is None:
			raise RuntimeError(
				"No aes_key specified in asab:storage configuration. "
				"If you want to use encryption, specify a non-empty aes_key.")

		if not isinstance(encrypted, bytes):
			raise TypeError("Only values of type 'bytes' can be decrypted")

		# Strip the prefix
		if not encrypted.startswith(ENCRYPTED_PREFIX):
			raise ValueError("Encrypted data must start with {!r} prefix".format(ENCRYPTED_PREFIX))
		encrypted = encrypted[len(ENCRYPTED_PREFIX):]

		# Separate the initialization vector
		iv, encrypted = encrypted[:block_size // 8], encrypted[block_size // 8:]

		algorithm = cryptography.hazmat.primitives.ciphers.algorithms.AES(self._AESKey)
		mode = cryptography.hazmat.primitives.ciphers.modes.CBC(iv)
		cipher = cryptography.hazmat.primitives.ciphers.Cipher(algorithm, mode)
		decryptor = cipher.decryptor()
		padded = decryptor.update(encrypted) + decryptor.finalize()

		# Strip padding
		if _obsolete_padding:
			# Back-compat: Incorrect old padding method
			raw = padded.rstrip(b"\x00")
		else:
			unpadder = cryptography.hazmat.primitives.padding.PKCS7(block_size).unpadder()
			raw = unpadder.update(padded)
			raw += unpadder.finalize()

		return raw

	def encryption_enabled(self) -> bool:
		"""
		Check if AESKey is not empty.

		Returns:
			True if AESKey is not empty.
		"""
		return self._AESKey is not None
```
```python
class StorageService(StorageServiceABC):

	def __init__(self, app, service_name):
		super().__init__(app, service_name)
		self.InMemoryCollections = {}

	def upsertor(self, collection: str, obj_id=None, version=0) -> InMemoryUpsertor:
		"""Obtain an in-memory upsertor for given collection and possibly for the specified object.

		:collection (str): The name of the collection.
		:obj_id (_type_, optional): The ID of the document to retrieve. Defaults to None.
		:version (int, optional): The version of the collection. Defaults to 0.

		Returns:
			:InMemoryUpsertor: Upsertor for given collection.
		"""
		return InMemoryUpsertor(self, collection, obj_id, version)

	async def get(self, collection: str, obj_id: typing.Union[str, bytes], decrypt=None) -> dict:
		"""Retrieve a document from an in-memory collection by its ID.

		:collection (str): The name of the collection to retrieve the document from.
		:obj_id (str | bytes): The ID of the document to retrieve.
		:decrypt (_type_, optional): A list of field names to decrypt. Defaults to None.

		Returns:
			:dict: A dictionary representing the retrieved document.
			If `decrypt` is not None, the specified fields in the document
			are decrypted using AES decryption algorithm.
		"""
		coll = self.InMemoryCollections[collection]
		data = coll[obj_id]
		if decrypt is not None:
			for field in decrypt:
				if field in data:
					data[field] = self.aes_decrypt(data[field])
		return data

	async def get_by(self, collection: str, key: str, value, decrypt=None) -> dict:
		"""
		Retrieve a document from an in-memory collection by key and value. Not implemented yet.

		Raises:
			:NotImplementedError: Not implemented on InMemoryStorage
		"""
		raise NotImplementedError()

	async def delete(self, collection: str, obj_id):
		"""
		Delete a document from an in-memory collection.

		:param collection: Collection to delete from
		:param obj_id: Object identification

		Raises:
			:KeyError: If `obj_id` not found in `collection`
		"""
		coll = self.InMemoryCollections[collection]
		del coll[obj_id]

	def _set(self, collection: str, obj_id, obj):
		try:
			coll = self.InMemoryCollections[collection]
		except KeyError:
			coll = {}
			self.InMemoryCollections[collection] = coll

		nobj = coll.setdefault(obj_id, obj)
		if nobj != obj:
			raise DuplicateError("Already exists", obj_id)
```
```python
class StorageService(StorageServiceABC):
	'''
	StorageService for MongoDB. Depends on `pymongo` and `motor`.
	'''

	def __init__(self, app, service_name, config_section_name='asab:storage'):
		super().__init__(app, service_name)

		# Check the old section and then the new section for uri
		uri = asab.Config.get(config_section_name, 'mongodb_uri', fallback='')
		if len(uri) == 0:
			uri = asab.Config.get("mongo", 'uri', fallback='')

		if len(uri) == 0:
			raise RuntimeError("No MongoDB URI has been provided.")

		self.Client = motor.motor_asyncio.AsyncIOMotorClient(uri)

		# Check the old section and then the new section for database name
		db_name = asab.Config.get(config_section_name, 'mongodb_database', fallback='')
		if len(db_name) == 0:
			db_name = asab.Config.get('mongo', 'database', fallback='')

		self.Database = self.Client.get_database(
			db_name,
			codec_options=bson.codec_options.CodecOptions(tz_aware=True, tzinfo=datetime.timezone.utc),
		)
		assert self.Database is not None

	def upsertor(self, collection: str, obj_id=None, version=0):
		return MongoDBUpsertor(self, collection, obj_id, version)

	async def get(self, collection: str, obj_id, decrypt=None) -> dict:
		coll = self.Database[collection]
		ret = await coll.find_one({'_id': obj_id})
		if ret is None:
			raise KeyError("NOT-FOUND")
		if decrypt is not None:
			await self._decrypt(ret, fields=decrypt, collection=collection)
		return ret

	async def get_by(self, collection: str, key: str, value, decrypt=None) -> dict:
		coll = self.Database[collection]
		ret = await coll.find_one({key: value})
		if ret is None:
			raise KeyError("NOT-FOUND")
		if decrypt is not None:
			await self._decrypt(ret, fields=decrypt, collection=collection)
		return ret

	async def collection(self, collection: str) -> motor.motor_asyncio.AsyncIOMotorCollection:
		"""
		Get collection. Useful for custom operations.
		"""
		return self.Database[collection]

	async def delete(self, collection: str, obj_id):
		coll = self.Database[collection]
		ret = await coll.find_one_and_delete({'_id': obj_id})
		if ret is None:
			raise KeyError("NOT-FOUND")
		return ret['_id']

	async def list(self, collection_name: str, _from: int = 0, size: int = 0, _filter=None, sorts=None):
		"""
		Lists all the elements in the collection starting from _from and ending with size (unless the size is 0).
		"""
		collection = self.Database[collection_name]

		# Build filter
		if _filter is None:
			query = {}
		else:
			query = {"_id": {"$regex": f"^{_filter}"}}

		items_cursor = collection.find(query)

		# Apply sorting if needed
		if sorts:
			sort_list = [(field, -1 if descending else 1) for field, descending in sorts]
			items_cursor = items_cursor.sort(sort_list)

		# Apply skip and limit
		if _from:
			items_cursor = items_cursor.skip(_from)
		if size:
			items_cursor = items_cursor.limit(size)

		async for item in items_cursor:
			yield item

	async def rename(self, previous_collection_name, new_collection_name):
		if previous_collection_name == new_collection_name:
			return  # No action needed

		existing_collections = await self.Database.list_collection_names()
		if previous_collection_name not in existing_collections:
			raise KeyError(f"Collection '{previous_collection_name}' does not exist.")
		if new_collection_name in existing_collections:
			raise DuplicateError(f"Collection '{new_collection_name}' already exists.")

		return await self.Database[previous_collection_name].rename(new_collection_name)

	async def count(self, collection_name, _filter=None) -> int:
		"""
		Counts all the elements in the collection.
		"""
		coll = self.Database[collection_name]
		count = await coll.count()
		return count

	async def collections(self, search_string=None):
		return await self.Database.list_collection_names()

	async def update_by_bulk(self, collection_name: str, documents: list) -> dict:
		"""
		Writes all the documents defined by _id and _source to the MongoDB as bulk.
		"""
		bulk = []
		coll = self.Database[collection_name]

		for document in documents:
			doc_id = document.get("_id")
			source = document.get("_source")
			if not doc_id or not source:
				continue  # Skip invalid documents

			bulk.append(pymongo.UpdateOne(
				{"_id": doc_id},
				{"$set": source},
				upsert=True
			))

		return coll.bulk_write(bulk)

	async def _decrypt(self, db_obj: dict, fields: typing.Iterable, collection: str):
		"""
		Decrypt object fields in-place
		"""
		re_encrypt_fields = {}
		for field in fields:
			if field in db_obj:
				try:
					db_obj[field] = self.aes_decrypt(db_obj[field])
				except ValueError:
					db_obj[field] = self.aes_decrypt(db_obj[field], _obsolete_padding=True)
					re_encrypt_fields[field] = db_obj[field]

		# Update fields encrypted with flawed padding in previous versions (before #587)
		if re_encrypt_fields:
			upsertor = self.upsertor(collection, db_obj["_id"], db_obj["_v"])
			for k, v in re_encrypt_fields.items():
				upsertor.set(k, v, encrypt=True)
			L.debug("Object encryption updated.", struct_data={
				"coll": collection, "_id": db_obj["_id"], "fields": list(re_encrypt_fields)})
			await upsertor.execute()
```
```python
class StorageService(StorageServiceABC):
	"""
	StorageService for Elastic Search. Depends on `aiohttp` library.
	"""

	def __init__(self, app, service_name, config_section_name='asab:storage'):
		super().__init__(app, service_name)
		self.Refresh = Config.get(config_section_name, 'refresh', fallback='true')
		self.ScrollTimeout = Config.get(config_section_name, 'scroll_timeout', fallback='1m')

		# always check if there is a url in the old config section first
		url = Config.getmultiline(config_section_name, 'elasticsearch_url', fallback='')
		if len(url) > 0:
			asab.LogObsolete.warning(
				"Do not configure elasticsearch connection in [asab:storage]. "
				"Please use [elasticsearch] section with url, username and password parameters.")
		elif len(url) == 0:
			url = asab.Config.getmultiline('elasticsearch', 'url', fallback='')

		self.ServerUrls = get_url_list(url)

		if len(self.ServerUrls) == 0:
			L.error("No ElasticSearch URL has been provided. The application will work without Elasticsearch storage.")
			return

		# Authorization: username or API-key
		username = Config.get(config_section_name, 'elasticsearch_username')
		if len(username) == 0:
			username = Config.get('elasticsearch', 'username', fallback='')

		password = Config.get(config_section_name, 'elasticsearch_password')
		if len(password) == 0:
			password = Config.get('elasticsearch', 'password', fallback='')

		api_key = Config.get(config_section_name, 'elasticsearch_api_key')
		if len(api_key) == 0:
			api_key = Config.get('elasticsearch', 'api_key', fallback='')

		# Create headers for requests
		self.Headers = build_headers(username, password, api_key)

		# Build ssl context
		if self.ServerUrls[0].startswith('https://'):
			# check if [asab:storage] section has data for SSL or default to the [elasticsearch] section
			if section_has_ssl_option(config_section_name):
				self.SSLContextBuilder = SSLContextBuilder(config_section_name=config_section_name)
			else:
				self.SSLContextBuilder = SSLContextBuilder(config_section_name='elasticsearch')
			self.SSLContext = self.SSLContextBuilder.build(ssl.PROTOCOL_TLS_CLIENT)
		else:
			self.SSLContext = None

	@contextlib.asynccontextmanager
	async def request(self, method, path, data=None, json=None):
		'''
		This method can be used to do a custom call to ElasticSearch like so:

		async with self.request("GET", "cluster/_health") as resp:
			...
		'''
		async with aiohttp.ClientSession() as session:
			for n, url in enumerate(self.ServerUrls, 1):
				try:
					async with session.request(
						method=method,
						url=url + path,
						ssl=self.SSLContext,
						headers=self.Headers,
						data=data,
						json=json,
					) as resp:
						if resp.status == 401:
							raise ConnectionRefusedError(
								"Response code 401: Unauthorized. "
								"Provide authorization by specifying either user name and password or api key.")
						yield resp
						return
				except aiohttp.client_exceptions.ClientConnectorError:
					if n == len(self.ServerUrls):
						raise ConnectionError("Failed to connect to '{}'.".format(url))

	async def is_connected(self) -> bool:
		"""
		Check if the service is connected to ElasticSearch cluster.

		Raises:
			ConnectionError: Connection failed.

		Returns:
			bool: True if the service is connected.
		"""
		async with self.request("GET", "") as resp:
			if resp.status not in {200, 201}:
				resp = await resp.json()
				L.error("Failed to connect to ElasticSearch.", struct_data={
					"code": resp.get("status"),
					"reason": resp.get("error")})
				return False
			else:
				L.info("Connected to ElasticSearch.", struct_data={"urls": self.ServerUrls})
				return True

	async def get(self, index: str, obj_id: str, decrypt=None) -> dict:
		"""
		Get object by its index and object ID.

		Args:
			index (str): Index for the query.
			obj_id (str): ID of the object.
			decrypt (None): Not implemented yet. Defaults to None.

		Raises:
			NotImplementedError: Encryption and decryption has not yet been implemented for ECS.
			ConnectionError: Connection failed.
			ConnectionRefusedError: Authorization required.
			KeyError: Object with the ID does not exist.

		Returns:
			The query result.
		"""
		if decrypt is not None:
			raise NotImplementedError("AES encryption for ElasticSearch not implemented")

		async with self.request("GET", "{}/_doc/{}".format(index, urllib.parse.quote(obj_id))) as resp:
			if resp.status not in {200, 201, 404}:
				resp = await resp.json()
				raise ConnectionError(
					"Failed to retrieve data from ElasticSearch. Got {}: {}".format(
						resp.get("status"), resp.get("error")))
			else:
				obj = await resp.json()
				if not obj.get("found"):
					return None
				ret = obj['_source']
				ret['_v'] = obj['_version']
				ret['_id'] = obj['_id']
				return ret

	async def get_by(self, collection: str, key: str, value, decrypt=None):
		raise NotImplementedError("get_by")

	async def delete(self, index: str, _id=None) -> dict:
		"""
		Delete an entire index or document from that index.

		Args:
			index: Index to delete.
			_id: If specified, only document with the ID is deleted.

		Raises:
			ConnectionRefusedError: Authorization required (status 401)
			KeyError: No existing object with ID
			ConnectionError: Unexpected status code
			Exception: ClientConnectorError

		Returns:
			The deleted document or message that the entire index was deleted.
		"""
		if _id:
			path = "{}/_doc/{}?refresh={}".format(index, urllib.parse.quote(_id), self.Refresh)
		else:
			path = "{}".format(index)

		async with self.request("DELETE", path) as resp:
			if resp.status == 404:
				raise KeyError("No existing object with ID {}".format(_id))
			elif resp.status not in {200, 201}:
				resp = await resp.json()
				raise ConnectionError(
					"Failed to retrieve data from ElasticSearch. Got {}: {}".format(
						resp.get("status"), resp.get("error", {})))
			else:
				json_response = await resp.json()
				if json_response.get("acknowledged", False):
					return json_response
				assert json_response["result"] == "deleted", "Document was not deleted"
				return json_response

	async def mapping(self, index: str) -> dict:
		"""
		Retrieve mapping definitions for one index.

		:param index: Specified index.
		:type index: str
		:raise Exception: Connection failed.

		Returns:
			dict: Mapping definitions for the index.
		"""
		async with self.request("GET", "{}/_mapping".format(index)) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
			return await resp.json()

	async def get_index_template(self, template_name: str) -> dict:
		"""
		Retrieve ECS Index template for the given template name.

		:param template_name: The name of the ECS template to retrieve.
		:type template_name: str
		:raise Exception: Raised if connection to all server URLs fails.
		:return: ElasticSearch Index template.
		"""
		async with self.request("GET", "_index_template/{}?format=json".format(template_name)) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
			return await resp.json()

	async def put_index_template(self, template_name: str, template: dict) -> dict:
		"""
		Create a new ECS index template.

		:param template_name: The name of ECS template.
		:param template: Body for the request.
		:return: JSON response.
		:raise Exception: Raised if connection to all server URLs fails.
		"""
		async with self.request("PUT", "_index_template/{}?master_timeout=120s".format(template_name), json=template) as resp:
			if resp.status != 200:
				raise Exception("Unexpected response code: {}: '{}'".format(resp.status, await resp.text()))
			return await resp.json()

	async def reindex(self, previous_index, new_index):
		data = {
			"source": {"index": previous_index},
			"dest": {"index": new_index},
		}
		async with self.request("POST", "_reindex", json=data) as resp:
			if resp.status != 200:
				raise AssertionError(
					"Unexpected response code when reindexing: {}, {}".format(resp.status, await resp.text()))
			return await resp.json()

	def upsertor(self, index: str, obj_id=None, version: int = 0):
		return ElasticSearchUpsertor(self, index, obj_id, version)

	async def list(self, index: str, _from: int = 0, size: int = 10000, body: typing.Optional[dict] = None, last_hit_sort=None, _filter=None, sorts=None) -> dict:
		"""List data matching the index with pagination.

		:param index: Specified index.
		:param _from: Starting document offset. Defaults to 0.
		"""
```
:type _from: int :param size: The number of hits to return. Defaults to 10000. :type size: int :param body: An optional request body. Defaults to None. :type body: dict :return: The query search result. :raise Exception: Raised if connection to all server URLs fails. """ifbodyisNoneandnot_filter:body={'query':{'bool':{'must':{'match_all':{}}}}}elif_filter:# Apply case-insensitive filtering if _filter is providedbody={'query':{}}body['query']['wildcard']={'_keys':{'value':f"*{_filter.lower()}*",# Case-insensitive wildcard search'case_insensitive':True# Requires ES 7.10+}}ifsorts:body['sort']=[]forfield,descinsorts:order='desc'ifdescelse'asc'body['sort'].append({field:{"order":order}})else:# https://www.elastic.co/guide/en/elasticsearch/reference/current/paginate-search-results.html#search-afterbody['sort']=[{'_id':'asc'}]# Always need a consistent sort order for deep pagination# Use "search_after" for deep pagination when "_from" exceeds 10,000iflast_hit_sort:body['search_after']=last_hit_sortasyncwithself.request("GET","{}/_search?size={}&from={}&version=true".format(index,size,_from),json=body)asresp:ifresp.status!=200:raiseException("Unexpected response code: {}: '{}'".format(resp.status,awaitresp.text()))result=awaitresp.json()last_hit=result['hits']['hits'][-1]ifresult['hits']['hits']elseNonelast_hit_sort=last_hit['sort']iflast_hitelse[]returnresult,last_hit_sortasyncdefcount(self,index,_filter=None)->int:""" Get the number of matches for a given index. :param index: The specified index. :return: The number of matches for a given index. :raise Exception: Connection failed. 
"""if_filter:# Apply case-insensitive filtering if _filter is providedbody={'query':{}}body['query']['wildcard']={'_keys':{'value':f"*{_filter.lower()}*",# Case-insensitive wildcard search'case_insensitive':True# Requires ES 7.10+}}else:body={}asyncwithself.request("GET","{}/_count".format(index),json=body)asresp:ifresp.status!=200:raiseException("Unexpected response code: {}: '{}'".format(resp.status,awaitresp.text()))returnawaitresp.json()asyncdefindices(self,search_string=None):""" Return high-level information about indices in a cluster, including backing indices for data streams. :param search_string: A search string. Default to None. """asyncwithself.request("GET","_cat/indices/{}?format=json&s=index".format(search_stringifsearch_stringisnotNoneelse"*"))asresp:ifresp.status!=200:raiseException("Unexpected response code: {}: '{}'".format(resp.status,awaitresp.text()))res=awaitresp.json()returnresasyncdefempty_index(self,index,settings=None):''' Create an empty ECS index. '''# TODO: There is an option here to specify settings (e.g. shard number, replica number etc) and mappings hereifsettingsisNone:settings={}asyncwithself.request("PUT",index,json=settings)asresp:ifresp.status!=200:raiseException("Unexpected response code: {}: '{}'".format(resp.status,awaitresp.text()))returnawaitresp.json()asyncdefput_policy(self,policy_name,settings=None):''' Create a lifecycle policy. '''ifsettingsisNone:settings={}asyncwithself.request("PUT","_ilm/policy/{}?master_timeout=120s".format(policy_name),json=settings)asresp:ifresp.status!=200:raiseException("Unexpected response code: {}: '{}'".format(resp.status,awaitresp.text()))returnawaitresp.json()asyncdefpolicies(self):""" Return high-level information about ILM policies in a cluster, including backing indices for data streams. :param search_string: A search string. Default to None. 
"""asyncwithself.request("GET","_ilm/policy")asresp:ifresp.status!=200:raiseException("Unexpected response code: {}: '{}'".format(resp.status,awaitresp.text()))returnawaitresp.json()asyncdefupdate_by_bulk(self,index:str,documents:list)->dict:""" Use Elasticsearch bulk API to add/update multiple documents in an index. :param index: The Elasticsearch index to update. :param documents: A list of dictionaries, each containing '_id' and '_source' keys. :raise RuntimeError: If the bulk update request fails. """ifnotdocuments:return0# Construct bulk request payloadbulk_data=""fordocindocuments:doc_id=doc.get("_id")source=doc.get("_source")ifnotdoc_idornotsource:continue# Skip invalid documentsbulk_data+=json.dumps({"update":{"_index":index,"_id":doc_id}})+"\n"bulk_data+=json.dumps({"doc":source,"doc_as_upsert":True})+"\n"asyncwithself.request("POST","_bulk?refresh={}".format(self.Refresh),data=bulk_data.encode("utf-8"))asresp:ifresp.status!=200:raiseRuntimeError(f"Bulk update failed: {resp.status} - {awaitresp.text()}")result=awaitresp.json()# Get how many items were updateditems_updated=0foriteminresult.get("items",[]):forvalueinitem.values():ifvalue.get("status")in{200,201}:items_updated+=1returnitems_updated
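The payload that `update_by_bulk()` sends is newline-delimited JSON: one action line followed by one document line per item. A standalone sketch of that format, using hypothetical sample documents and no ElasticSearch connection:

```python
import json

def build_bulk_payload(index, documents):
    """Build an NDJSON _bulk body: one action line plus one doc line per document."""
    bulk_data = ""
    for doc in documents:
        doc_id = doc.get("_id")
        source = doc.get("_source")
        if not doc_id or not source:
            continue  # skip invalid documents, as update_by_bulk does
        bulk_data += json.dumps({"update": {"_index": index, "_id": doc_id}}) + "\n"
        bulk_data += json.dumps({"doc": source, "doc_as_upsert": True}) + "\n"
    return bulk_data

payload = build_bulk_payload("my-index", [
    {"_id": "1", "_source": {"name": "alpha"}},
    {"_id": "2", "_source": {"name": "beta"}},
])
print(payload)
```

The `doc_as_upsert` flag makes each action insert the document when it does not exist yet, so one bulk call covers both inserts and updates.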