Skip to content

Publish-Subscribe¤

Publish-subscribe is a messaging pattern where senders of messages, called publishers, send the messages to receivers, called subscribers, via the PubSub message bus.

Publishers don't directly interact with subscribers in any way. Similarly, subscribers express interest in one or more message types and only receive messages that are of interest, without knowledge of which publishers, if any, there are.

The ASAB PubSub module operates with a simple messages, defined by their message type, which is a string. The message can carry optional positional and keyword arguments. The delivery of a message is implemented as a the standard Python function.

Note

We recommend to add ! (an exclamation mark) at the end of the message type in order to distinguish this object from other types such as Python class names or functions.

Examples:

  • Application.run!

  • Application.tick/600!

  • Message.received!

Note

There is a default, application-wide Publish-Subscribe message bus at Application.PubSub that can be used to send messages. Alternatively, you can create your own instance of PubSub and enjoy isolated PubSub delivery space.

Subscription¤

The method PubSub.subscribe() subscribes to a message type. Messages will be delivered to a callback callable (function or method). The callback can be a standard callable or an async coroutine. Asynchronous callback means that the delivery of the message will happen in a Future, asynchronously.

Callback callable will be called with the first argument.

Example

Example of a subscription to an Application.tick! messages:

class MyClass:
    def __init__(self, app):
        app.PubSub.subscribe("Application.tick!", self.on_tick)

    def on_tick(self, message_type):
        print(message_type)

Example

Asynchronous version of the above:

class MyClass:
    def __init__(self, app):
        app.PubSub.subscribe("Application.tick!", self.on_tick)

    async def on_tick(self, message_type):
        print("Wait for it...")
        await asyncio.sleep(3.0)
        print(message_type)

To simplify the process of subscription to PubSub, ASAB offers the decorator-based "subscribe all" functionality.

Example

In the following example, both on_tick() and on_exit() methods are subscribed to Application.PubSub message bus.

class MyClass:
    def __init__(self, app):
        app.PubSub.subscribe_all(self)

    @asab.subscribe("Application.tick!")
    async def on_tick(self, message_type):
        print(message_type)

    @asab.subscribe("Application.exit!")
    def on_exit(self, message_type):
        print(message_type)

Example

async def my_coroutine(self):
    # Subscribe for a two application events
    subscriber = asab.Subscriber(
        self.PubSub,
        "Application.tick!",
        "Application.exit!"
    )
    async for message_type, args, kwargs in subscriber:
        if message_type == "Application.exit!":
            break;
        print("Tick.")

Publishing¤

PubSub.publish() publishes a message to the PubSub message bus. It will be delivered to each subscriber synchronously. It means that the method returns after each subscribed callback is called.

Example

The example of a message publish to the Application.PubSub message bus:

def my_function(app):
    app.PubSub.publish("My.Message!")

Asynchronous publishing of a message is requested by asynchronously=True argument. The publish() method returns immediately and the delivery of the message to subscribers happens, when control returns to the event loop.

Example

The example of a asynchronous version of a message publish to the Application.PubSub message bus:

def my_function(app):
    app.PubSub.publish("My.Message!", asynchronously=True)

Synchronous vs. asynchronous messaging¤

ASAB PubSub supports both modes of a message delivery: synchronous and asynchronous. Moreover, PubSub also deals with modes, when asynchronous code (coroutine) does publish to synchronous code and vice versa.

Synchronous publish Asynchronous publish
Synchronous subscribe called immediately call_soon()
Asynchronous subscribe ensure_future() call_soon() and ensure_future()

Application-wide PubSub¤

ASAB provides the application-wide Publish-Subscribe message bus as Application.PubSub

Well-Known Messages¤

ASAB itself automatically publishes various well-known messages on Application.PubSub:

Message Published when...
Application.init! ...the application is in the init-time
after the configuration is loaded,
logging is setup,
the event loop is constructed etc.
Application.run! ...the application enters the run-time.
Application.stop! ...the application wants to stop the run-time.
It can be sent multiple times because of a process
of graceful run-time termination.
The first argument of the message is a counter
that increases with every Application.stop! event.
Application.exit! ...the application enters the exit-time.
Application.hup! ...the application receives
UNIX signal SIGHUP or equivalent.
Application.housekeeping! ...the application is on the time for housekeeping.
Tick messages ...periodically with the specified tick frequency.

Tick messages¤

