Publish-Subscribe¤
Publish-subscribe is a messaging pattern where senders of messages, called publishers, send the messages to receivers, called subscribers, via the PubSub message bus.
Publishers don't directly interact with subscribers in any way. Similarly, subscribers express interest in one or more message types and only receive messages that are of interest, without knowledge of which publishers, if any, there are.
The ASAB PubSub
module operates with a simple messages, defined by their message type, which is a string.
The message can carry optional positional and keyword arguments.
The delivery of a message is implemented as a the standard Python function.
Note
We recommend to add !
(an exclamation mark) at the end of the message type in order to distinguish this object from
other types such as Python class names or functions.
Examples:
-
Application.run!
-
Application.tick/600!
-
Message.received!
Note
There is a default, application-wide Publish-Subscribe message
bus at Application.PubSub
that can be used to send messages.
Alternatively, you can create your own instance of PubSub
and enjoy isolated PubSub delivery space.
Subscription¤
The method PubSub.subscribe()
subscribes to a message type. Messages will be delivered to a callback
callable (function or method).
The callback
can be a standard callable or an async
coroutine.
Asynchronous callback
means that the delivery of the message will happen in a Future
, asynchronously.
Callback
callable will be called with the first argument.
Example
Example of a subscription to an Application.tick!
messages:
Example
Asynchronous version of the above:
To simplify the process of subscription to PubSub
, ASAB offers the decorator-based "subscribe all" functionality.
Example
In the following example, both on_tick()
and on_exit()
methods are subscribed to Application.PubSub
message bus.
Example
Publishing¤
PubSub.publish()
publishes a message to the PubSub message bus. It will be delivered to each subscriber synchronously.
It means that the method returns after each subscribed callback
is called.
Example
The example of a message publish to the Application.PubSub
message bus:
Asynchronous publishing of a message is requested by asynchronously=True
argument.
The publish()
method returns immediately and the delivery of the message to subscribers happens,
when control returns to the event loop.
Example
The example of a asynchronous version of a message publish to the Application.PubSub
message bus:
Synchronous vs. asynchronous messaging¤
ASAB PubSub supports both modes of a message delivery: synchronous and asynchronous. Moreover, PubSub also deals with modes, when asynchronous code (coroutine) does publish to synchronous code and vice versa.
Synchronous publish | Asynchronous publish | |
---|---|---|
Synchronous subscribe | called immediately | call_soon() |
Asynchronous subscribe | ensure_future() |
call_soon() and ensure_future() |
Application-wide PubSub¤
ASAB provides the application-wide Publish-Subscribe message bus as Application.PubSub
Well-Known Messages¤
ASAB itself automatically publishes various well-known messages on Application.PubSub
:
Message | Published when... |
---|---|
Application.init! |
...the application is in the init-time after the configuration is loaded, logging is setup, the event loop is constructed etc. |
Application.run! |
...the application enters the run-time. |
Application.stop! |
...the application wants to stop the run-time. It can be sent multiple times because of a process of graceful run-time termination. The first argument of the message is a counter that increases with every Application.stop! event. |
Application.exit! |
...the application enters the exit-time. |
Application.hup! |
...the application receives UNIX signal SIGHUP or equivalent. |
Application.housekeeping! |
...the application is on the time for housekeeping. |
Tick messages | ...periodically with the specified tick frequency. |
Tick messages¤
Tick messages are published by the application periodically.
For example, Application.tick!
is published every tick, Application.tick/10!
is published every 10th tick etc.
The tick frequency is configurable to whole seconds, the default is 1 second.
Message | Default period |
---|---|
Application.tick! |
Every second. |
Application.tick/10! |
Every 10 seconds. |
Application.tick/60! |
Every minute. |
Application.tick/300! |
Every 5 minutes. |
Application.tick/600! |
Every 10 minutes. |
Application.tick/1800! |
Every 30 minutes. |
Application.tick/3600! |
Every hour. |
Application.tick/43200! |
Every 12 hours. |
Application.tick/86400! |
Every 24 hours. |
Warning
Don't use arbitrary tick period number (ie. .../15!
). It will not work.
Housekeeping¤
Housekeeping is intended for scheduled processes that run once a day, e.g. for cleaning server databases.
app.PubSub.subscribe("Application.housekeeping!", clean_recycle_bin)
def clean_recycle_bin(msg):
...
The application checks every ten minutes if it's time for housekeeping. If the UTC time reaches the value for housekeeping, the app will publish Application.housekeeping! and schedules the next housekeeping for tomorrow at the same time. There is also a time limit, which is set to 05:00 AM UTC by default.
By default, the time for housekeeping is set to 03:00 AM UTC and the limit to 05:00 AM UTC.
Housekeeping can be also configured to run during the application init-time. Housekeeping time, time limit, and housekeeping at startup can be changed in the configuration file:
This sets the housekeeping time to 7:30 PM UTC and the time limit to 9:00 PM UTC. The time must be written in the format 'HH:MM'. Remember that the time is set to UTC, so be careful when operating in a different timezone.
Note
If the computer is in a sleep state, housekeeping will not be performed. Then, when the computer is reawakened, it will check if it has exceeded the time limit. If not, then housekeeping will be published. If it has exceeded it, it simply informs the user and sets the housekeeping time for the next day.
Note that this only limits the time when the housekeeping can start. If the housekeeping event triggers a procedure that takes a long time to finish, it will not be terminated when the time limit is reached.
Reference:¤
asab.pubsub.PubSub
¤
Bases: object
Object for delivering messages across the ASAB application.
A message is a function or coroutine with specific message_type
that can be published and subscribed at various places in the code.
Source code in asab/pubsub.py
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
|
message(message_type)
async
¤
Await specific message from a coroutine. It is a convenience method for the Subscriber
object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message_type
|
str
|
Message to be awaited. |
required |
Returns:
Type | Description |
---|---|
tuple
|
Triple (message_type, args, kwargs). |
Examples:
Source code in asab/pubsub.py
publish(message_type, *args, **kwargs)
¤
Publish the message and notify the subscribers of an message type
.
message_type
is passed as the first argument to the subscribed callback.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message_type
|
str
|
The emitted message. |
required |
asynchronously
|
bool
|
If |
required |
Examples:
class MyApplication(asab.Application):
async def initialize(self):
self.Count = 0
self.PubSub.subscribe("Fireworks.started!", self.on_fireworks)
async def main(self):
for i in range(3):
self.Count += 1
self.PubSub.publish("Fireworks.started!", self.Count)
await asyncio.sleep(1)
def on_fireworks(self, message_type, count):
print("boom " * count)
Source code in asab/pubsub.py
publish_threadsafe(message_type, *args, **kwargs)
¤
Publish the message and notify the subscribers of an message type
safely form a different that main thread.
message_type
is passed as the first argument to the subscribed callback.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message_type
|
str
|
The emitted message. |
required |
asynchronously
|
bool
|
If |
required |
Source code in asab/pubsub.py
subscribe(message_type, callback)
¤
Set callback
that will be called when message_type
is received.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message_type
|
str
|
Message to be subscribed to. It should end with an exclamation mark |
required |
callback
|
Callable
|
Function or coroutine that is called when the message is received. |
required |
Examples:
class MyClass:
def __init__(self, app):
app.PubSub.subscribe("Application.tick!", self.on_tick)
def on_tick(self, message_type):
print(message_type)
Source code in asab/pubsub.py
subscribe_all(obj)
¤
Find all methods decorated by @asab.subscribe
on the object and subscribe for them.
Examples:
class MyClass:
def __init__(self, app):
app.PubSub.subscribe_all(self)
@asab.subscribe("Application.tick!")
async def on_tick(self, message_type):
print(message_type)
@asab.subscribe("Application.exit!")
def on_exit(self, message_type):
print(message_type)
Source code in asab/pubsub.py
unsubscribe(message_type, callback)
¤
Remove callback
from the subscribed message_type
.
When the subscription does not exist, warning is displayed.
Examples:
class MyClass:
def __init__(self, app):
app.PubSub.subscribe("Application.tick!", self.only_once)
def only_once(self, message_type):
print("This message is displayed only once!")
app.PubSub.unsubscribe("Application.tick!", self.only_once)
Source code in asab/pubsub.py
asab.pubsub.Subscriber
¤
Bases: object
Object for consuming PubSub messages in coroutines.
It subscribes for various message types and consumes them.
It is built on (first-in, first-out) basis.
If pubsub
argument is None
, the initial subscription is skipped.
Examples:
The example of the subscriber object usage in async for statement:
async def my_coroutine(self):
# Subscribe for two application events
subscriber = asab.Subscriber(
self.PubSub,
"Application.tick!",
"Application.exit!"
)
async for message_type, args, kwargs in subscriber:
if message_type == "Application.exit!":
break;
print("Tick.")
Source code in asab/pubsub.py
294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 |
|
message()
¤
Wait for a message asynchronously and return triple (message_type, args, kwargs)
.
Examples:
async def my_coroutine(app):
# Subscribe for a two application events
subscriber = asab.Subscriber(
app.PubSub,
"Application.tick!",
"Application.exit!"
)
while True:
message_type, args, kwargs = await subscriber.message()
if message_type == "Application.exit!":
break
print("Tick.")
Source code in asab/pubsub.py
subscribe(pubsub, message_type)
¤
Subscribe for more message types. This method can be called many times with various pubsub
objects.
asab.pubsub.subscribe
¤
Bases: object
Decorator function that simplifies the process of subscription together with PubSub.subscribe_all()
method.
Examples:
class MyClass(object):
def __init__(self, app):
app.PubSub.subscribe_all(self)
@asab.subscribe("Application.tick!")
async def on_tick(self, message_type):
print(message_type)
@asab.subscribe("Application.exit!")
def on_exit(self, message_type):
print(message_type)