Add to Technorati Favorites

A resizable dispatch queue for Twisted

In December 2009 I posted to the Twisted mailing list about what I called a Resizable Dispatch Queue. I’ve just spent some time making a new version that’s much improved. You can pick up the new version via pip install txrdq, from PyPI, or from the txRDQ project page on Launchpad. Here’s the example use case, taken from my original posting:

You want to write a server with a web interface that allows people to enter their phone number so you can send them an SMS. You anticipate lots of people will use the service. But sending SMS messages is quite slow, and the company that you ship those jobs off to is concerned that you’ll overrun their service (or maybe they have an API limit, etc). So you need to queue up jobs locally and send them off at a certain rate. You’d like to be able to adjust that rate up or down. You also want to be able to shut your service down cleanly (i.e., not in the middle of a task), and when you restart it you want to be able to re-queue the jobs that were queued last time but which hadn’t gone out.

To make the example more concrete, suppose your function that sends the SMS is called sendSMS and that it takes a (number, message) tuple argument. Here are some of the kinds of things you can do:

from txrdq.rdq import ResizableDispatchQueue

# Create a queue that will allow 5 simultaneous underway jobs.
rdq = ResizableDispatchQueue(sendSMS, 5)

# Later... send off some SMS messages.
d1 = rdq.put((2127399921, 'Hello...'), priority=5)
d1.addCallback(...)

d2 = rdq.put((5052929919, 'Test...'), priority=5)
d2.addCallback(...)

# Cancel the second job
d2.cancel()

# Widen the outgoing pipeline to 10 simultaneous jobs.
rdq.width = 10

# We're dispatching jobs too fast, turn it down a little.
rdq.width = 7

# Get a copy of the list of pending jobs.
jobs = rdq.pending()

# Cancel one of the pending jobs from the jobs list.
job.cancel()

# Reprioritize one of the pending jobs from the jobs list.
rdq.reprioritize(job, -1)

# Arrange to increase the number of jobs in one hour.
reactor.callLater(3600, rdq.setWidth, 20)

# Pause processing.
rdq.pause()

# Resume processing, with a new width of 8.
rdq.resume(8)

# Shutdown. Wait for any underway jobs to complete, and save
# the list of jobs not yet processed.

def saveJobs(jobs):
    pickle.dump(jobs, ...)

d = rdq.stop()
d.addCallback(saveJobs)

I’ve come up with many uses for this class over the last 18 months, and have quite often recommended it to people on the #twisted IRC channel. Other examples include fetching a large number of URLs in a controlled way, making many calls to the Twitter API, etc.

Usage of the class is very simple. You create the dispatch queue, giving it a function that will be called to process all jobs. Then you just put job arguments as fast as you like. Each call to put gets you a Twisted Deferred instance. If your function runs successfully on the argument, the deferred will call back with an instance of txrdq.job.Job. The job contains information about when it was queued, when it was launched, when it stopped, and of course the result of the function. If your function hits an error or the job is canceled (by calling cancel on the deferred), the deferred will errback and the failure will again contain a job instance with the details.

It’s also useful to have an admin interface to the queue, so calls such as pending and underway are provided. These return lists of job instances. You can call cancel on a job instance too. You can reprioritize jobs. And you can pause processing or shut the queue down cleanly.

The code also contains an independently useful Twisted classes called DeferredPriorityQueue (which I plan to write about), and DeferredPool (which I described earlier).


You can follow any responses to this entry through the RSS 2.0 feed. Both comments and pings are currently closed.

Comments are closed.