Tick messages are published by the application periodically. For example, Application.tick! is published every tick, Application.tick/10! is published every 10th tick etc. The tick frequency is configurable to whole seconds, the default is 1 second.

[general]
# tick every 3 seconds
tick_period = 3
Message Default period
Application.tick! Every second.
Application.tick/10! Every 10 seconds.
Application.tick/60! Every minute.
Application.tick/300! Every 5 minutes.
Application.tick/600! Every 10 minutes.
Application.tick/1800! Every 30 minutes.
Application.tick/3600! Every hour.
Application.tick/43200! Every 12 hours.
Application.tick/86400! Every 24 hours.

Warning

Don't use arbitrary tick period number (ie. .../15!). It will not work.

Housekeeping¤

Housekeeping is intended for scheduled processes that run once a day, e.g. for cleaning server databases.

app.PubSub.subscribe("Application.housekeeping!", clean_recycle_bin)

def clean_recycle_bin(msg):
    ...

The application checks every ten minutes if it's time for housekeeping. If the UTC time reaches the value for housekeeping, the app will publish Application.housekeeping! and schedules the next housekeeping for tomorrow at the same time. There is also a time limit, which is set to 05:00 AM UTC by default.

By default, the time for housekeeping is set to 03:00 AM UTC and the limit to 05:00 AM UTC.

Housekeeping can be also configured to run during the application init-time. Housekeeping time, time limit, and housekeeping at startup can be changed in the configuration file:

[housekeeping]
at=19:30
limit=21:00
run_at_startup=yes

This sets the housekeeping time to 7:30 PM UTC and the time limit to 9:00 PM UTC. The time must be written in the format 'HH:MM'. Remember that the time is set to UTC, so be careful when operating in a different timezone.

Note

If the computer is in a sleep state, housekeeping will not be performed. Then, when the computer is reawakened, it will check if it has exceeded the time limit. If not, then housekeeping will be published. If it has exceeded it, it simply informs the user and sets the housekeeping time for the next day.

Note that this only limits the time when the housekeeping can start. If the housekeeping event triggers a procedure that takes a long time to finish, it will not be terminated when the time limit is reached.

Reference:¤

asab.pubsub.PubSub ¤

Bases: object

Object for delivering messages across the ASAB application.

A message is a function or coroutine with specific message_type that can be published and subscribed at various places in the code.

