Asynchronous Functionality

The ploneintranet.async package aims to facilitate a rich, responsive user experience by offloading long-running tasks to a worker instance of Plone.

For example, document previews are generated when new documents are uploaded. This happens asynchronously, to free up the user’s browser for other tasks, and user notifications arrive in the browser without user interaction.

Similarly, a fast autosave roundtrip is realised by not reindexing the edited page on each edit directly, but deferring the reindexing to an async worker.

Architecture

The model we’ve chosen is both simple and powerful: async jobs are executed as HTTP requests against a Plone worker.

This provides several benefits:

  • Ensures asynchronous jobs can still be performed as if the original user had initiated them.
  • Makes it easy to implement an async task, since it is just a normal Plone browser view.
  • Makes it easy to debug any issues as the tasks themselves are just HTTP requests.
  • Removes the need to give Celery access to the ZODB directly.
  • Simplifies deployment, since there is no need for specially-configured async workers - everything runs in Plone instances.
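To make the model concrete, its essence can be sketched in a few lines of Python. This is purely illustrative; the real implementation is ploneintranet.async.core.dispatch, documented further below:

import requests

def run_async_job(url, data={}, headers={}, cookies={}):
    """Illustrative sketch only: an async job is 'just' an HTTP POST to a
    normal Plone browser view, carrying the original user's credentials."""
    return requests.post(url, data=data, headers=headers, cookies=cookies)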

Call flow

The following diagram illustrates the call flow:

Async collaboration diagram
  1. A Plone frontend serves a view which needs to offload some costly processing.
  2. The frontend delegates that task to Celery.
  3. Celery immediately returns, so the initial view can be rendered to the user.
  4. Celery dispatches the job to a Plone worker.
  5. The Plone worker commits the result to the ZODB.
  6. (ZODB returns. Plone worker returns).

At this stage, the original sequence is finished, and the job you would normally perform synchronously has been executed asynchronously and committed to the database, so any following views have access to the results:

  1. A later view on the Plone frontend accesses the ZODB.
  2. The ZODB returns a result that includes the async job modifications done above.

Deployment

Redis

You need a Redis server to store the async message queue. For security reasons, we recommend installing it as a system service maintained by your sysadmin, rather than adding it to buildout, where it risks missing security patches.

On Ubuntu: sudo apt-get install redis-server.
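To verify that the queue store is reachable, you can run a quick check from Python. This sketch assumes the redis client package is installed and Redis listens on its default port:

import redis

# Assumes Redis on the default localhost:6379; adjust to your deployment.
client = redis.StrictRedis(host='localhost', port=6379, db=0)
client.ping()  # raises redis.exceptions.ConnectionError if unreachable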

Celery

Asynchronous task processing is provided by Celery, a proven, pure-Python distributed task queue.

Celery is included as a dependency in the ploneintranet buildout and is typically run from supervisor. You can run it manually for debugging purposes as follows:

bin/celery -A ploneintranet.async.celerytasks worker -l info

(or -l debug if you want more verbose logging info)

Plone Workers

The diagram above distinguishes between ‘Plone frontend’ and ‘Plone worker’ but that’s not really a requirement. You can run async on a single instance and it will work fine without ZEO: the worker job gets executed by the same instance as soon as the original view is rendered and the instance becomes available.

In a production environment, you would typically use a ZEO setup with multiple Plone instances. Async jobs get executed as HTTP calls on the normal portal URL, which is important since it enables HAProxy to keep track of the load on each instance.

You may want to designate dedicated workers, separate from the user-facing frontends, so that a queue buildup of costly processing jobs does not degrade user view performance. You can route async requests to such workers, via either Nginx rewrites or HAProxy configuration, based on the X-celery-task-name: your_task_name HTTP header that ploneintranet.async sets. You could even maintain separate worker pools for high-priority and low-priority async jobs, distinguished by their task names. YMMV.

If you’re running multiple servers (virtuals) with Plone, each of those needs to have Redis and Celery running.

@@async-status audit view

A special helper view {portal_url}/@@async-status implements a self-test on the async functionality.

If the unit tests pass but this one doesn’t, it’s a deployment issue.
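You can also hit the view from a monitoring script. A sketch, where portal_url and ac_cookie are placeholders for your portal URL and a valid Plone authentication cookie, not part of the package:

import requests

# portal_url and ac_cookie are placeholder values
resp = requests.get(portal_url + '/@@async-status',
                    cookies={'__ac': ac_cookie})
print(resp.status_code)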

Adding a new async task

If you’re a Plone Intranet developer, it is very easy to add a new async task or to convert an existing piece of functionality into an async job.

Let me walk you through the stack using a working example: performing async object reindexes.

The key thing to understand is that this is a three-step process:

  1. Prepare the async delegation in Plone.
  2. Trigger the delayed HTTP request in Celery.
  3. Execute the actual job in Plone.

We’re maintaining Plone task preprocessors (1) in a separate module ploneintranet.async.tasks to distinguish them from the Celery tasks (2), which are maintained in ploneintranet.async.celerytasks.

1. Task preprocessor

The reason we need to pre-process is that Celery, in step 2, does not have access to anything in Plone; we therefore need to convert any object references into strings we can pass to Celery.

In ploneintranet.async.tasks:

from zope.interface import implementer

from ploneintranet.async import celerytasks
from ploneintranet.async.interfaces import IAsyncTask
from ploneintranet.async.core import AbstractPost


@implementer(IAsyncTask)
class ReindexObject(AbstractPost):
    """Reindex an object asynchronously."""

    task = celerytasks.reindex_object
    url = '/@@reindex_object'
That’s it. Note that the task and url point forward to the Celery task and worker view we’ll set up in steps 2 and 3 below.

All the heavy lifting here is done in AbstractPost, which:

  • Extracts login credentials from the current request
  • Sets up a plone.protect authenticator
  • Adds an X-celery-task-name HTTP header
  • Munges the given relative url into an absolute url on the current context object

Note

Adding a new preprocessor is as simple as subclassing AbstractPost and setting the task and url properties.
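A hypothetical second preprocessor illustrates the pattern; GeneratePreview, celerytasks.generate_preview and @@generate-preview are made-up names for illustration, not part of the actual package:

from zope.interface import implementer

from ploneintranet.async import celerytasks
from ploneintranet.async.interfaces import IAsyncTask
from ploneintranet.async.core import AbstractPost


@implementer(IAsyncTask)
class GeneratePreview(AbstractPost):
    """Hypothetical example: generate a document preview asynchronously."""

    task = celerytasks.generate_preview  # made-up Celery task (step 2)
    url = '/@@generate-preview'  # made-up worker view (step 3)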

2. Celery task

The Celery task for our reindex job is very simple:

from ploneintranet.async.core import dispatch


@app.task
def reindex_object(url, data={}, headers={}, cookies={}):
    """Reindex a content object."""
    dispatch(url, data, headers, cookies)

As you can see, this is just a wrapper that turns a dispatch call into a Celery task. app is the Celery application object that is set up in ploneintranet.async.celerytasks.

Note

Adding a new Celery task is as simple as creating a function that calls dispatch, and decorating that function with @app.task.
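For instance, the matching Celery task for the hypothetical GeneratePreview preprocessor sketched above would live in ploneintranet.async.celerytasks and look like this:

# Hypothetical counterpart to the GeneratePreview example above.
@app.task
def generate_preview(url, data={}, headers={}, cookies={}):
    """Generate a document preview (made-up example)."""
    dispatch(url, data, headers, cookies)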

3. View that executes the job

The actual execution of an async job is handled by a normal Plone view. Since this is the code that will interact with the Plone database, you need to take care not to take any security shortcuts here, which means:

  • Register the view with proper view permissions
  • Do not disable CSRF protection!

Think about it: we’re cloning an original request and then re-using its credentials to fire off a new request that results in a database write. That is just about the definition of a Cross-Site Request Forgery.

ploneintranet.async plays nice with plone.protect and adds an authenticator to the async request to prove that it’s not an attack.

If you’re developing and debugging the worker view by accessing it directly, that authenticator is missing (it gets added in the preprocessor step, which you’re skipping). You therefore need to supply the authenticator yourself, which lets you debug without compromising security; the view below renders a confirmation form for exactly this purpose.

Here’s the implementation (see ploneintranet.async.browser.views):

import logging
from Products.Five import BrowserView
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile

logger = logging.getLogger(__name__)


class ReindexObjectView(BrowserView):
    """
    Reindex the current object
    """
    template = ViewPageTemplateFile('templates/protected.pt')

    def authenticated(self):
        return self.request.get('_authenticator', False)

    def __call__(self):
        """
        Execute the actual reindex.
        The ploneintranet.async framework provides a plone.protect
        authenticator automatically.
        For manual testing, render a simple form to provide the
        authenticator.
        Please do not disable CSRF protection.
        """
        if self.authenticated():
            logger.info("Reindexing %s", self.context.absolute_url(1))
            self.context.reindexObject()
        return self.template()

The corresponding view template is as follows:

<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en"
            xmlns:tal="http://xml.zope.org/namespaces/tal"
            xmlns:metal="http://xml.zope.org/namespaces/metal"
            xmlns:i18n="http://xml.zope.org/namespaces/i18n"
            lang="en"
            i18n:domain="ploneintranet">

    <body>
        <h1>Protected async handler</h1>
        <div tal:condition="not:view/authenticated">
          <p>Manual execution of async tasks requires confirmation
            in order to exclude CSRF attacks.</p>

          <form>
            <span tal:replace="structure context/@@authenticator/authenticator"/>
            <input type="submit" value="confirm action"/>
            </form>

        </div>
        <div tal:condition="view/authenticated">
          <p>Action confirmed and executed.</p>
        </div>
    </body>
</html>

Finally, the view is registered for cmf.ModifyPortalContent since only users with write access should be allowed to trigger a reindex.

<browser:page
    name="reindex_object"
    for="*"
    class=".views.ReindexObjectView"
    permission="cmf.ModifyPortalContent"
    layer="..interfaces.IPloneintranetAsyncLayer"
    />

Note

The actual business logic for executing an async task can be developed as a normal Plone view which you can access directly.

Calling the task

Now that the async job pipeline is set up, all you have to do is import the task preprocessor somewhere in your own view code and call it:

from ploneintranet.async import tasks
tasks.ReindexObject(self.context, self.request)()

Note

Triggering an async job is as simple as instantiating and then calling the preprocessor task you prepared above in step 1.

Note the final parentheses: they invoke __call__, which takes various extra arguments with which you can tune both the requests.post and the celery.apply_async calls that take place under the hood.

A fictitious call to a view that expects form arguments, executed with a delay of two seconds, could for example look like this:

tasks.GoNuts(self.context, self.request)(data={'nut':'hazel'}, countdown=2)

See the IAsyncTask interface for the full API:

interface ploneintranet.async.interfaces.IAsyncTask(context, request)[source]

Execute a request asynchronously via Celery.

Extracts authentication credentials from an original request and submits a new POST request, taking special care that all calls are properly serialized.

task = <zope.interface.interface.Attribute object>

A Celery @app.task callable

url = <zope.interface.interface.Attribute object>

A url to be called, relative to self.context.absolute_url()

data = <zope.interface.interface.Attribute object>

A dictionary with POST data key: value pairs

headers = <zope.interface.interface.Attribute object>

A dictionary with request header key: value pairs

cookies = <zope.interface.interface.Attribute object>

A dictionary with request cookie key: value pairs

__init__()
__call__(...) <==> x(...)

Re-using ploneintranet.async outside of ploneintranet

Celery is typically set up as follows (see the sketch after this list):

  • a celeryconfig module contains the configuration
  • a celerytasks module loads that configuration module and creates an app
  • a Celery worker is then run against the celerytasks module.
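A minimal sketch of that layout, assuming a Redis broker on localhost and a made-up package name mypackage; the settings in ploneintranet’s own modules may differ:

# celeryconfig.py -- the configuration module (sketch)
BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

# celerytasks.py -- loads the configuration and creates the app (sketch)
from celery import Celery

app = Celery('mypackage.celerytasks')
app.config_from_object('mypackage.celeryconfig')

The worker would then be run as bin/celery -A mypackage.celerytasks worker -l info.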

Because Celery wraps tasks in decorators and is quite picky about names and module paths, you cannot import anything from the ploneintranet.async.celerytasks module.

However, we factored the heavy lifting infrastructure out into a separate ploneintranet.async.core module which you can easily use to build your own Celery tasks:

  • ploneintranet.async.core.AbstractPost is a generic engine for creating task preprocessors. See above how little work is required to subclass it into a custom preprocessor.
  • ploneintranet.async.core.dispatch is a generic engine for executing http requests from Celery.

Note

By importing ploneintranet.async.core you can easily create your own task preprocessors and celerytasks.

class ploneintranet.async.core.AbstractPost(context, request)[source]

Execute an HTTP POST request asynchronously via Celery.

Extracts authentication credentials from an original request and submits a new POST request, taking special care that all calls are properly serialized.

Sets an X-celery-task-name HTTP header for task request routing in HAProxy etc. YMMV.

This task is suitable as a base class for more specialized subclasses. It is structured as if it were an adapter but it is not registered or used as an adapter.

See tasks.Post for an actual implementation example.

__init__(context, request)[source]

Extract credentials.

__call__(url=None, data={}, headers={}, cookies={}, **kwargs)[source]

Start a Celery task that will execute a post request.

The optional url argument may be used to override self.url. The optional data, headers and cookies args will update the corresponding self.* attributes.

self.task.apply_async will be called and is expected to call url with self.headers as request headers, self.cookies as request cookies, and self.data as POST data, via the Python requests library.

**kwargs will be passed through as arguments to celery apply_async so you can set async execution options like countdown, expires or eta.

Returns a <class ‘celery.result.AsyncResult’> when running async, or a <class ‘celery.result.EagerResult’> when running in sync mode.
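A sketch of inspecting that return value, using the ReindexObject preprocessor from above and the standard Celery result API:

result = tasks.ReindexObject(self.context, self.request)()
result.id       # the Celery task id
result.ready()  # False until the worker request has completed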

ploneintranet.async.core.dispatch(url, data={}, headers={}, cookies={})[source]

Execute an HTTP POST via the requests library. This is not a task but a building block for Celery tasks.

Parameters:
  • url (str) – URL to be called by celery, resolvable behind the webserver (i.e. {portal_url}/path/to/object)
  • data (dict) – POST variables to pass through to the url
  • headers (dict) – request headers.
  • cookies (dict) – request cookies. Normally contains __ac for Plone.
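A direct call could look like the following sketch; the URL is only an example, and token and ac_cookie are placeholders for a plone.protect authenticator and a real Plone session cookie:

from ploneintranet.async.core import dispatch

dispatch('http://localhost:8080/Plone/front-page/@@reindex_object',
         data={'_authenticator': token},  # placeholder value
         cookies={'__ac': ac_cookie})  # placeholder value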

Websockets push

The following is not implemented yet but sketches our full vision for the async subsystem.

Todo

The following details how the final, full async system should work, making use of websockets.

Final goal

In the example of document preview generation, a full roundtrip would be as follows:

  • Jane logs into the Intranet.
  • Her browser attempts to open a websocket connection to a Tornado Websocket server.
  • The Tornado server authenticates the socket open request against Plone using Jane’s __ac cookie.
  • Jane uploads a document to a workspace.
  • Plone Intranet handles the object created event, and adds a document preview generation task to the async queue.
  • Celery makes an HTTP request to the Plone Worker instance, authenticated as Jane.
  • Plone Worker instance converts document/generates preview.
  • Plone Worker instance sends a “done” message to a special frontend queue.
  • Tornado server publishes “done” message to Jane’s browser together with URL to fetch HTML snippet from.
  • Browser receives a pat-push marked up message from the websocket.
  • Browser executes the pat-push injection: loads the additional HTML snippet and replaces the placeholder preview image with the actually generated preview image.

Technology stack

  • Tornado will provide a simple websocket server that authenticates against Plone
  • Redis provides message queues
  • Celery provides a simple worker to consume tasks from Redis and perform HTTP requests to Plone Worker instances

As we are using Celery, the message queue can be swapped out per deployment.

pat-push

See https://github.com/ploneintranet/ploneintranet.prototype/issues/75

Previews that have been generated asynchronously get pushed back into the DOM without requiring a refresh of the user’s browser.

To do this, we generate an HTML snippet of the preview that contains the source and target attributes for pat-inject. This snippet is sent to the browser over a websocket (described above). pat-inject-async attaches an event handler to the on_message event of SockJS.

Todo

Pat-push is not implemented yet