Task Service
Task service is for managed execution of fire-and-forget, one-off, background tasks.
A task in this context is a coroutine, an asyncio.Future
or an asyncio.Task
instance, and it is executed in the main event loop.
The result of the task is collected (and discarded) automatically.
When a task raises an exception, the exception is written to the log.
TaskService
is handy for dealing with so-called "I/O-bound operations", especially for calling asynchronous operations from the synchronous part of the code.
I/O bound operations
I/O-bound operations are tasks that spend more time waiting for input/output operations to complete than performing actual computations, e.g., reading or writing large files from/to a disk, making API calls, or fetching data from a remote database. The speed of these operations is determined by the speed of the I/O subsystems, such as disk drives, network interfaces, etc.
Usage of Task Service
The TaskService
is available in every ASAB application and is accessible as asab.Application.TaskService
.
class MyApp(asab.Application):
    """Example application that hands three coroutines over to the task service."""

    async def main(self):
        # `self` is the application instance, so the service is reached through it,
        # in the same way as `app.TaskService` in the reference examples.
        self.TaskService.schedule(
            self.task1(),
            self.task2(),
            self.task3(),
        )
Reference
asab.task.TaskService
Bases: Service
Task service is for managed execution of fire-and-forget, one-off, background tasks.
Task can be a coroutine, asyncio.Future
or asyncio.Task
and it is executed in the main event loop.
The result of the task is collected (and discarded) automatically.
When the task raises Exception, it will be printed to the log.
Source code in asab/task.py
class TaskService(asab.Service):
    """
    Task service is for managed execution of fire-and-forget, one-off, background tasks.

    Task can be a coroutine, `asyncio.Future` or `asyncio.Task` and it is executed in the main event loop.
    The result of the task is collected (and discarded) automatically.
    When the task raises Exception, it will be printed to the log.
    """

    def __init__(self, app, service_name="asab.TaskService"):
        """
        Create the service with an empty queue of new tasks and no running tasks.

        Args:
            app: The application object; passed on to the `asab.Service` base class.
            service_name: Name of the service; passed on to the `asab.Service` base class.
        """
        super().__init__(app, service_name)

        # Tasks handed over by `schedule*()` / `run_forever()` that `main()` has not picked up yet.
        self.NewTasks = asyncio.Queue()
        # Tasks and futures that `main()` is currently waiting on.
        self.PendingTasks = set()
        # The task running `main()`; `None` while the service is not running.
        self.Main = None

    async def initialize(self, app):
        """Start the main loop of the service."""
        self.start()

    def start(self):
        """
        Launch `main()` as a background task and watch for its exit.

        Must only be called when the main loop is not running (`self.Main is None`).
        """
        assert self.Main is None
        self.Main = asyncio.ensure_future(self.main())
        self.Main.add_done_callback(self._main_task_exited)

    async def finalize(self, app):
        """
        Stop the main loop, cancel all pending tasks and warn about whatever is left.

        Tasks still waiting in `NewTasks` are not started; they are only counted in the warning.
        """
        if self.Main is not None:
            task = self.Main
            # Clear the reference before cancelling so that `_main_task_exited()`
            # sees `None` and does not restart the loop.
            self.Main = None
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                L.exception("Error '{}' during task service:".format(e))

        # Iterate over a copy because the set is modified inside the loop.
        for task in list(self.PendingTasks):
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception as e:
                L.exception("Error '{}' during task service:".format(e))
            finally:
                # The task has finished whichever way it ended (result, cancellation or error),
                # so it must not be reported as pending below.
                self.PendingTasks.discard(task)

        total_tasks = len(self.PendingTasks) + self.NewTasks.qsize()
        if total_tasks > 0:
            L.warning("{}+{} pending and incomplete tasks".format(len(self.PendingTasks), self.NewTasks.qsize()))

    def _main_task_exited(self, ctx):
        """
        Done-callback of the main task: log the reason of the exit and restart the loop.

        Does nothing when `self.Main` is `None`, which is how `finalize()` signals
        an intentional shutdown.

        Args:
            ctx: The argument supplied by the done-callback mechanism (not used).
        """
        if self.Main is None:
            return

        try:
            self.Main.result()
        except asyncio.CancelledError:
            pass
        except Exception as e:
            L.exception("Error '{}' during task service:".format(e))

        # Reset the reference so that the assertion in `start()` holds.
        self.Main = None
        L.warning("Main task exited unexpectedly, restarting ...")
        self.start()

    def schedule(self, *tasks):
        """
        Schedule a task (or tasks) for immediate fire-and-forget execution (e.g. compressing files).

        Args:
            *tasks: Coroutines, `asyncio.Future` or `asyncio.Task` objects to be executed.

        Examples:

        ```python
        app.TaskService.schedule(self._start())
        ```
        """
        for task in tasks:
            self.NewTasks.put_nowait(task)

    def schedule_threadsafe(self, *tasks):
        """
        Schedule a task (or tasks) threadsafe for immediate fire-and-forget execution (e.g. compressing files).

        Use this method to schedule task from a thread to be executed in the main thread.

        Args:
            *tasks: Coroutines, `asyncio.Future` or `asyncio.Task` objects to be executed.

        Examples:

        ```python
        app.TaskService.schedule_threadsafe(self._start())
        ```
        """
        for task in tasks:
            # The queue is only touched from the event loop, via `call_soon_threadsafe()`.
            self.App.Loop.call_soon_threadsafe(self.NewTasks.put_nowait, task)

    def run_forever(self, *async_functions):
        """
        Schedule an async function (or functions) for immediate fire-and-forget execution.

        The function is expected to run forever.
        If function exits, the error is logged and the function is restarted.
        Function is called without any argument.

        Args:
            *async_functions: Async functions (not coroutine objects) to keep running.

        Examples:

        ```python
        class MyClass(object):
            def __init__(self, app):
                ...
                app.TaskService.run_forever(self.my_forever_method)

            async def my_forever_method(self):
                while True:
                    await ...
        ```
        """
        for async_fn in async_functions:
            self.NewTasks.put_nowait(
                forever(async_fn)
            )

    def _add_pending(self, task):
        """Wrap a coroutine into an `asyncio.Task` if needed and start tracking it in `PendingTasks`."""
        if isinstance(task, typing.Coroutine):
            task = asyncio.create_task(task)
        self.PendingTasks.add(task)

    async def main(self):
        """
        Main loop of the service: pick up new tasks, wait for pending ones and collect their results.

        For every finished task the `TaskService.task_done!` message is published with the task
        as its argument. An exception raised by a task is logged; a cancelled task is collected
        silently.
        """
        while True:
            # Pick up everything that has been scheduled since the last iteration.
            while not self.NewTasks.empty():
                self._add_pending(self.NewTasks.get_nowait())

            if not self.PendingTasks:
                # Block until a new task is scheduled
                self._add_pending(await self.NewTasks.get())
                continue

            # The timeout bounds how long newly scheduled tasks wait to be picked up
            # while other tasks are still running.
            done, self.PendingTasks = await asyncio.wait(self.PendingTasks, timeout=1.0)
            for task in done:
                # Awaiting a cancelled task raises `asyncio.CancelledError`, which is not
                # an `Exception` subclass on Python 3.8+; it would escape the handler below
                # and terminate this loop, so cancelled tasks are skipped explicitly.
                if not task.cancelled():
                    try:
                        await task
                    except Exception as e:
                        L.exception("Error '{}' during task:".format(e))

                self.App.PubSub.publish("TaskService.task_done!", task)
