Multi-threading in Python

Programming with threads is one of the more difficult tasks in software development. The Python threading and Queue modules make it significantly easier, but it still takes some deliberation to use threads efficiently.

Python’s thread and threading libraries are built on the platform’s native threads (POSIX threads on Unix-like systems). The threading library is the higher level of the two and is therefore the one to use for typical programming tasks. The Queue module provides a thread-safe mechanism for communicating between threads, behaving like a combination of a list and a semaphore.
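For instance, here is a minimal sketch of two threads communicating through a Queue (the worker function and values are invented for illustration):

import threading
from Queue import Queue

def worker(q):
    for i in range(3):
        q.put(i * 10)      # put() does its own locking, so this is thread-safe

q = Queue()
t = threading.Thread(target=worker, args=(q,))
t.start()
for i in range(3):
    print q.get()          # get() blocks until an item is available
t.join()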

Threads are relatively expensive to create, so it takes a little planning to know when to use them. Generally, the best uses of threads are multiple tasks that cause side effects, such as output, but do not depend on the state of other threads. An example is a program that writes out a large number of files with data from a database, or a large data migration. Below, we will build up a threading class that encapsulates the actions to be taken.

Assume we have around 500 XML documents to download from a remote server via HTTP. Each is a large enough file that it warrants downloading several at a time. The server and the network can take a fair load, but we don’t want to simulate a botnet attack or overload our local network connection, which would slow each download to a crawl and drastically increase collisions and errors. Let’s limit the number of files we download at once to four. We start off with a function to download a file:

import urllib
 
def get_file(url):
    try:
        f = urllib.urlopen(url)
        contents = f.read()
        f.close()
        return contents
    except IOError:
        print "Could not open document: %s" % url

So much for error handling, but you get the idea. Assuming our URL is stored in a variable named url, to execute this function in another thread, we run:

import threading
 
thread = threading.Thread(target=get_file, args=(url,))
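Creating the Thread object does not run anything by itself; a quick sketch of driving it:

thread.start()    # begin executing get_file(url) in the new thread
thread.join()     # optionally wait here until the download finishes

Note that the threading machinery discards the target function’s return value, so the downloaded contents are lost here; we will address that shortly.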

We can make that a little simpler in two ways. The object-oriented way is to subclass threading.Thread and put the call to get_file(url) in our run() method. This is useful when the result of the function is required for later processing. If the results are not needed, we can take the functional route instead and use a partial application:

import threading
from functools import partial
 
thread = threading.Thread(target=partial(get_file, url))

While that method is more fun, let’s use the OO method (no pun intended), since we want to do something with this data. Remember, we are downloading the file and storing it as a string rather than simply saving it to the local file system. That implies we have more work to do after the download.

import urllib, threading
 
class FileGetter(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url
        self.result = None

    def get_result(self):
        return self.result

    def run(self):
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url

Now we have our Thread implementation. Note that instantiating a FileGetter does not cause the thread to start; that is done with the start() method. However, we don’t want all of the threads running at the same time, so we need the Queue module and a couple of helper functions to manage our list of files.
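First, though, a quick sketch of driving a single FileGetter by hand (the URL is invented for illustration):

getter = FileGetter("http://example.com/docs/1.xml")    # hypothetical URL
getter.start()     # run() begins executing in a new thread
getter.join()      # block until the download completes
contents = getter.get_result()

Now, on to the queue-based helpers: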

import threading
from Queue import Queue
 
def get_files(files):
    def producer(q, files):
        for url in files:
            thread = FileGetter(url)
            thread.start()
            q.put(thread, True)

    finished = []
    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            thread.join()
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished

Let’s take a look at what we did here. The first function, producer, accepts the queue and the list of files. For each file it starts a new FileGetter thread and then adds that thread to the queue. That last line is significant: the second parameter, True, tells the put() method to block until a slot is available. Note that the thread is started before the potentially blocking put() call, so even if the queue is full, a new thread will already be running. Because of this, we reduce our queue size to 3: three queued threads plus the one started while put() blocks gives us our limit of four concurrent downloads.
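To make the blocking behavior concrete, here is a small illustrative sketch (assuming the Queue import from above; the values are arbitrary):

q = Queue(3)       # at most three items may be queued
q.put(1, True)
q.put(2, True)
q.put(3, True)     # the queue is now full
# a fourth q.put(4, True) would block here until another thread calls q.get()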

The second function, the consumer, reads items out of the queue, blocking until an item is available. Then comes the important part, thread.join(). This causes the consumer to block until that thread completes its execution, and it is what keeps the queue from emptying out (and the producer from starting more threads) before the downloads ahead in line have finished. The consumer appends the result of each thread’s execution to finished, a list it shares with the enclosing get_files function.

Last, we create threads for the producer and the consumer, start them, and then block until both have completed before returning the collected results. Here is the complete code:

import urllib, threading
from Queue import Queue
 
class FileGetter(threading.Thread):
    def __init__(self, url):
        threading.Thread.__init__(self)
        self.url = url
        self.result = None

    def get_result(self):
        return self.result

    def run(self):
        try:
            f = urllib.urlopen(self.url)
            contents = f.read()
            f.close()
            self.result = contents
        except IOError:
            print "Could not open document: %s" % self.url

def get_files(files):
    def producer(q, files):
        for url in files:
            thread = FileGetter(url)
            thread.start()
            q.put(thread, True)

    finished = []
    def consumer(q, total_files):
        while len(finished) < total_files:
            thread = q.get(True)
            thread.join()
            finished.append(thread.get_result())

    q = Queue(3)
    prod_thread = threading.Thread(target=producer, args=(q, files))
    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))
    prod_thread.start()
    cons_thread.start()
    prod_thread.join()
    cons_thread.join()
    return finished

Of course, this approach is not perfect. A queue is FIFO: first in, first out. If one of the currently executing threads finishes before a thread ahead of it in the queue, the consumer is still joined on that earlier thread, so for a while only three files are downloading at a time. The solution to that is a complex one and outside the scope of this article.

Edit 02/19/2009: Fixed: FileGetter was not setting self.result in its run method. Thanks to tgray for pointing out the problem.

43 thoughts on “Multi-threading in Python”

  1. I’m confused as to how self.result gets updated, would you please explain this?

    On a similar note, I was under the impression that a “return” from the run() method doesn’t do anything since run() is called from Thread.start(). Does this somehow **magic** “contents” into “self.result”?

  2. Hi, thanks for this helpful tutorial. Please correct me if I’m wrong, but I believe you forgot to call the parent init:

    class FileGetter(threading.Thread):
        def __init__(self, url):
            self.url = url
            self.result = None
            threading.Thread.__init__(self)

    Thanks again BTW.

    Best Regards,

    Denis

  3. Aloha,

    in the run() parts it should be “self.url” instead of just “url”, right?

    Else a very nice example, thanks for it.

    greetinx
    kristall

  4. Aloha,

    also imho “get_files(files)” should “return finished”. And even if it’s inside a def, imho one should not use “file” as a variable, since there is a builtin with that name.

    Also ‘<’ does not show what it should.

    kristall

  5. Hey mate! Thanks for the tutorial. Allowed me to get my head around threads again! Hadn’t done it for years! Damn, Python makes it easier these days … /me remembers doing it in C++ …

  6. You’re missing a bracket:

    cons_thread = threading.Thread(target=consumer, args=(q, len(files))

    should read

    cons_thread = threading.Thread(target=consumer, args=(q, len(files)))

    Thanks for the helpful article.

    Regards, Faheem.

  7. Some more errors:

    1) &lt; should be <

    2) You need self.url in two places in run().

    With these changes the code works.

    Regards, Faheem.

  8. You claim it is a complete code. Do you need a main program to drive this? Are you missing a statement such as get_files(list of URLs) to start it? Thanks.

  9. It works. However, there are two more threads running in parallel than the number in the Queue() call. Do not know why. Nevertheless, I can live with the difference. Many thanks for the example.

  10. Thanks for the example. Suggestion: test code before posting on the internet (missing closing parentheses, url should be self.url, etc.)

  11. note to self:

    clear all code before submitting immensely helpful tutorials. really, very nice explanation, and the corrections (!) submitted are helpful, just wanted to say thank you.

  12. Correct me if I am wrong: I think there can be 5 FileGetter threads active (instead of 4), right?

    This is how I think it works:
    When the thread calling the producer function is started, 4 FileGetter threads will be started while 3 are added to the queue. The 4th call to put will wait because the queue is full.
    When the thread calling the consumer function is then started, one of the 3 threads is taken from the queue and the current thread waits for it (because of the join).
    But now the producer thread can finish its put call (where it was waiting) and start the 5th FileGetter thread.
    So the number of parallel downloads is limited to a maximum of 5, not 4, right?

    Moreover I have another question. :)
    While waiting for the first FileGetter thread (on the join statement), other FileGetter threads (e.g. the second and third) may finish while we are still waiting on the join. So there may be unused queue slots while waiting for the first thread, which may take a long time.
    Or do I misunderstand something?

    Best Regards,
    Spookie

  13. This time I tried the application host to host, with a TCP/IP connection using Python, but sometimes a segmentation fault occurs during the process, when querying or inserting into the table. Should the process be threaded? There are 2 processes on the same connection. Process A runs every 5 minutes or so, and I make use of process B, but the response was too late and was hit by a timeout; after 5 hours a late response was sent, but it did go through process A. I also tried to make process B not time out, but it still just went through process A. What should I do?

    My email riz33a@yahoo.com

    regard
    riz

  14. I get an error when running this that claims __init__ is never called. I believe you need threading.Thread.__init__(self) to appear first in your def __init__ before anything else, otherwise self.url, etc. doesn’t know what to reference.

    Am I wrong?

  15. Perhaps I am missing something but why are you putting the threads themselves in a queue? This seems like a very strange use of queues. Also you end up creating one thread per task which is very wasteful.

    Personally I would stick the jobs to be performed (in this case URLs to be downloaded) in a queue then create n (in this case 4) threads to work through that queue. If necessary, these threads can place the results in another queue to be processed by some other number of consumer threads.

    This solution would have the huge advantage that jobs can be added as you go along, which is absolutely crucial if you want to chain a few of these things together.

    I also do not understand the point of creating a Thread class to replace the get_file function. Ostensibly you do this to make things simpler, but actually I think it’s more complicated (at least for illustration purposes). Potentially you might want your tasks to be objects, so your pool of threads would take task objects from a queue, do something to those objects and put the “finished” objects into another queue for further processing. But I can’t see why you need your tasks to be thread objects.


  16. I agree with Alastair.

    import urllib, threading
    from Queue import Queue

    class Worker(threading.Thread):
        def __init__(self, taskQueue, resultQueue):
            self.taskQueue = taskQueue
            self.resultQueue = resultQueue
            super(Worker, self).__init__()

        def run(self):
            while True: # until end marker found
                try:
                    url = self.taskQueue.get(True)
                    if url is None: # end-marker?
                        self.resultQueue.put(self) # report being finished
                        return
                    f = urllib.urlopen(url)
                    contents = f.read()
                    f.close()
                    self.resultQueue.put(contents)
                except Exception, e:
                    self.resultQueue.put(e)

    def get_files(urls):
        taskQueue = Queue()
        for url in urls:
            taskQueue.put(url)
        resultQueue = Queue()
        workers = set([ Worker(taskQueue, resultQueue) for i in range(4) ])
        for worker in workers:
            worker.start()
            taskQueue.put(None) # one end marker for each worker thread
        while workers:
            result = resultQueue.get(True)
            if result in workers:
                workers.remove(result)
            else:
                yield result


  17. Hmpf. Sorry for spamming :-(

    The blog does not allow removing old posts and does not handle hard spaces properly at all places. I hope you can decipher my code nevertheless.

    Alfe

    • I think that in his code, he wants one thread for each GET so they can happen concurrently. It would be nice, though, to create a thread pool if he changes his code to accept URLs on demand, from a socket for example.

  18. Well, he writes, »However, we don’t want all of the threads running at the same time,« and that’s where he introduces the use of the Queue (and where my boggling at that use of it began).

    Also, his use case is downloading a list of files. Doing it the way he does only makes sense if running any number of downloads in parallel works well on your system, which is not very typical. More typically, one wants to download a bunch of files but limit the concurrent threads doing so, to avoid flooding the server(s) one downloads from and one’s own network cable. The worker pattern (each worker gets a task, performs it, then gets the next task until no tasks are left) fits this need much better, I think.

  19. This is, I believe, a more straightforward approach to the problem. The number of concurrent threads is constant, and each thread will continue to pull data from the queue until it is empty. Plus, this approach is very generic and can be applied to a variety of applications.


    import threading
    from Queue import Queue

    class Worker(threading.Thread):
        def __init__(self, function, in_queue, out_queue):
            self.function = function
            self.in_queue, self.out_queue = in_queue, out_queue
            super(Worker, self).__init__()

        def run(self):
            while True:
                if self.in_queue.empty():
                    break
                data = self.in_queue.get()
                result = self.function(*data)
                self.out_queue.put(result)
                self.in_queue.task_done()

    def process(data, function, num_workers=1):
        in_queue = Queue()
        for item in data:
            in_queue.put(item)
        out_queue = Queue(maxsize=in_queue.qsize())
        workers = [Worker(function, in_queue, out_queue) for i in xrange(num_workers)]
        for worker in workers:
            worker.start()
        in_queue.join()
        while not out_queue.empty():
            yield out_queue.get()

    • Like Reggie, I strongly repeat this suggestion: test code before posting on the internet (missing closing parentheses, url should be self.url, etc.).

      Thank you Joel, could you post your code indented in some way? (maybe replacing indentation spaces with ‘-’?)


  20. I’m still learning about threading and I thought your article was ingeniously simple to understand, especially for beginners, and the code example was very helpful.

    I was curious about one part: I’m not entirely sure about this, but isn’t it true that Python will not allow multiple threads of PURE Python code to run at the same time due to the GIL? (http://stackoverflow.com/questions/20939299/does-or-doesnt-python-support-multithreading)

    If so, how is it that your example code is supposed to run multiple threads simultaneously, since the threads in your code are calling on functions that you have defined containing several lines of PURE Python code? Wouldn’t the GIL only allow one of your functions to run at a time (which includes having to wait for the urllib download to complete), and make the other threads wait for their turn, thus meaning that only one file is downloaded at a time? I was under the impression that to make multithreading work the urllib function (which is written in some C’ish language and thus compatible with multithreading) itself is what has to be called by the threading module, e.g. Thread(urllib.urlopen(url)).

    But again, I’m not sure, so I’d be happy to hear if there are in fact other ways to make it work, like yours, and why.

    • The GIL is released for I/O operations. Threads let you simulate non-blocking I/O without the asynchronous, event-based workflow (although they do introduce their own warts, too). While one thread is waiting on input from the socket, other threads may run.
