API Reference
Clients
class rele.client.Publisher(gc_project_id, credentials, encoder, timeout, blocking=None)
The Publisher Class
Wraps the Google Cloud Publisher Client and handles encoding of the data.
It is important that this class remains a Singleton within the process; otherwise, a memory leak will occur. To avoid this, it is strongly recommended to use the rele.publishing.publish() function rather than instantiating Publisher directly.
If the setting USE_EMULATOR evaluates to True, the Publisher Client will not have any credentials assigned.
Parameters:
- gc_project_id – string Google Cloud Project ID.
- credentials – string Google Cloud Credentials.
- encoder – A valid json.encoder.JSONEncoder subclass.
- timeout – float, default PUBLISHER_TIMEOUT
- blocking – boolean, default None, which falls back to PUBLISHER_BLOCKING
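The encoder parameter accepts any json.JSONEncoder subclass. The following is a minimal sketch of such a subclass using only the standard library. ExtendedEncoder is a hypothetical name, and the final encode-to-UTF-8 step is an assumption about how a payload might be prepared, not a description of Relé's internals.

```python
import datetime
import decimal
import json


class ExtendedEncoder(json.JSONEncoder):
    """Hypothetical encoder: serializes types the stock encoder rejects."""

    def default(self, o):
        if isinstance(o, (datetime.datetime, datetime.date)):
            return o.isoformat()
        if isinstance(o, decimal.Decimal):
            return str(o)
        # Defer to the base class, which raises TypeError for unknown types.
        return super().default(o)


payload = json.dumps(
    {"id": 1, "price": decimal.Decimal("9.99"), "day": datetime.date(2020, 1, 31)},
    cls=ExtendedEncoder,
).encode("utf-8")
```

A class like this would be passed as the encoder argument so that message data containing dates or decimals can be serialized.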
publish(topic, data, blocking=None, timeout=None, raise_exception=True, **attrs)
Publishes a message to a Google PubSub topic.
Usage:
    publisher = Publisher()
    publisher.publish('topic_name', {'foo': 'bar'})
By default, this method is non-blocking, meaning that it does not wait for the returned future to resolve.
If you would like to wait for the future so you can track the message later, you can:
Usage:
    publisher = Publisher()
    future = publisher.publish('topic_name', {'foo': 'bar'}, blocking=True, timeout=10.0)
However, it should be noted that using blocking=True may incur a significant performance hit.
In addition, the method adds a published_at timestamp to the message attrs, expressed as an epoch floating-point number.
Parameters:
- topic – string topic to publish the data.
- data – dict with the content of the message.
- blocking – boolean, default None, which falls back to PUBLISHER_BLOCKING
- timeout – float, default None, which falls back to PUBLISHER_TIMEOUT
- raise_exception – boolean. If True, exceptions coming from PubSub will be raised
- attrs – additional string parameters to be published.
Returns: Future
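The difference between the blocking and non-blocking modes can be illustrated without any Google Cloud dependency. In the sketch below, publish_sketch and fake_send are hypothetical stand-ins, a thread pool plays the role of the PubSub client, and storing published_at as a string is an assumption.

```python
import json
import time
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=1)


def fake_send(topic, payload, **attrs):
    """Stand-in for the network call; echoes back what it was given."""
    return {"topic": topic, "payload": payload, "attrs": attrs}


def publish_sketch(topic, data, blocking=False, timeout=None, **attrs):
    # Stored as a string here on the assumption that attribute values are strings.
    attrs["published_at"] = str(time.time())
    payload = json.dumps(data).encode("utf-8")
    future = _executor.submit(fake_send, topic, payload, **attrs)
    if blocking:
        # Wait for completion; raises concurrent.futures.TimeoutError on timeout.
        future.result(timeout=timeout)
    return future


pending = publish_sketch("topic_name", {"foo": "bar"})  # returns without waiting
sent = publish_sketch("topic_name", {"foo": "bar"}, blocking=True, timeout=10.0)
```

In both cases a future is handed back; the blocking variant simply waits on it before returning, which is where the performance cost comes from.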
class rele.client.Subscriber(gc_project_id, credentials, message_storage_policy, default_ack_deadline=None, default_retry_policy=None)
The Subscriber Class.
For convenience, this class wraps the creation and consumption of a topic subscription.
Parameters:
- gc_project_id – str Google Cloud Project ID.
- credentials – obj credentials().
- message_storage_policy – str Region to store the messages
- default_ack_deadline – int Ack Deadline defined in settings
- default_retry_policy – RetryPolicy Relé’s RetryPolicy defined in settings
consume(subscription_name, callback, scheduler)
Begin listening to topic from the SubscriberClient.
Parameters:
- subscription_name – str Subscription name
- callback – Function which acts on a topic message
- scheduler – Thread pool-based scheduler.
Returns: Future
update_or_create_subscription(subscription)
Creates the subscription when it does not exist, or updates it if the subscription contains any parameter that allows it.
This makes it easier to deploy a worker and forget about the subscription side of things. The subscription must have a topic to subscribe to, which means that the topic must be created manually before the worker is started.
Parameters: subscription – obj Subscription.
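The create-or-update behaviour, together with the requirement that the topic already exists, can be pictured with a toy in-memory registry. Everything in this sketch (InMemorySubscriptions, its method name and its return values) is hypothetical and only mimics the flow described above; it is not the Google Cloud client API.

```python
class InMemorySubscriptions:
    """Toy stand-in for a subscription backend (not a real client)."""

    def __init__(self, existing_topics):
        self._topics = set(existing_topics)
        self._subs = {}

    def update_or_create(self, name, topic, **params):
        if topic not in self._topics:
            # Mirrors the requirement that the topic is created beforehand.
            raise LookupError(f"topic {topic!r} must be created first")
        action = "updated" if name in self._subs else "created"
        self._subs[name] = {"topic": topic, **params}
        return action


registry = InMemorySubscriptions(existing_topics=["photo-uploaded"])
first = registry.update_or_create("shop-photo-uploaded", "photo-uploaded")
second = registry.update_or_create(
    "shop-photo-uploaded", "photo-uploaded", ack_deadline=60
)
```

Calling the method twice with the same name is safe: the first call creates the entry and the second only updates it.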
Publish
rele.publishing.publish(topic, data, **kwargs)
Shortcut for publishing data to PubSub.
This is a shortcut method that instantiates the Publisher if not already instantiated in the process. This is to ensure that the Publisher remains a Singleton class.
Usage:
    import rele

    def myfunc():
        # ...
        rele.publish(topic='lets-tell-everyone', data={'foo': 'bar'}, myevent='arrival')
Parameters:
- topic – str PubSub topic name
- data – dict-like Data to be sent as the message.
- timeout – float. Default None, falls back to RELE['PUBLISHER_TIMEOUT'] value
- blocking – boolean. Default False
- kwargs – Any optional key-value pairs that are included as attributes in the message
Returns: None
Subscription
class rele.subscription.Subscription(func, topic, prefix='', suffix='', filter_by=None, backend_filter_by=None, retry_policy=None)
The Subscription class
In addition to using the @sub decorator, it is possible to subclass the Subscription.
For example:
    from rele import Subscription

    class DoSomethingSub(Subscription):
        topic = 'photo-uploaded'

        def __init__(self):
            self._func = self.callback_func
            super().__init__(self._func, self.topic)

        def callback_func(self, data, **kwargs):
            print(data["id"])
If rele-cli run is used, the DoSomethingSub will be a valid subscription and registered on Google Cloud.
rele.subscription.sub(topic, prefix=None, suffix=None, filter_by=None, backend_filter_by=None, retry_policy=None)
Decorator function that makes declaring a PubSub Subscription simple.
The Subscriber returned will automatically create and name the subscription for the topic. The subscription name will be the topic name prefixed by the project name.
For example, if the topic name to subscribe to is lets-tell-everyone, the subscriber will be named project-name-lets-tell-everyone.
Additionally, if a suffix param is added, the subscriber will be project-name-lets-tell-everyone-my-suffix.
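The naming rule described above amounts to joining the prefix, topic and suffix with hyphens and skipping any part that is absent. A sketch follows; subscription_name is a hypothetical helper, and Relé's own implementation may differ in detail.

```python
def subscription_name(topic, prefix=None, suffix=None):
    """Join the non-empty parts with hyphens: prefix-topic-suffix."""
    parts = [prefix, topic, suffix]
    return "-".join(part for part in parts if part)


plain = subscription_name("lets-tell-everyone")
prefixed = subscription_name("lets-tell-everyone", prefix="project-name")
full = subscription_name(
    "lets-tell-everyone", prefix="project-name", suffix="my-suffix"
)
```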
It is recommended to add **kwargs to your sub function. This will allow message attributes to be sent without breaking the subscriber implementation.
Usage:
    @sub(topic='lets-tell-to-alice', prefix='shop')
    def bob_purpose(data, **kwargs):
        pass

    @sub(topic='lets-tell-everyone', suffix='sub1')
    def purpose_1(data, **kwargs):
        pass

    @sub(topic='lets-tell-everyone', suffix='sub2')
    def purpose_2(data, **kwargs):
        pass

    @sub(topic='photo-updated',
         filter_by=lambda **attrs: attrs.get('type') == 'landscape')
    def sub_process_landscape_photos(data, **kwargs):
        pass
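A filter is a callable that receives the message attributes as keyword arguments and returns a boolean. The sketch below shows how one or several such filters might be evaluated. passes_filters is a hypothetical helper, and requiring every filter to pass (a logical AND) is an assumption.

```python
def passes_filters(filter_by, attrs):
    """Return True when every filter accepts the message attributes."""
    if filter_by is None:
        return True
    # Accept either a single callable or a list/tuple of callables.
    filters = filter_by if isinstance(filter_by, (list, tuple)) else [filter_by]
    return all(f(**attrs) for f in filters)


def is_landscape(**attrs):
    return attrs.get("type") == "landscape"


def is_large(**attrs):
    return int(attrs.get("width", 0)) >= 1000


accepted = passes_filters([is_landscape, is_large], {"type": "landscape", "width": "1920"})
rejected = passes_filters(is_landscape, {"type": "portrait"})
```

Because filters take **attrs, adding new attributes to a message does not break existing filters.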
Parameters:
- topic – string The topic that is being subscribed to.
- prefix – string An optional prefix to the subscription name. Useful to namespace your subscription with your project name
- suffix – string An optional suffix to the subscription name. Useful when you have two subscribers in the same project that are subscribed to the same topic.
- filter_by – Union[function, list] An optional function or tuple of functions that filters the messages to be processed by the sub regarding their attributes.
- retry_policy – obj RetryPolicy
Returns:
Worker
class rele.worker.Worker(subscriptions, gc_project_id=None, credentials=None, gc_storage_region=None, default_ack_deadline=None, threads_per_subscription=None, default_retry_policy=None)
A Worker manages the subscriptions which consume Google PubSub messages.
Facilitates the creation of subscriptions if not already created, and the starting and stopping of their consumption.
Parameters: subscriptions – list Subscription
run_forever(sleep_interval=1)
Shortcut for calling setup, start, and _wait_forever.
Parameters: sleep_interval – Number of seconds to sleep in the while True loop
setup()
Create the subscriptions on a Google PubSub topic.
If the subscription already exists, the subscription will not be re-created. Therefore, it is idempotent.
start()
Begin consuming all subscriptions.
When consuming a subscription, a StreamingPullFuture is returned from the Google PubSub client library. This future can be used to manage the background stream.
The futures are stored so that they can be cancelled later on for a graceful shutdown of the worker.
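Storing the futures so they can be cancelled later can be sketched as follows. WorkerSketch is purely illustrative, and a plain pending concurrent.futures.Future stands in for the StreamingPullFuture that the real client library would return.

```python
from concurrent.futures import Future


class WorkerSketch:
    """Illustrative only: tracks one future per subscription name."""

    def __init__(self, subscription_names):
        self._names = list(subscription_names)
        self._futures = {}

    def start(self):
        for name in self._names:
            # A real worker would receive a StreamingPullFuture here;
            # a plain pending Future stands in for it.
            self._futures[name] = Future()

    def cancel_all(self):
        for future in self._futures.values():
            future.cancel()


worker = WorkerSketch(["shop-lets-tell-to-alice", "lets-tell-everyone-sub1"])
worker.start()
worker.cancel_all()
```

Keeping the futures in a mapping keyed by subscription name makes it straightforward to cancel each background stream during shutdown.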
stop(signal=None, frame=None)
Manage the shutdown process of the worker.
This function has two purposes:
- Cancel all the futures created.
- Close all the database connections opened by Django. Even though we cancel the connections for every execution of the callback, we want to be sure that all the database connections are closed in this process.
Exits with code 0 for a clean exit.
Parameters:
- signal – Needed for signal.signal
- frame – Needed for signal.signal
rele.worker.create_and_run(subs, config)
Create and run a worker from a list of Subscription objects and a config while waiting forever, until the process is stopped.
We stop a worker process on:
- SIGINT
- SIGTSTP
Parameters:
- subs – List Subscription
- config – Config
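The signal and frame parameters of stop exist because Python's signal.signal invokes handlers with exactly those two arguments. A sketch of wiring a stop method to SIGINT and SIGTSTP is shown below. StoppableWorker and install_stop_handlers are hypothetical names, and the defensive lookup is there because SIGTSTP is not defined on every platform.

```python
import signal
import sys


class StoppableWorker:
    """Hypothetical worker whose stop() matches the signal-handler signature."""

    def __init__(self):
        self.stopped = False

    def stop(self, signum=None, frame=None):
        # signum and frame are supplied by the interpreter when a signal fires.
        self.stopped = True
        sys.exit(0)  # clean exit


def install_stop_handlers(worker):
    # Must be called from the main thread, as signal.signal requires.
    for name in ("SIGINT", "SIGTSTP"):
        signum = getattr(signal, name, None)
        if signum is not None:
            signal.signal(signum, worker.stop)
```

Because both arguments default to None, the same stop method can also be called directly from application code.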
Middleware
Relé middlewares provide additional functionality on top of the default behavior. Simply subclass BaseMiddleware and declare the hooks you wish to use.
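As a rough illustration of the subclass-and-override pattern, the sketch below uses a stand-in base class so that it runs without Relé installed. BaseMiddlewareStub, AuditMiddleware and run_hook are hypothetical names; only the hook names and their parameters are taken from this reference, and treating start_time as an epoch timestamp is an assumption.

```python
import time


class BaseMiddlewareStub:
    """Stand-in for BaseMiddleware: every hook is a no-op."""

    def pre_publish(self, topic, data, attrs):
        pass

    def post_process_message_success(self, subscription, start_time, message):
        pass


class AuditMiddleware(BaseMiddlewareStub):
    """Hypothetical middleware overriding only the hooks it needs."""

    def __init__(self):
        self.events = []

    def pre_publish(self, topic, data, attrs):
        self.events.append(("pre_publish", topic))

    def post_process_message_success(self, subscription, start_time, message):
        # Assumes start_time is an epoch timestamp such as time.time().
        elapsed = time.time() - start_time
        self.events.append(("processed", subscription, elapsed))


def run_hook(middlewares, hook_name, *args):
    """Hypothetical dispatcher: call the named hook on each middleware in order."""
    for middleware in middlewares:
        getattr(middleware, hook_name)(*args)


audit = AuditMiddleware()
stack = [BaseMiddlewareStub(), audit]
run_hook(stack, "pre_publish", "photo-uploaded", {"id": 1}, {})
run_hook(stack, "post_process_message_success", "photo-sub", time.time(), b"{}")
```

Hooks that a middleware does not override fall through to the no-op defaults, so each middleware only carries the behaviour it cares about.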
Base Middleware
class rele.middleware.BaseMiddleware
Base class for middleware. The default implementations for all hooks are no-ops, and subclasses may implement whatever subset of hooks they like.
post_process_message()
Called after the Worker processes the message.
post_process_message_failure(subscription, exception, start_time, message)
Called after the message has been unsuccessfully processed.
post_process_message_success(subscription, start_time, message)
Called after the message has been successfully processed.
post_publish_failure(topic, exception, message)
Called after publishing fails.
post_publish_success(topic, data, attrs)
Called after Publisher successfully sends message.
post_worker_start()
Called after the Worker process starts up.
post_worker_stop()
Called after the Worker process shuts down.
pre_process_message(subscription, message)
Called when the Worker receives a message.
pre_publish(topic, data, attrs)
Called before Publisher sends message.
pre_worker_start()
Called before the Worker process starts up.
pre_worker_stop(subscriptions)
Called before the Worker process shuts down.
setup(config, **kwargs)
Called when middleware is registered.
Parameters: config – Relé Config object
Logging Middleware
class rele.contrib.logging_middleware.LoggingMiddleware
Default logging middleware.
Logging format has been configured for Prometheus.
post_process_message_failure(subscription, exception, start_time, message)
Called after the message has been unsuccessfully processed.
post_process_message_success(subscription, start_time, message)
Called after the message has been successfully processed.
post_publish_failure(topic, exception, message)
Called after publishing fails.
post_publish_success(topic, data, attrs)
Called after Publisher successfully sends message.
pre_process_message(subscription, message)
Called when the Worker receives a message.
pre_publish(topic, data, attrs)
Called before Publisher sends message.
pre_worker_stop(subscriptions)
Called before the Worker process shuts down.
setup(config, **kwargs)
Called when middleware is registered.
Parameters: config – Relé Config object