Source code in asab/pubsub.py
class PubSub(object):
	"""
	Object for delivering messages across the ASAB application.

	A message is a function or coroutine with specific `message_type` that can be published and subscribed at various places in the code.
	"""


	def __init__(self, app):
		self.Subscribers = {}
		self.Loop = app.Loop


	def subscribe(self, message_type: str, callback: typing.Callable):
		"""
		Set `callback` that will be called when `message_type` is received.

		Args:
			message_type: Message to be subscribed to. It should end with an exclamation mark `"!"`.
			callback: Function or coroutine that is called when the message is received. `message_type` is passed as the first argument to the callback.

		Examples:

		```python
		class MyClass:
			def __init__(self, app):
				app.PubSub.subscribe("Application.tick!", self.on_tick)

			def on_tick(self, message_type):
				print(message_type)
		```
		"""

		# If subscribe is a bound method, do special treatment
		# https://stackoverflow.com/questions/53225/how-do-you-check-whether-a-python-method-is-bound-or-not
		if hasattr(callback, '__self__'):
			callback = weakref.WeakMethod(callback)
		else:
			callback = weakref.ref(callback)

		if message_type not in self.Subscribers:
			self.Subscribers[message_type] = [callback]
		else:
			self.Subscribers[message_type].append(callback)


	def subscribe_all(self, obj):
		"""
		Find all methods decorated by `@asab.subscribe` on the object and subscribe for them.

		Examples:

		```python
		class MyClass:
			def __init__(self, app):
				app.PubSub.subscribe_all(self)

			@asab.subscribe("Application.tick!")
			async def on_tick(self, message_type):
				print(message_type)

			@asab.subscribe("Application.exit!")
			def on_exit(self, message_type):
				print(message_type)
		```
		"""
		for member_name in dir(obj):
			member = getattr(obj, member_name)
			message_types = getattr(member, 'asab_pubsub_subscribe_to_message_types', None)
			if message_types is not None:
				for message_type in message_types:
					self.subscribe(message_type, member)


	def unsubscribe(self, message_type, callback):
		"""
		Remove `callback` from the subscribed `message_type`.

		When the subscription does not exist, warning is displayed.

		Examples:

		```python
		class MyClass:
			def __init__(self, app):
				app.PubSub.subscribe("Application.tick!", self.only_once)

			def only_once(self, message_type):
				print("This message is displayed only once!")
				app.PubSub.unsubscribe("Application.tick!", self.only_once)
		```
		"""
		callback_list = self.Subscribers.get(message_type)
		if callback_list is None:
			L.warning("Message type subscription '{}'' not found.".format(message_type))
			return

		remove_list = None

		for i in range(len(callback_list)):
			# Take an weakref entry in the callback list and references it
			c = callback_list[i]()

			# Check if a weak reference is working
			if c is None:  # a reference is lost, remove this entry
				if remove_list is None:
					remove_list = list()
				remove_list.append(callback_list[i])
				continue

			if c == callback:
				callback_list.pop(i)
				break

		else:
			L.warning("Subscriber '{}'' not found for the message type '{}'.".format(message_type, callback))

		if remove_list is not None:
			for callback_ref in remove_list:
				callback_list.remove(callback_ref)

		if len(callback_list) == 0:
			del self.Subscribers[message_type]


	def _callback_iter(self, message_type):

		callback_list = self.Subscribers.get(message_type)
		if callback_list is None:
			return

		remove_list = None

		for callback_ref in callback_list:
			callback = callback_ref()

			# Check if a weak reference is working
			if callback is None:  # a reference is lost
				if remove_list is None:
					remove_list = list()
				remove_list.append(callback_ref)
				continue

			if asyncio.iscoroutinefunction(callback):
				callback = functools.partial(_deliver_async, self.Loop, callback)

			yield callback

		if remove_list is not None:
			for callback_ref in remove_list:
				callback_list.remove(callback_ref)


	def publish(self, message_type: str, *args, **kwargs):
		"""
		Publish the message and notify the subscribers of an `message type`.

		`message_type` is passed as the first argument to the subscribed callback.

		Args:
			message_type: The emitted message.
			asynchronously (bool, optional): If `True`, `call_soon()` method will be used for the asynchronous delivery of the message. Defaults to `False`.

		Examples:

		```python
		class MyApplication(asab.Application):
			async def initialize(self):
				self.Count = 0
				self.PubSub.subscribe("Fireworks.started!", self.on_fireworks)

			async def main(self):
				for i in range(3):
					self.Count += 1
					self.PubSub.publish("Fireworks.started!", self.Count)
					await asyncio.sleep(1)

			def on_fireworks(self, message_type, count):
				print("boom " * count)
		```
		"""

		asynchronously = kwargs.pop('asynchronously', False)

		if asynchronously:
			for callback in self._callback_iter(message_type):
				self.Loop.call_soon(functools.partial(callback, message_type, *args, **kwargs))

		else:
			for callback in self._callback_iter(message_type):
				try:
					callback(message_type, *args, **kwargs)
				except Exception:
					L.exception("Error in a PubSub callback", struct_data={'message_type': message_type})


	def publish_threadsafe(self, message_type: str, *args, **kwargs):
		"""
		Publish the message and notify the subscribers of an `message type` safely form a different that main thread.

		`message_type` is passed as the first argument to the subscribed callback.

		Args:
			message_type: The emitted message.
			asynchronously (bool, optional): If `True`, `call_soon()` method will be used for the asynchronous delivery of the message. Defaults to `False`.
		"""
		def in_main_thread():
			self.publish(message_type, *args, **kwargs)
		self.Loop.call_soon_threadsafe(in_main_thread)


	async def message(self, message_type: str) -> tuple:
		"""
		Await specific message from a coroutine. It is a convenience method for the `Subscriber` object.

		Args:
			message_type: Message to be awaited.

		Returns:
			Triple (message_type, args, kwargs).

		Examples:

		```python
		message_type, args, kwargs = await self.PubSub.message("Library.ready!")
		```
		"""
		subscriber = Subscriber(self, message_type)
		message_type, args, kwargs = await subscriber.message()
		return message_type, args, kwargs

message(message_type) async ¤

Await specific message from a coroutine. It is a convenience method for the Subscriber object.

Parameters:

Name Type Description Default
message_type str

Message to be awaited.

required

Returns:

