API Reference

Clients

class rele.client.Publisher(gc_project_id, credentials, encoder, timeout, blocking=None)

The Publisher Class

Wraps the Google Cloud Publisher Client and handles encoding of the data.

It is important that this class remains a Singleton class in the process. Otherwise, a memory leak will occur. To avoid this, it is strongly recommended to use the publish() method.

If the setting USE_EMULATOR evaluates to True, the Publisher Client will not have any credentials assigned.

Parameters:
publish(topic, data, blocking=None, timeout=None, raise_exception=True, **attrs)

Publishes message to Google PubSub topic.

Usage:

publisher = Publisher()
publisher.publish('topic_name', {'foo': 'bar'})

By default, this method is non-blocking, meaning that the method does not wait for the future to be returned.

If you would like to wait for the future so you can track the message later, you can:

Usage:

publisher = Publisher()
future = publisher.publish('topic_name', {'foo': 'bar'}, blocking=True, timeout=10.0) # noqa

However, it should be noted that using blocking=True may incur a significant performance hit.

In addition, the method adds a timestamp published_at to the message attrs using epoch floating point number.

Parameters:
  • topic – string topic to publish the data.

  • data – dict with the content of the message.

  • blocking – boolean, default None falls back to PUBLISHER_BLOCKING

  • timeout – float, default None falls back to PUBLISHER_TIMEOUT

  • raise_exception – boolean. If True, exceptions coming from PubSub will be raised

  • attrs – additional string parameters to be published.

Returns:

Future # noqa

class rele.client.Subscriber(gc_project_id, credentials, message_storage_policy, default_ack_deadline=None, default_retry_policy=None)

The Subscriber Class.

For convenience, this class wraps the creation and consumption of a topic subscription.

Parameters:
  • gc_project_id – str MIDDLEWARE .

  • credentials – obj credentials().

  • message_storage_policy – str Region to store the messages

  • default_ack_deadline – int Ack Deadline defined in settings

  • default_retry_policy – RetryPolicy Rele’s RetryPolicy defined in settings

consume(subscription_name, callback, scheduler)

Begin listening to topic from the SubscriberClient.

Parameters:
  • subscription_name – str Subscription name

  • callback – Function which act on a topic message

  • schedulerThread pool-based scheduler. # noqa

Returns:

Future # noqa

update_or_create_subscription(subscription)

Handles creating the subscription when it does not exists or updates it if the subscription contains any parameter that allows it.

This makes it easier to deploy a worker and forget about the subscription side of things. The subscription must have a topic to subscribe to. Which means that the topic must be created manually before the worker is started.

Parameters:

subscription – obj Subscription.

Publish

rele.publishing.publish(topic, data, **kwargs)

Shortcut method to publishing data to PubSub.

This is a shortcut method that instantiates the Publisher if not already instantiated in the process. This is to ensure that the Publisher remains a Singleton class.

Usage:

import rele

def myfunc():
    # ...
    rele.publish(topic='lets-tell-everyone',
                 data={'foo': 'bar'},
                 myevent='arrival')
Parameters:
  • topic – str PubSub topic name

  • data – dict-like Data to be sent as the message.

  • timeout – float. Default None, falls back to RELE[‘PUBLISHER_TIMEOUT’] value

  • blocking – boolean. Default False

  • kwargs – Any optional key-value pairs that are included as attributes in the message

Returns:

None

Subscription

class rele.subscription.Subscription(func, topic, prefix='', suffix='', filter_by=None, backend_filter_by=None, retry_policy=None)

The Subscription class

In addition to using the @sub decorator, it is possible to subclass the Subscription.

For example:

from rele import Subscription

class DoSomethingSub(Subscription):
    topic = 'photo-uploaded'

    def __init__(self):
        self._func = self.callback_func
        super().__init__(self._func, self.topic)

    def callback_func(self, data, **kwargs):
        print(data["id"])

If rele-cli run is used, the DoSomethingSub will be a valid subscription and registered on Google Cloud.

rele.subscription.sub(topic, prefix=None, suffix=None, filter_by=None, backend_filter_by=None, retry_policy=None)

Decorator function that makes declaring a PubSub Subscription simple.

The Subscriber returned will automatically create and name the subscription for the topic. The subscription name will be the topic name prefixed by the project name.

For example, if the topic name to subscribe too is lets-tell-everyone, the subscriber will be named project-name-lets-tell-everyone.

Additionally, if a suffix param is added, the subscriber will be project-name-lets-tell-everyone-my-suffix.

It is recommended to add **kwargs to your sub function. This will allow message attributes to be sent without breaking the subscriber implementation.

Usage:

@sub(topic='lets-tell-to-alice', prefix='shop')
def bob_purpose(data, **kwargs):
     pass