|
run_forever(*async_functions)
Schedule an async function (or functions) for immediate fire-and-forget execution.
The function is expected to run forever.
If function exits, the error is logged and the function is restarted.
Function is called without any argument.
Examples:
class MyClass(object):
def __init__(self, app):
...
app.TaskService.run_forever(self.my_forever_method)
async def my_forever_method(self):
while True:
await ...
Source code in asab/task.py
def run_forever(self, *async_functions):
    """
    Start each given async function right away and keep it running indefinitely.

    Every function is invoked with no arguments and is not supposed to return.
    Should it exit anyway, the error is logged and the function is started again.

    Examples:

    ```python
    class MyClass(object):
        def __init__(self, app):
            ...
            app.TaskService.run_forever(self.my_forever_method)

        async def my_forever_method(self):
            while True:
                await ...
    ```
    """
    for fn in async_functions:
        # `forever()` provides the restarting wrapper that gets queued.
        wrapped = forever(fn)
        self.NewTasks.put_nowait(wrapped)
|
schedule(*tasks)
Schedule a task (or tasks) for immediate fire-and-forget execution (e.g. compressing files).
Examples:
app.TaskService.schedule(self._start())
Source code in asab/task.py
def schedule(self, *tasks):
    """
    Queue one or more tasks for immediate fire-and-forget execution (e.g. compressing files).

    Examples:

    ```python
    app.TaskService.schedule(self._start())
    ```
    """
    enqueue = self.NewTasks.put_nowait
    for item in tasks:
        enqueue(item)
|
schedule_threadsafe(*tasks)
Schedule a task (or tasks) threadsafe for immediate fire-and-forget execution (e.g. compressing files).
Use this method to schedule task from a thread to be executed in the main thread.
Examples:
app.TaskService.schedule_threadsafe(self._start())
Source code in asab/task.py
def schedule_threadsafe(self, *tasks):
    """
    Thread-safe way to queue one or more tasks for immediate fire-and-forget execution (e.g. compressing files).

    Call this from another thread when the task has to be executed in the main thread.

    Examples:

    ```python
    app.TaskService.schedule_threadsafe(self._start())
    ```
    """
    loop = self.App.Loop
    enqueue = self.NewTasks.put_nowait
    for item in tasks:
        # Hand the enqueue call over to the event loop.
        loop.call_soon_threadsafe(enqueue, item)
|