Type Description
tuple

Triple (message_type, args, kwargs).

Examples:

message_type, args, kwargs = await self.PubSub.message("Library.ready!")
Source code in asab/pubsub.py
async def message(self, message_type: str) -> tuple:
	"""
	Await specific message from a coroutine. It is a convenience method for the `Subscriber` object.

	Args:
		message_type: Message to be awaited.

	Returns:
		Triple (message_type, args, kwargs).

	Examples:

	```python
	message_type, args, kwargs = await self.PubSub.message("Library.ready!")
	```
	"""
	subscriber = Subscriber(self, message_type)
	message_type, args, kwargs = await subscriber.message()
	return message_type, args, kwargs

publish(message_type, *args, **kwargs) ¤

Publish the message and notify the subscribers of an message type.

message_type is passed as the first argument to the subscribed callback.

Parameters:

Name Type Description Default
message_type str

The emitted message.

required
asynchronously bool

If True, call_soon() method will be used for the asynchronous delivery of the message. Defaults to False.

required

Examples:

class MyApplication(asab.Application):
        async def initialize(self):
                self.Count = 0
                self.PubSub.subscribe("Fireworks.started!", self.on_fireworks)

        async def main(self):
                for i in range(3):
                        self.Count += 1
                        self.PubSub.publish("Fireworks.started!", self.Count)
                        await asyncio.sleep(1)

        def on_fireworks(self, message_type, count):
                print("boom " * count)
Source code in asab/pubsub.py
def publish(self, message_type: str, *args, **kwargs):
	"""
	Publish the message and notify the subscribers of an `message type`.

	`message_type` is passed as the first argument to the subscribed callback.

	Args:
		message_type: The emitted message.
		asynchronously (bool, optional): If `True`, `call_soon()` method will be used for the asynchronous delivery of the message. Defaults to `False`.

	Examples:

	```python
	class MyApplication(asab.Application):
		async def initialize(self):
			self.Count = 0
			self.PubSub.subscribe("Fireworks.started!", self.on_fireworks)

		async def main(self):
			for i in range(3):
				self.Count += 1
				self.PubSub.publish("Fireworks.started!", self.Count)
				await asyncio.sleep(1)

		def on_fireworks(self, message_type, count):
			print("boom " * count)
	```
	"""

	asynchronously = kwargs.pop('asynchronously', False)

	if asynchronously:
		for callback in self._callback_iter(message_type):
			self.Loop.call_soon(functools.partial(callback, message_type, *args, **kwargs))

	else:
		for callback in self._callback_iter(message_type):
			try:
				callback(message_type, *args, **kwargs)
			except Exception:
				L.exception("Error in a PubSub callback", struct_data={'message_type': message_type})

publish_threadsafe(message_type, *args, **kwargs) ¤

Publish the message and notify the subscribers of an message type safely form a different that main thread.

message_type is passed as the first argument to the subscribed callback.

Parameters:

Name Type Description Default
message_type str

The emitted message.

required
asynchronously bool

If True, call_soon() method will be used for the asynchronous delivery of the message. Defaults to False.

required
Source code in asab/pubsub.py
def publish_threadsafe(self, message_type: str, *args, **kwargs):
	"""
	Publish the message and notify the subscribers of an `message type` safely form a different that main thread.

	`message_type` is passed as the first argument to the subscribed callback.

	Args:
		message_type: The emitted message.
		asynchronously (bool, optional): If `True`, `call_soon()` method will be used for the asynchronous delivery of the message. Defaults to `False`.
	"""
	def in_main_thread():
		self.publish(message_type, *args, **kwargs)
	self.Loop.call_soon_threadsafe(in_main_thread)

subscribe(message_type, callback) ¤

Set callback that will be called when message_type is received.

Parameters:

Name Type Description Default
message_type str

Message to be subscribed to. It should end with an exclamation mark "!".

required
callback Callable

Function or coroutine that is called when the message is received. message_type is passed as the first argument to the callback.

required

Examples:

class MyClass:
        def __init__(self, app):
                app.PubSub.subscribe("Application.tick!", self.on_tick)

        def on_tick(self, message_type):
                print(message_type)