@sub(topic='lets-tell-everyone', suffix='sub1')
def purpose_1(data, **kwargs):
     pass

@sub(topic='lets-tell-everyone', suffix='sub2')
def purpose_2(data, **kwargs):
     pass

@sub(topic='photo-updated',
     filter_by=lambda **attrs: attrs.get('type') == 'landscape')
def sub_process_landscape_photos(data, **kwargs):
    pass
Parameters:
  • topic – string The topic that is being subscribed to.

  • prefix – string An optional prefix to the subscription name. Useful to namespace your subscription with your project name

  • suffix – string An optional suffix to the subscription name. Useful when you have two subscribers in the same project that are subscribed to the same topic.

  • filter_by – Union[function, list] An optional function or tuple of functions that filters the messages to be processed by the sub regarding their attributes.

  • retry_policy – obj RetryPolicy

Returns:

Subscription

Worker

class rele.worker.Worker(subscriptions, gc_project_id=None, credentials=None, gc_storage_region=None, default_ack_deadline=None, threads_per_subscription=None, default_retry_policy=None)

A Worker manages the subscriptions which consume Google PubSub messages.

Facilitates the creation of subscriptions if not already created, and the starting and stopping the consumption of them.

Parameters:

subscriptions – list Subscription

run_forever(sleep_interval=1)

Shortcut for calling setup, start, and _wait_forever.

Parameters:

sleep_interval – Number of seconds to sleep in the while True loop

setup()

Create the subscriptions on a Google PubSub topic.

If the subscription already exists, the subscription will not be re-created. Therefore, it is idempotent.

start()

Begin consuming all subscriptions.

When consuming a subscription, a StreamingPullFuture is returned from the Google PubSub client library. This future can be used to manage the background stream.

The futures are stored so that they can be cancelled later on for a graceful shutdown of the worker.

stop(signal=None, frame=None)

Manage the shutdown process of the worker.

This function has two purposes:

  1. Cancel all the futures created.

  2. And close all the database connections opened by Django. Even though we cancel the connections for every execution of the callback, we want to be sure that all the database connections are closed in this process.

Exits with code 0 for a clean exit.

Parameters:
rele.worker.create_and_run(subs, config)

Create and run a worker from a list of Subscription objects and a config while waiting forever, until the process is stopped.

We stop a worker process on: - SIGINT - SIGTSTP

Parameters:

Middleware

Relé middleware’s provide additional functionality to default behavior. Simply subclass BaseMiddleware and declare the hooks you wish to use.

Base Middleware

class rele.middleware.BaseMiddleware

Base class for middleware. The default implementations for all hooks are no-ops and subclasses may implement whatever subset of hooks they like.

post_process_message()

Called after the Worker processes the message.

post_process_message_failure(subscription, exception, start_time, message)

Called after the message has been unsuccessfully processed. :param subscription: :param exception: :param start_time: :param message:

post_process_message_success(subscription, start_time, message)

Called after the message has been successfully processed. :param subscription: :param start_time: :param message:

post_publish_failure(topic, exception, message)

Called after publishing fails. :param topic: :param exception: :param message:

post_publish_success(topic, data, attrs)

Called after Publisher succesfully sends message. :param topic: :param data: :param attrs:

post_worker_start()

Called after the Worker process starts up.

post_worker_stop()

Called after the Worker process shuts down.

pre_process_message(subscription, message)

Called when the Worker receives a message. :param subscription: :param message:

pre_publish(topic, data, attrs)

Called before Publisher sends message. :param topic: :param data: :param attrs:

pre_worker_start()

Called before the Worker process starts up.

pre_worker_stop(subscriptions)

Called before the Worker process shuts down.

setup(config, **kwargs)

Called when middleware is registered. :param config: Relé Config object

Logging Middleware

class rele.contrib.logging_middleware.LoggingMiddleware

Default logging middleware.

Logging format has been configured for Prometheus.

post_process_message_failure(subscription, exception, start_time, message)

Called after the message has been unsuccessfully processed. :param subscription: :param exception: :param start_time: :param message:

post_process_message_success(subscription, start_time, message)

Called after the message has been successfully processed. :param subscription: :param start_time: :param message:

post_publish_failure(topic, exception, message)

Called after publishing fails. :param topic: :param exception: :param message:

post_publish_success(topic, data, attrs)

Called after Publisher succesfully sends message. :param topic: :param data: :param attrs:

pre_process_message(subscription, message)

Called when the Worker receives a message. :param subscription: :param message:

pre_publish(topic, data, attrs)

Called before Publisher sends message. :param topic: :param data: :param attrs:

pre_worker_stop(subscriptions)

Called before the Worker process shuts down.

setup(config, **kwargs)

Called when middleware is registered. :param config: Relé Config object

Django Middleware