Distributing Work in Python Without Celery

We’ve been migrating a lot of data to various places lately at DISQUS. These generally have been things like running
consistancy checks on our PostgreSQL shards, or creating a new system which requires a certain form of denormalized data.

usually involves iterating through the results of an entire table (and sometimes even more), and performing some action
based on that row. We never care about results, we just want to be able to finish as quickly as possible.

Generally, we’d just create a simple do_something.py that would look something like this:

for comment in RangeQuerySetWrapper(Post.objects.all()):

Note: RangeQuerySetWrapper is a wrapper around Django’s ORM that efficiently iterates a table.

Eventually we came up with an internal tool to make this a bit more bearable. Mostly to handle resuming processes based
on the last primary key, and to track status. It evolved into a slightly more complex, but still simple utility we called

def callback(obj):

def main(**options):
    qs = Post.objects.all()
    tm = Taskmaster(callback, qs, **options)

This used to never be much of a problem. We’d just spin up some utility server and max the CPUs on that single machine
to get data processed in a day or less. Lately however, we’ve grown beyond the bounds of what is reasonable for a single
machine to take care of, and we’ve had to look towards other solutions.

Why Not Celery?

As with most people, we rely on Celery and RabbitMQ for distributing asyncrhonous tasks in our application. Unfortunately
that’s not quite the ideal fit out of the box for us in these situations. The root of the problem stems from the fact that
we may need to run through a billion objects, and without some effort, that would mean every single task would need to
fit into a RabbitMQ instance.

Given that we can’t simply queue every task and then distribute them to some Celery workers, and even more so that we
simply dont want to bring up Celery machines/write throwaway Celery code for a simple script, we chose to take a different
route. That route ended up with a simple distributed buffer queue, built on the
Python multiprocessing module.

Introducing Taskmaster

Taskmaster takes advantage of the remote management capabilities built into the multiprocessing module. This makes it
very simple to just throw in a capped Queue and have workers connect, get and execute jobs, and control state via that
single master process. In the end, we came up with an API looking something like this:

# spawn the master process
$ tm-master taskmaster.example --reset --key=foo --host=

# run a slave
$ tm-slave do_something:handle_job --host=

You’ll see the status on the master as things process, and if you cancel the process and start it again, it will
automatically resume:

$ tm-master taskmaster.example --reset --key=foo --host=
Taskmaster server running on ''
Current Job: 30421 | Rate:  991.06/s | Elapsed Time: 0:00:40

Implementing the iterator and the callback are just as simple as they used to be:

def get_jobs(last=0):
    # ``last`` will only be passed if previous state was available
    for obj in RangeQuerySetWrapper(Post.objects.all(), min_id=last):
        yield obj

def handle_job(obj):
    print "Got %r!" % obj

Now under the hood Taskmaster will continue to iterate on get_jobs whenever the size of the queue is
under the threshold (which defaults to 10,000 items). This means we have a constant memory footprint and can just spin
slaves to process the data.

Taskmaster is still new, but if you’re in need of these kinds of one-off migration scripts, we encourage you to try
it out
and see if it fits.

Original Source

Leave a comment