Source code in asab/pubsub.py
def subscribe(self, message_type: str, callback: typing.Callable):
	"""
	Set `callback` that will be called when `message_type` is received.

	Args:
		message_type: Message to be subscribed to. It should end with an exclamation mark `"!"`.
		callback: Function or coroutine that is called when the message is received. `message_type` is passed as the first argument to the callback.

	Examples:

	```python
	class MyClass:
		def __init__(self, app):
			app.PubSub.subscribe("Application.tick!", self.on_tick)

		def on_tick(self, message_type):
			print(message_type)
	```
	"""

	# If subscribe is a bound method, do special treatment
	# https://stackoverflow.com/questions/53225/how-do-you-check-whether-a-python-method-is-bound-or-not
	if hasattr(callback, '__self__'):
		callback = weakref.WeakMethod(callback)
	else:
		callback = weakref.ref(callback)

	if message_type not in self.Subscribers:
		self.Subscribers[message_type] = [callback]
	else:
		self.Subscribers[message_type].append(callback)

subscribe_all(obj) ¤

Find all methods decorated by @asab.subscribe on the object and subscribe for them.

Examples:

class MyClass:
        def __init__(self, app):
                app.PubSub.subscribe_all(self)

        @asab.subscribe("Application.tick!")
        async def on_tick(self, message_type):
                print(message_type)

        @asab.subscribe("Application.exit!")
        def on_exit(self, message_type):
                print(message_type)
Source code in asab/pubsub.py
def subscribe_all(self, obj):
	"""
	Find all methods decorated by `@asab.subscribe` on the object and subscribe for them.

	Examples:

	```python
	class MyClass:
		def __init__(self, app):
			app.PubSub.subscribe_all(self)

		@asab.subscribe("Application.tick!")
		async def on_tick(self, message_type):
			print(message_type)

		@asab.subscribe("Application.exit!")
		def on_exit(self, message_type):
			print(message_type)
	```
	"""
	for member_name in dir(obj):
		member = getattr(obj, member_name)
		message_types = getattr(member, 'asab_pubsub_subscribe_to_message_types', None)
		if message_types is not None:
			for message_type in message_types:
				self.subscribe(message_type, member)

unsubscribe(message_type, callback) ¤

Remove callback from the subscribed message_type.

When the subscription does not exist, warning is displayed.

Examples:

class MyClass:
        def __init__(self, app):
                app.PubSub.subscribe("Application.tick!", self.only_once)

        def only_once(self, message_type):
                print("This message is displayed only once!")
                app.PubSub.unsubscribe("Application.tick!", self.only_once)
Source code in asab/pubsub.py
def unsubscribe(self, message_type, callback):
	"""
	Remove `callback` from the subscribed `message_type`.

	When the subscription does not exist, warning is displayed.

	Examples:

	```python
	class MyClass:
		def __init__(self, app):
			app.PubSub.subscribe("Application.tick!", self.only_once)

		def only_once(self, message_type):
			print("This message is displayed only once!")
			app.PubSub.unsubscribe("Application.tick!", self.only_once)
	```
	"""
	callback_list = self.Subscribers.get(message_type)
	if callback_list is None:
		L.warning("Message type subscription '{}'' not found.".format(message_type))
		return

	remove_list = None

	for i in range(len(callback_list)):
		# Take an weakref entry in the callback list and references it
		c = callback_list[i]()

		# Check if a weak reference is working
		if c is None:  # a reference is lost, remove this entry
			if remove_list is None:
				remove_list = list()
			remove_list.append(callback_list[i])
			continue

		if c == callback:
			callback_list.pop(i)
			break

	else:
		L.warning("Subscriber '{}'' not found for the message type '{}'.".format(message_type, callback))

	if remove_list is not None:
		for callback_ref in remove_list:
			callback_list.remove(callback_ref)

	if len(callback_list) == 0:
		del self.Subscribers[message_type]

asab.pubsub.Subscriber ¤

Bases: object

Object for consuming PubSub messages in coroutines.

It subscribes for various message types and consumes them. It is built on (first-in, first-out) basis. If pubsub argument is None, the initial subscription is skipped.

Examples:

The example of the subscriber object usage in async for statement:

async def my_coroutine(self):
        # Subscribe for two application events
        subscriber = asab.Subscriber(
                self.PubSub,
                "Application.tick!",
                "Application.exit!"
        )
        async for message_type, args, kwargs in subscriber:
                if message_type == "Application.exit!":
                        break;
                print("Tick.")
