Multi-threading in Python
Programming with threads is one of the more difficult tasks in programming. The Python threading and Queue modules make this significantly easier, but it still takes some deliberation to use threads in an efficient way.
Python’s thread and threading libraries are built on native operating-system threads (POSIX threads on Unix-like systems). The threading library is the higher level of the two and is therefore the one to use in your typical programming tasks. The Queue module provides a thread-safe mechanism for communicating between threads, like a combination of a list and a semaphore.
Threads are expensive, so it takes a little planning to know when to use them. Generally, the best uses of threads are for multiple tasks that cause side effects, such as output, but do not depend on the state of other threads. An example of this is a program that writes out a large number of files with data from a database, or a large data migration. The rest of this article builds up a threading class that encapsulates the actions to be taken.
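To make the "list plus semaphore" description concrete, here is a minimal sketch of one thread handing items to another through a bounded queue. It is written for Python 3, where the module is named queue rather than Queue; the worker function, the doubling step, and the None sentinel are illustrative assumptions, not part of the article's code.

```python
import queue      # named Queue in Python 2
import threading

def worker(q, results):
    # Pull items until the None sentinel arrives.
    while True:
        item = q.get()          # blocks until an item is available
        if item is None:
            break
        results.append(item * 2)

q = queue.Queue(maxsize=2)      # put() blocks once two items are waiting
results = []
t = threading.Thread(target=worker, args=(q, results))
t.start()

for n in [1, 2, 3, 4]:
    q.put(n)                    # blocks while the queue is full
q.put(None)                     # tell the worker to stop
t.join()
```

With a single worker the items come out in the order they went in, so results ends up holding the doubled values in the original order.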
Assume we have around 500 XML documents to download from a remote server (via HTTP). Each is a large enough file that it warrants downloading several at a time. The server and the network can take a fair load, but we don’t want to simulate a botnet attack or overload our local network connection, which would slow each download to a crawl and drastically increase collisions and errors while downloading. Let’s limit the number of files we download at once to 4. We start off with a function to download the files:
import urllib

def get_file(url):
    # Download the document at url and return its contents as a string.
    try:
        f = urllib.urlopen(url)
        contents = f.read()
        f.close()
        return contents
    except IOError:
        # On failure we only report the problem; the function returns None.
        print "Could not open document: %s" % url
So much for error handling, but you get the idea. Assuming our url is stored in a variable with the name url, to execute this function in another thread, we run:
import threading

thread = threading.Thread(target=get_file, args=(url,))
thread.start()  # nothing runs until start() is called
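As a small, hedged illustration of the thread life cycle, the sketch below replaces get_file with a stand-in that needs no network, and uses a hypothetical URL. It shows that creating the Thread object does not run anything, and that join() waits for the work to finish.

```python
import threading

def get_file(url):
    # Stand-in for the real download so the sketch runs offline.
    return "contents of %s" % url

url = "http://example.com/doc1.xml"   # hypothetical URL
thread = threading.Thread(target=get_file, args=(url,))
started_before = thread.is_alive()    # False: nothing runs until start()
thread.start()
thread.join()                         # wait for get_file to return
finished = not thread.is_alive()
```

Note that the value returned by get_file is discarded here; a plain Thread gives the caller no way to retrieve it.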
We can make that a little simpler in two ways. The object-oriented way is to subclass threading.Thread and put the call to get_file(url) in the run() method. This is useful when the result of the function is required for later processing. If the results are not needed, we can simplify using the functional programming method and use a partial application:
import threading
from functools import partial

thread = threading.Thread(target=partial(get_file, url))
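The two ways of passing the argument are equivalent, which the following sketch tries to show. The get_file here is a stand-in that only records the URL it was given, and the URL is hypothetical.

```python
import threading
from functools import partial

calls = []

def get_file(url):
    # Stand-in for the download: just record which URL was requested.
    calls.append(url)

url = "http://example.com/doc1.xml"   # hypothetical URL

t1 = threading.Thread(target=get_file, args=(url,))   # argument via args
t2 = threading.Thread(target=partial(get_file, url))  # argument baked in
for t in (t1, t2):
    t.start()
for t in (t1, t2):
    t.join()
```

Both threads end up calling get_file(url); partial simply moves the argument from the Thread constructor into the callable itself.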
While that method is more fun, let’s use the OO method (no pun intended) since we want to do something with this data. Remember, we are downloading the file and storing it as a string, rather than simply downloading the file to the local file system. That implies we have more work to do after the download.
import urllib, threading

class FileGetter(threading.Thread):
    def __init__(self, url):
        self.url = url
        self.result = None
        threading.Thread.__init__(self)

    def get_result(self):
        return self.result

    def run(self):
        # Executed in the new thread; use self.url, not a global url.
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url
Now we have our Thread implementation. Note that creating an instance of FileGetter does not cause the thread to start. That is done with the start() method. However, we don’t want all of the threads running at the same time, so we need to use the Queue module and a couple of helper functions to manage our list of files.
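A minimal sketch of that life cycle, using a hypothetical Fetcher class that mirrors FileGetter but fabricates its result so it runs without a network connection:

```python
import threading

class Fetcher(threading.Thread):
    """Hypothetical stand-in for FileGetter that needs no network."""

    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url
        self.result = None

    def run(self):
        # Runs in the new thread once start() is called.
        self.result = "contents of %s" % self.url

fetcher = Fetcher("http://example.com/doc1.xml")  # hypothetical URL
before = fetcher.result      # still None: the thread has not started
fetcher.start()
fetcher.join()               # wait for run() to finish
after = fetcher.result
```

The result is only safe to read after join() returns, which is why the consumer in the article joins each thread before collecting its result.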
import threading
from Queue import Queue

def get_files(files):
    def producer(q, files):
        for url in files:
            thread = FileGetter(url)
            thread.start()
            q.put(thread, True)  # block until a slot is free

    finished = []

    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)  # block until a thread is queued
            thread.join()
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished
Let’s take a look at what we did here. The first function, producer, accepts the queue and the list of files. For each file, it starts a new FileGetter thread. The last line is significant: we add the thread to the queue. The second parameter, boolean True, tells the put() method to block until a slot is available. Note that the thread starts before put() blocks. This means that even if the queue is full, the thread will already be running. Because of this, we reduce our queue size to 3. Strictly speaking, the consumer also holds one thread that it has already taken out of the queue and is joining, so with a queue size of 3 as many as five downloads can be in flight at once (three queued, one being joined, and one started by the blocked producer); use a queue size of 2 if the limit of 4 must be a hard one.
The second function, the consumer, reads items out of the queue, blocking until an item is available. Then comes the important part, thread.join(). This causes the consumer to block until the thread completes its execution. This line is what keeps the queue from emptying before the next thread has completed execution (which would let the producer start more threads). The consumer appends each thread’s result to finished, a list defined in the enclosing get_files function.
Last, we create a thread each for the producer and the consumer, start them, and then block until they have completed. Here is the complete code:
import urllib, threading
from Queue import Queue

class FileGetter(threading.Thread):
    def __init__(self, url):
        self.url = url
        self.result = None
        threading.Thread.__init__(self)

    def get_result(self):
        return self.result

    def run(self):
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url

def get_files(files):
    def producer(q, files):
        for url in files:
            thread = FileGetter(url)
            thread.start()
            q.put(thread, True)  # block until a slot is free

    finished = []

    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)  # block until a thread is queued
            thread.join()
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished
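To see how many jobs this producer/consumer arrangement really allows at once, the following sketch replaces the downloads with short sleeps and counts how many are active at the same moment. It is written for Python 3 (queue instead of Queue); Tracker, run_jobs, and the sleep duration are hypothetical names and values chosen for illustration. The reasoning it checks: jobs that are started but not yet joined can sit in the queue (at most maxsize), be held by the consumer (one), or be held by the blocked producer (one), so the peak can never exceed maxsize + 2.

```python
import queue
import threading
import time

class Tracker(object):
    """Counts how many jobs are running at the same moment."""

    def __init__(self):
        self.lock = threading.Lock()
        self.active = 0
        self.peak = 0

    def enter(self):
        with self.lock:
            self.active += 1
            self.peak = max(self.peak, self.active)

    def leave(self):
        with self.lock:
            self.active -= 1

def run_jobs(count, maxsize, tracker):
    def job():
        # Stand-in for a download: just occupy a slot briefly.
        tracker.enter()
        time.sleep(0.01)
        tracker.leave()

    def producer(q):
        for _ in range(count):
            t = threading.Thread(target=job)
            t.start()           # started before put() can block
            q.put(t, True)

    done = []

    def consumer(q):
        while len(done) < count:
            t = q.get(True)
            t.join()
            done.append(t)

    q = queue.Queue(maxsize)
    prod = threading.Thread(target=producer, args=(q,))
    cons = threading.Thread(target=consumer, args=(q,))
    prod.start()
    cons.start()
    prod.join()
    cons.join()
    return len(done)

tracker = Tracker()
completed = run_jobs(20, 3, tracker)
```

The exact value of tracker.peak depends on scheduling, but it stays at or below maxsize + 2, which is 5 for a queue size of 3.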
Of course, this approach is not perfect. A queue is FIFO (first in, first out). If one of the threads currently executing finishes before the thread ahead of it, we lose efficiency, because its slot sits idle until the consumer has joined every thread queued before it. However, the solution to that is a complex one and outside the scope of this article.
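For readers who want a starting point anyway, one common alternative (an assumption on my part, not the design used in this article) is a fixed pool of worker threads that each pull the next URL from a queue as soon as they are free, so no slot waits on an earlier download. The sketch is Python 3, and fetch is a hypothetical stand-in for the real download.

```python
import queue
import threading

def fetch(url):
    # Hypothetical stand-in for the real download.
    return "contents of %s" % url

def get_files(urls, workers=4):
    tasks = queue.Queue()
    results = {}
    lock = threading.Lock()

    def worker():
        while True:
            url = tasks.get()
            if url is None:          # sentinel: no more work
                break
            data = fetch(url)
            with lock:               # protect the shared dict
                results[url] = data

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for url in urls:
        tasks.put(url)
    for _ in threads:
        tasks.put(None)              # one sentinel per worker
    for t in threads:
        t.join()
    return results

urls = ["http://example.com/doc%d.xml" % i for i in range(10)]  # hypothetical
results = get_files(urls)
```

Here the number of threads, rather than the queue size, is what caps the concurrent downloads, so the limit of 4 is exact.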
Edit 02/19/2009: Fixed: FileGetter was not setting self.result in its run method. Thanks to tgray for pointing out the problem.
Tags:concurrency, python, threads





18. February 2009 at 3:38 pm :
I’m confused as to how self.result gets updated, would you please explain this?
On a similar note, I was under the impression that a “return” from the run() method doesn’t do anything since run() is called from Thread.start(). Does this somehow **magic** “contents” into “self.result”?
18. February 2009 at 3:52 pm :
Thanks for pointing out the bug. I don’t know how that went unnoticed for so long :).
23. May 2009 at 5:24 pm :
Hi, thanks for this helpful tutorial. Please correct me if I’m wrong, but I believe you forgot calling the init :
class FileGetter(threading.Thread):
    def __init__(self, url):
        self.url = url
        self.result = None
        threading.Thread.__init__(self)
Thanks again BTW.
Best Regards,
Denis
29. May 2009 at 6:39 am :
Denis – thanks for noticing that. I’ve updated the code.