Skip to content

Proactor Service¤

Proactor Service is useful for executing CPU bound operations, e.g., mathematical computations, data transformation, sorting or cryptographic algorithms, which would potentially block the main thread. Proactor service will execute these operations in a ThreadPoolExecutor.

CPU bound operations

CPU bound are tasks where the speed or performance is primarily limited by the processing power of the CPU rather than by I/O or other resources. In a CPU-bound operation, the task spends most of its time performing calculations or processing data rather than waiting for external resources like disk or network to become available.

Example

class MyApp(asab.Application):
    async def initialize(self):
        from asab.proactor import Module
            self.add_module(Module)
            self.ProactorService = self.App.get_service("asab.ProactorService")

    async def main(self):
        result = await self.ProactorService.execute(self.blocking_function)

asab.proactor.service.ProactorService ¤

Bases: Service

Proactor service is useful for running CPU bound operations from asynchronous part of the code that would potentially block the main thread. It allows to run these processes from different threads.

Source code in asab/proactor/service.py
class ProactorService(asab.Service):
	"""
	Proactor service is useful for running CPU bound operations from asynchronous part of the code that would potentially block the main thread.
	It allows to run these processes from different threads.
	"""

	def __init__(self, app, service_name):
		super().__init__(app, service_name)
		self.Loop = app.Loop

		max_workers = asab.Config.get('asab:proactor', 'max_workers')
		try:
			max_workers = int(max_workers)
		except BaseException:
			max_workers = None
		if max_workers <= 0:
			max_workers = None

		self.Executor = concurrent.futures.ThreadPoolExecutor(
			max_workers=max_workers,  # The maximum number of threads that can be used to execute the given calls.
			# If None, ThreadPoolExecutor will determine the number itself based on number of CPU's.
			thread_name_prefix="AsabProactorThread"
		)

		if asab.Config.get('asab:proactor', 'default_executor'):
			self.Loop.set_default_executor(self.Executor)


	# There was the method run, which is obsolete
	def execute(self, func, *args):
		"""
		Execute `func(*args)` in the thread from the Proactor Service pool.
		Return Future or Task that must be awaited and it provides the result of the `func()` call.
		"""
		return self.Loop.run_in_executor(self.Executor, func, *args)


	def schedule(self, func, *args):
		"""
		Execute `func(*args)` in the thread from the Proactor Service pool.
		The result of the future is discarded (using Task Service).
		"""

		future = self.execute(func, *args)
		self.App.TaskService.schedule(future)

execute(func, *args) ¤

Execute func(*args) in the thread from the Proactor Service pool. Return Future or Task that must be awaited and it provides the result of the func() call.

Source code in asab/proactor/service.py
def execute(self, func, *args):
	"""
	Execute `func(*args)` in the thread from the Proactor Service pool.
	Return Future or Task that must be awaited and it provides the result of the `func()` call.
	"""
	return self.Loop.run_in_executor(self.Executor, func, *args)

schedule(func, *args) ¤

Execute func(*args) in the thread from the Proactor Service pool. The result of the future is discarded (using Task Service).

Source code in asab/proactor/service.py
def schedule(self, func, *args):
	"""
	Execute `func(*args)` in the thread from the Proactor Service pool.
	The result of the future is discarded (using Task Service).
	"""

	future = self.execute(func, *args)
	self.App.TaskService.schedule(future)