Source code in asab/pubsub.py
class Subscriber(object):
	"""
	Object for consuming PubSub messages in coroutines.

	It subscribes for various message types and consumes them.
	It is built on (first-in, first-out) basis.
	If `pubsub` argument is `None`, the initial subscription is skipped.

	Examples:

	The example of the subscriber object usage in async for statement:

	```python
	async def my_coroutine(self):
		# Subscribe for two application events
		subscriber = asab.Subscriber(
			self.PubSub,
			"Application.tick!",
			"Application.exit!"
		)
		async for message_type, args, kwargs in subscriber:
			if message_type == "Application.exit!":
				break;
			print("Tick.")
	```
	"""

	def __init__(self, pubsub=None, *message_types):

		self._q = asyncio.Queue()
		self._subscriptions = []

		if pubsub is not None:
			for message_type in message_types:
				self.subscribe(pubsub, message_type)


	def subscribe(self, pubsub, message_type):
		"""
		Subscribe for more message types. This method can be called many times with various `pubsub` objects.
		"""
		pubsub.subscribe(message_type, self)
		self._subscriptions.append((pubsub, message_type))


	def __call__(self, message_type, *args, **kwargs):
		self._q.put_nowait((message_type, args, kwargs))


	def message(self):
		"""
		Wait for a message asynchronously and return triple `(message_type, args, kwargs)`.

		Examples:

		```python
		async def my_coroutine(app):
			# Subscribe for a two application events
			subscriber = asab.Subscriber(
				app.PubSub,
				"Application.tick!",
				"Application.exit!"
			)
			while True:
				message_type, args, kwargs = await subscriber.message()
				if message_type == "Application.exit!":
					break
				print("Tick.")
		```
		"""
		return self._q.get()


	def __aiter__(self):
		return self


	async def __anext__(self):
		return await self._q.get()

message() ¤

Wait for a message asynchronously and return triple (message_type, args, kwargs).

Examples:

async def my_coroutine(app):
        # Subscribe for a two application events
        subscriber = asab.Subscriber(
                app.PubSub,
                "Application.tick!",
                "Application.exit!"
        )
        while True:
                message_type, args, kwargs = await subscriber.message()
                if message_type == "Application.exit!":
                        break
                print("Tick.")
Source code in asab/pubsub.py
def message(self):
	"""
	Wait for a message asynchronously and return triple `(message_type, args, kwargs)`.

	Examples:

	```python
	async def my_coroutine(app):
		# Subscribe for a two application events
		subscriber = asab.Subscriber(
			app.PubSub,
			"Application.tick!",
			"Application.exit!"
		)
		while True:
			message_type, args, kwargs = await subscriber.message()
			if message_type == "Application.exit!":
				break
			print("Tick.")
	```
	"""
	return self._q.get()

subscribe(pubsub, message_type) ¤

Subscribe for more message types. This method can be called many times with various pubsub objects.

Source code in asab/pubsub.py
def subscribe(self, pubsub, message_type):
	"""
	Subscribe for more message types. This method can be called many times with various `pubsub` objects.
	"""
	pubsub.subscribe(message_type, self)
	self._subscriptions.append((pubsub, message_type))

asab.pubsub.subscribe ¤

Bases: object

Decorator function that simplifies the process of subscription together with PubSub.subscribe_all() method.

Examples:

class MyClass(object):
        def __init__(self, app):
                app.PubSub.subscribe_all(self)

        @asab.subscribe("Application.tick!")
        async def on_tick(self, message_type):
                print(message_type)

        @asab.subscribe("Application.exit!")
        def on_exit(self, message_type):
                print(message_type)

Source code in asab/pubsub.py
class subscribe(object):
	"""
	Decorator function that simplifies the process of subscription together with `PubSub.subscribe_all()` method.

	Examples:
	```python
	class MyClass(object):
		def __init__(self, app):
			app.PubSub.subscribe_all(self)

		@asab.subscribe("Application.tick!")
		async def on_tick(self, message_type):
			print(message_type)

		@asab.subscribe("Application.exit!")
		def on_exit(self, message_type):
			print(message_type)
	```
	"""


	def __init__(self, message_type):
		self.message_type = message_type


	def __call__(self, f):
		if getattr(f, 'asab_pubsub_subscribe_to_message_types', None) is None:
			f.asab_pubsub_subscribe_to_message_types = [self.message_type]
		else:
			f.asab_pubsub_subscribe_to_message_types.append(self.message_type)
		return f