Thursday, August 15, 2013

Practical event-driven programming with Python and Twisted

Introduction

An article from 2008 entitled Practical threaded programming with Python was posted to HN today, and I thought, "How would those examples look with Twisted?"

For a great explanation about how Twisted does concurrency, see krondo's Twisted Introduction. On to the code:

Hello World

The first example in the article demonstrates that threads have IDs. Since we're not using threads, the closest equivalent with Twisted is to not use Twisted at all:

import datetime


def run(what):
    now = datetime.datetime.now()
    print '%s says Hello World at time: %s' % (what, now)


for i in range(2):
    run(i)

Output:

0 says Hello World at time: 2013-08-15 13:45:17.164933
1 says Hello World at time: 2013-08-15 13:45:17.165442

Using queues

The next example shows first a serial approach and then a threaded approach to "grab a URL of a website, and print out the first 1024 bytes of the page." Here are the synchronous/serial and threaded versions.

I should note that I've modified them to fetch the whole page (instead of the first 1024 bytes) and to print a hash of the content (so as not to clutter up this post). Interestingly, only apple.com and ibm.com return the same hash every time.
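One plausible explanation for the changing hashes, offered as an assumption rather than something verified against the actual responses: the other pages probably embed something that varies per request, and SHA-1 produces a completely different digest when even one byte changes. A minimal sketch with made-up page bodies (the `content_hash` helper and the sample bytes are hypothetical):

```python
import hashlib


def content_hash(body):
    """Return the SHA-1 hex digest of a page body given as bytes."""
    return hashlib.sha1(body).hexdigest()


# Hypothetical page bodies: the dynamic ones differ only in an embedded timestamp.
static_page = b"<html><title>Static</title></html>"
dynamic_page_1 = b"<html><title>Dynamic</title><!-- 13:45:17 --></html>"
dynamic_page_2 = b"<html><title>Dynamic</title><!-- 13:45:18 --></html>"

# Identical bytes always hash to the same digest...
same_static = content_hash(static_page) == content_hash(static_page)
# ...while a one-character difference yields a different digest.
same_dynamic = content_hash(dynamic_page_1) == content_hash(dynamic_page_2)

print("static pages match: %s" % same_static)
print("dynamic pages match: %s" % same_dynamic)
```

So a stable hash only tells you the server returned byte-identical content on each request.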

Synchronous version

import urllib2
import time
import hashlib

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

start = time.time()
#grabs urls of hosts and prints a SHA-1 hash of each page
for host in hosts:
    url = urllib2.urlopen(host)
    print hashlib.sha1(url.read()).hexdigest(), host

print "Elapsed Time: %s" % (time.time() - start)

Output:

2430771cc3723e965b64eda2d69dd22b697dd4a0 http://yahoo.com
790ace256c1b683a585226d286859f9f2910d9b0 http://google.com
63fbbe761817ebef066f9562e96209ca25a6f0b3 http://amazon.com
dd2f34c7c4f47b49272d7922e4f17f7c1cafd3aa http://ibm.com
562ffc06504dc0557386524b382372448d6e953a http://apple.com
Elapsed Time: 3.34798121452

Threaded version

#!/usr/bin/env python
import Queue
import threading
import urllib2
import time
import hashlib

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and prints a SHA-1 hash of each page
            url = urllib2.urlopen(host)
            print hashlib.sha1(url.read()).hexdigest(), host

            #signals to queue job is done
            self.queue.task_done()


start = time.time()
def main():
    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    #wait on the queue until everything has been processed
    queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)

Output:

562ffc06504dc0557386524b382372448d6e953a http://apple.com
fb6fe32cb270f7929157bec5f29ee44f729949fd http://google.com
dd2f34c7c4f47b49272d7922e4f17f7c1cafd3aa http://ibm.com
3643a39f4dd641a3c08f8e5c409d0f5bc6407aed http://amazon.com
3072477b1680fc2650d9cb0674e5ef7972873bf6 http://yahoo.com
Elapsed Time: 1.23798894882

Twisted version

Here's one way to do the same thing with Twisted:

from twisted.internet import defer, task
from twisted.web.client import getPage
import time
import hashlib

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

start = time.time()

def printHash(content, host):
    print hashlib.sha1(content).hexdigest(), host


def main(reactor, hosts):
    dlist = []
    for host in hosts:
        d = getPage(host)
        # when we have the content, call printHash with it
        d.addCallback(printHash, host)
        dlist.append(d)

    # finish the process when the "queue" is done
    return defer.gatherResults(dlist).addCallback(printElapsedTime)


def printElapsedTime(ignore):
    print "Elapsed Time: %s" % (time.time() - start)


task.react(main, [hosts])

Output:

188eecd4da73515a9d1b3fde88d81ccc3a1e6028 http://google.com
562ffc06504dc0557386524b382372448d6e953a http://apple.com
dd2f34c7c4f47b49272d7922e4f17f7c1cafd3aa http://ibm.com
968fc83c1c7717575af03d43b236baf508134d0f http://yahoo.com
90c51ab729261bb72db922fb5ad22c0ae33c09da http://amazon.com
Elapsed Time: 1.36157393456

The run times of the threaded version and the Twisted version are comparable. Over multiple runs, sometimes the threaded version is faster and sometimes the Twisted version is faster. Both are consistently faster than the synchronous version. Either way, this isn't a great benchmark and doesn't say much about how asynchronous vs. threaded code will perform in your particular case.
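If you want a slightly less noisy comparison, one option is to repeat each run several times and look at the spread rather than a single number. This is only a sketch: `time_runs` and `summarize` are hypothetical helpers, and the lambda is a stand-in workload rather than one of the fetch scripts above.

```python
import time


def time_runs(func, repeat=5):
    """Call func() `repeat` times and return the sorted elapsed times in seconds."""
    timings = []
    for _ in range(repeat):
        start = time.time()
        func()
        timings.append(time.time() - start)
    return sorted(timings)


def summarize(timings):
    """Return (fastest, middle, slowest) for a sorted, non-empty list of timings.

    The middle value is the median for odd lengths and the upper middle
    element for even lengths.
    """
    return timings[0], timings[len(timings) // 2], timings[-1]


# Stand-in workload; swap in a function that runs one of the versions.
timings = time_runs(lambda: sum(range(100000)), repeat=5)
fastest, median, slowest = summarize(timings)
print("fastest=%.6f median=%.6f slowest=%.6f" % (fastest, median, slowest))
```

Note that task.react exits the process when it finishes, so the Twisted script would have to be timed from outside (for example, as a subprocess) rather than called repeatedly in one process.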

Working with multiple queues

The article's third bit of code shows how to use multiple queues to get the URL's body in one thread, then process it in another thread.

Threaded version

import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

queue = Queue.Queue()
out_queue = Queue.Queue()

class ThreadUrl(threading.Thread):
    """Threaded Url Grab"""
    def __init__(self, queue, out_queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs host from queue
            host = self.queue.get()

            #grabs urls of hosts and then grabs chunk of webpage
            url = urllib2.urlopen(host)
            chunk = url.read()

            #place chunk into out queue
            self.out_queue.put(chunk)

            #signals to queue job is done
            self.queue.task_done()

class DatamineThread(threading.Thread):
    """Threaded title extraction"""
    def __init__(self, out_queue):
        threading.Thread.__init__(self)
        self.out_queue = out_queue

    def run(self):
        while True:
            #grabs chunk from out queue
            chunk = self.out_queue.get()

            #parse the chunk
            soup = BeautifulSoup(chunk)
            print soup.findAll(['title'])

            #signals to queue job is done
            self.out_queue.task_done()

start = time.time()
def main():

    #spawn a pool of threads, and pass them queue instance
    for i in range(5):
        t = ThreadUrl(queue, out_queue)
        t.setDaemon(True)
        t.start()

    #populate queue with data
    for host in hosts:
        queue.put(host)

    for i in range(5):
        dt = DatamineThread(out_queue)
        dt.setDaemon(True)
        dt.start()


    #wait on the queue until everything has been processed
    queue.join()
    out_queue.join()

main()
print "Elapsed Time: %s" % (time.time() - start)

Output:

[<title>Apple</title>]
[<title>Google</title>]
[<title>IBM - United States</title>]
[<title>Amazon.com: Online Shopping for Electronics, Apparel, Computers, Books, DVDs &amp; more</title>]
[<title>Yahoo!</title>]
Elapsed Time: 1.65801095963

Twisted version

For this simple example, it makes sense to just do the processing right after receiving the body. That would look like this:

from twisted.internet import defer, task
from twisted.web.client import getPage
import time
from BeautifulSoup import BeautifulSoup

hosts = ["http://yahoo.com", "http://google.com", "http://amazon.com",
         "http://ibm.com", "http://apple.com"]

start = time.time()

def printTitle(content, host):
    soup = BeautifulSoup(content)
    print soup.findAll(['title'])


def main(reactor, hosts):
    dlist = []
    for host in hosts:
        d = getPage(host)
        # when we have the content, call printTitle with it
        d.addCallback(printTitle, host)
        dlist.append(d)

    # finish the process when the "queue" is done
    return defer.gatherResults(dlist).addCallback(printElapsedTime)


def printElapsedTime(ignore):
    print "Elapsed Time: %s" % (time.time() - start)


task.react(main, [hosts])

Output:

[<title>Google</title>]
[<title>Apple</title>]
[<title>IBM - United States</title>]
[<title>Amazon.com: Online Shopping for Electronics, Apparel, Computers, Books, DVDs &amp; more</title>]
[<title>Yahoo!</title>]
Elapsed Time: 1.80365180969

(As with the previous examples, the threaded and Twisted versions don't differ much in speed.)

Hey!

"Hey! Those aren't the same!" I hear you say. You are right. They are not. The threaded version could extract the title in ThreadUrl.run instead of putting the content in queue for a DatamineThread.

I think the author was trying to show how you can make two threads work together on something... big? I haven't come up with a problem where it makes sense to write something in the Twisted version other than d.addCallback(printTitle, ...). If you have an idea, post a comment and I'll happily update this post (or make another post).
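Going back to the point that the threaded version could extract the title in the fetching thread: a single-stage worker might look roughly like this. It's a sketch, not the original article's code. `fetch` is a hypothetical stand-in that returns canned HTML so the example runs offline, the hosts are made up, and a regular expression stands in for BeautifulSoup.

```python
import re
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2

# Canned responses so the sketch runs without network access (made-up data).
FAKE_PAGES = {
    "http://example.com": "<html><title>Example</title></html>",
    "http://example.org": "<html><title>Another Example</title></html>",
}


def fetch(host):
    """Hypothetical stand-in for urllib2.urlopen(host).read()."""
    return FAKE_PAGES[host]


def extract_title(html):
    """Return the text of the first <title> element, or None if absent."""
    match = re.search(r"<title>(.*?)</title>", html, re.S)
    return match.group(1) if match else None


def worker(in_queue, titles, lock):
    """Fetch each host and extract its title in the same thread."""
    while True:
        host = in_queue.get()
        try:
            title = extract_title(fetch(host))
            with lock:
                titles[host] = title
        finally:
            # Mark the job done even if the fetch or parse raised.
            in_queue.task_done()


def run(hosts, num_threads=2):
    """Process all hosts with a pool of daemon threads; return {host: title}."""
    in_queue = queue.Queue()
    titles = {}
    lock = threading.Lock()
    for _ in range(num_threads):
        t = threading.Thread(target=worker, args=(in_queue, titles, lock))
        t.daemon = True
        t.start()
    for host in hosts:
        in_queue.put(host)
    # Wait until every queued host has been processed.
    in_queue.join()
    return titles


titles = run(sorted(FAKE_PAGES))
```

With one stage there is only one queue to join, which is about as close as the threaded code gets to the shape of d.addCallback(printTitle, ...).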

Conclusion

You can do things with threading. You can do things with Twisted. You should investigate Twisted (mostly for reasons not mentioned in this post). As noted above, krondo's Twisted Introduction is good, or there's some stuff I've written.

Also, if anyone can think of a better scenario for the two-kinds-of-thread-workers model, I'll update (or post again) with what a Twisted version might look like.
