API Reference¶
Clients¶
- class rele.client.Publisher(gc_project_id, credentials, encoder, timeout, client_options, blocking=None)¶
The Publisher Class
Wraps the Google Cloud Publisher Client and handles encoding of the data.
It is important that this class remains a Singleton class in the process. Otherwise, a memory leak will occur. To avoid this, it is strongly recommended to use the
publish()
method.If the setting USE_EMULATOR evaluates to True, the Publisher Client will not have any credentials assigned.
- Parameters:
gc_project_id – string Google Cloud Project ID.
credentials – string Google Cloud Credentials.
encoder – A valid json.encoder.JSONEncoder subclass # noqa
timeout – float, default PUBLISHER_TIMEOUT
blocking – boolean, default None falls back to PUBLISHER_BLOCKING
- publish(topic, data, blocking=None, timeout=None, raise_exception=True, **attrs)¶
Publishes message to Google PubSub topic.
Usage:
publisher = Publisher() publisher.publish('topic_name', {'foo': 'bar'})
By default, this method is non-blocking, meaning that the method does not wait for the future to be returned.
If you would like to wait for the future so you can track the message later, you can:
Usage:
publisher = Publisher() future = publisher.publish('topic_name', {'foo': 'bar'}, blocking=True, timeout=10.0) # noqa
However, it should be noted that using blocking=True may incur a significant performance hit.
In addition, the method adds a timestamp published_at to the message attrs using epoch floating point number.
- Parameters:
topic – string topic to publish the data.
data – dict with the content of the message.
blocking – boolean, default None falls back to PUBLISHER_BLOCKING
timeout – float, default None falls back to PUBLISHER_TIMEOUT
raise_exception – boolean. If True, exceptions coming from PubSub will be raised
attrs – additional string parameters to be published.
- Returns:
Future # noqa
- class rele.client.Subscriber(gc_project_id, credentials, message_storage_policy, client_options, default_ack_deadline=None, default_retry_policy=None)¶
The Subscriber Class.
For convenience, this class wraps the creation and consumption of a topic subscription.
- Parameters:
gc_project_id – str MIDDLEWARE .
credentials – obj
credentials()
.message_storage_policy – str Region to store the messages
default_ack_deadline – int Ack Deadline defined in settings
default_retry_policy – RetryPolicy Rele’s RetryPolicy defined in settings
- consume(subscription_name, callback, scheduler)¶
Begin listening to topic from the SubscriberClient.
- Parameters:
subscription_name – str Subscription name
callback – Function which act on a topic message
scheduler – Thread pool-based scheduler. # noqa
- Returns:
Future # noqa
- update_or_create_subscription(subscription)¶
Handles creating the subscription when it does not exists or updates it if the subscription contains any parameter that allows it.
This makes it easier to deploy a worker and forget about the subscription side of things. If the topic of the subscription do not exist, it will be created automatically.
- Parameters:
subscription – obj
Subscription
.
Publish¶
- rele.publishing.publish(topic, data, **kwargs)¶
Shortcut method to publishing data to PubSub.
This is a shortcut method that instantiates the Publisher if not already instantiated in the process. This is to ensure that the Publisher remains a Singleton class.
Usage:
import rele def myfunc(): # ... rele.publish(topic='lets-tell-everyone', data={'foo': 'bar'}, myevent='arrival')
- Parameters:
topic – str PubSub topic name
data – dict-like Data to be sent as the message.
timeout – float. Default None, falls back to RELE[‘PUBLISHER_TIMEOUT’] value
blocking – boolean. Default False
kwargs – Any optional key-value pairs that are included as attributes in the message
- Returns:
None
Subscription¶
- class rele.subscription.Subscription(func, topic, prefix='', suffix='', filter_by=None, backend_filter_by=None, retry_policy=None)¶
The Subscription class
In addition to using the
@sub
decorator, it is possible to subclass the Subscription.For example:
from rele import Subscription class DoSomethingSub(Subscription): topic = 'photo-uploaded' def __init__(self): self._func = self.callback_func super().__init__(self._func, self.topic) def callback_func(self, data, **kwargs): print(data["id"])
If
rele-cli run
is used, theDoSomethingSub
will be a valid subscription and registered on Google Cloud.
- rele.subscription.sub(topic, prefix=None, suffix=None, filter_by=None, backend_filter_by=None, retry_policy=None)¶
Decorator function that makes declaring a PubSub Subscription simple.
The Subscriber returned will automatically create and name the subscription for the topic. The subscription name will be the topic name prefixed by the project name.
For example, if the topic name to subscribe too is lets-tell-everyone, the subscriber will be named project-name-lets-tell-everyone.
Additionally, if a suffix param is added, the subscriber will be project-name-lets-tell-everyone-my-suffix.
It is recommended to add **kwargs to your sub function. This will allow message attributes to be sent without breaking the subscriber implementation.
Usage:
@sub(topic='lets-tell-to-alice', prefix='shop') def bob_purpose(data, **kwargs): pass @sub(topic='lets-tell-everyone', suffix='sub1') def purpose_1(data, **kwargs): pass @sub(topic='lets-tell-everyone', suffix='sub2') def purpose_2(data, **kwargs): pass @sub(topic='photo-updated', filter_by=lambda **attrs: attrs.get('type') == 'landscape') def sub_process_landscape_photos(data, **kwargs): pass
- Parameters:
topic – string The topic that is being subscribed to.
prefix – string An optional prefix to the subscription name. Useful to namespace your subscription with your project name
suffix – string An optional suffix to the subscription name. Useful when you have two subscribers in the same project that are subscribed to the same topic.
filter_by – Union[function, list] An optional function or tuple of functions that filters the messages to be processed by the sub regarding their attributes.
retry_policy – obj
RetryPolicy
- Returns:
Worker¶
- exception rele.worker.NotConnectionError¶
- class rele.worker.Worker(subscriptions, client_options, gc_project_id=None, credentials=None, gc_storage_region=None, default_ack_deadline=None, threads_per_subscription=None, default_retry_policy=None)¶
A Worker manages the subscriptions which consume Google PubSub messages.
Facilitates the creation of subscriptions if not already created, and the starting and stopping the consumption of them.
- Parameters:
subscriptions – list
Subscription
- run_forever(sleep_interval=1)¶
Shortcut for calling setup, start, and _wait_forever.
- Parameters:
sleep_interval – Number of seconds to sleep in the
while True
loop
- setup()¶
Create the subscriptions on a Google PubSub topic.
If the subscription already exists, the subscription will not be re-created. Therefore, it is idempotent.
- start()¶
Begin consuming all subscriptions.
When consuming a subscription, a
StreamingPullFuture
is returned from the Google PubSub client library. This future can be used to manage the background stream.The futures are stored so that they can be cancelled later on for a graceful shutdown of the worker.
- stop(signal=None, frame=None)¶
Manage the shutdown process of the worker.
This function has two purposes:
Cancel all the futures created.
And close all the database connections opened by Django. Even though we cancel the connections for every execution of the callback, we want to be sure that all the database connections are closed in this process.
Exits with code 0 for a clean exit.
- Parameters:
signal – Needed for signal.signal # noqa
frame –
Needed for signal.signal # noqa
- rele.worker.create_and_run(subs, config)¶
Create and run a worker from a list of Subscription objects and a config while waiting forever, until the process is stopped.
We stop a worker process on: - SIGINT - SIGTSTP
- Parameters:
subs – List
Subscription
config –
Config
Middleware¶
Relé middleware’s provide additional functionality to default behavior. Simply subclass
BaseMiddleware
and declare the hooks you wish to use.
Base Middleware¶
- class rele.middleware.BaseMiddleware¶
Base class for middleware. The default implementations for all hooks are no-ops and subclasses may implement whatever subset of hooks they like.
- post_process_message()¶
Called after the Worker processes the message.
- post_process_message_failure(subscription, exception, start_time, message)¶
Called after the message has been unsuccessfully processed. :param subscription: :param exception: :param start_time: :param message:
- post_process_message_success(subscription, start_time, message)¶
Called after the message has been successfully processed. :param subscription: :param start_time: :param message:
- post_publish_failure(topic, exception, message)¶
Called after publishing fails. :param topic: :param exception: :param message:
- post_publish_success(topic, data, attrs)¶
Called after Publisher succesfully sends message. :param topic: :param data: :param attrs:
- post_worker_start()¶
Called after the Worker process starts up.
- post_worker_stop()¶
Called after the Worker process shuts down.
- pre_process_message(subscription, message)¶
Called when the Worker receives a message. :param subscription: :param message:
- pre_publish(topic, data, attrs)¶
Called before Publisher sends message. :param topic: :param data: :param attrs:
- pre_worker_start()¶
Called before the Worker process starts up.
- pre_worker_stop(subscriptions)¶
Called before the Worker process shuts down.
- setup(config, **kwargs)¶
Called when middleware is registered. :param config: Relé Config object
Logging Middleware¶
- class rele.contrib.logging_middleware.LoggingMiddleware¶
Default logging middleware.
Logging format has been configured for Prometheus.
- post_process_message_failure(subscription, exception, start_time, message)¶
Called after the message has been unsuccessfully processed. :param subscription: :param exception: :param start_time: :param message:
- post_process_message_success(subscription, start_time, message)¶
Called after the message has been successfully processed. :param subscription: :param start_time: :param message:
- post_publish_failure(topic, exception, message)¶
Called after publishing fails. :param topic: :param exception: :param message:
- post_publish_success(topic, data, attrs)¶
Called after Publisher succesfully sends message. :param topic: :param data: :param attrs:
- pre_process_message(subscription, message)¶
Called when the Worker receives a message. :param subscription: :param message:
- pre_publish(topic, data, attrs)¶
Called before Publisher sends message. :param topic: :param data: :param attrs:
- pre_worker_stop(subscriptions)¶
Called before the Worker process shuts down.
- setup(config, **kwargs)¶
Called when middleware is registered. :param config: Relé Config object