The threading module was first introduced in Python 1.5.2 as an enhancement of the low-level thread module. The threading module makes working with threads much easier and allows the program to run multiple operations at once.
Note that the threads in Python work best with I/O operations, such as downloading resources from the Internet or reading files and directories on your computer. If you need to do something that will be CPU intensive, then you will want to look at Python’s multiprocessing module instead. The reason for this is that Python has the Global Interpreter Lock (GIL) that basically makes all threads run inside of one master thread. Because of this, when you go to run multiple CPU intensive operations with threads, you may find that it actually runs slower. So we will be focusing on what threads do best: I/O operations!
Intro to Threads
A thread let’s you run a piece of long running code as if it were a separate program. It’s kind of like calling subprocess except that you are calling a function or class instead of a separate program. I always find it helpful to look at a concrete example. Let’s take a look at something that’s really simple:
import threading def doubler(number): """ A function that can be used by a thread """ print(threading.currentThread().getName() + '\n') print(number * 2) print() if __name__ == '__main__': for i in range(5): my_thread = threading.Thread(target=doubler, args=(i,)) my_thread.start()
Here we import the threading module and create a regular function called doubler. Our function takes a value and doubles it. It also prints out the name of the thread that is calling the function and prints a blank line at the end. Then in the last block of code, we create five threads and start each one in turn. You will note that when we instantiate a thread, we set its target to our doubler function and we also pass an argument to the function. The reason the args parameter looks a bit odd is that we need to pass a sequence to the doubler function and it only takes one argument, so we need to put a comma on the end to actually create a sequence of one.
Note that if you’d like to wait for a thread to terminate, you would need to call its join() method.
When you run this code, you should get the following output:
Thread-1 0 Thread-2 2 Thread-3 4 Thread-4 6 Thread-5 8
Of course, you normally wouldn’t want to print your output to stdout. This can end up being a really jumbled mess when you do. Instead, you should use Python’s logging module. It’s thread-safe and does an excellent job. Let’s modify the example above to use the logging module and name our threads while we’ll at it:
import logging import threading def get_logger(): logger = logging.getLogger("threading_example") logger.setLevel(logging.DEBUG) fh = logging.FileHandler("threading.log") fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s' formatter = logging.Formatter(fmt) fh.setFormatter(formatter) logger.addHandler(fh) return logger def doubler(number, logger): """ A function that can be used by a thread """ logger.debug('doubler function executing') result = number * 2 logger.debug('doubler function ended with: {}'.format( result)) if __name__ == '__main__': logger = get_logger() thread_names = ['Mike', 'George', 'Wanda', 'Dingbat', 'Nina'] for i in range(5): my_thread = threading.Thread( target=doubler, name=thread_names[i], args=(i,logger)) my_thread.start()
The big change in this code is the addition of the get_logger function. This piece of code will create a logger that’s set to the debug level. It will save the log to the current working directory (i.e. where the script is run from) and then we set up the format for each line logged. The format includes the time stamp, the thread name, the logging level and the message logged.
In the doubler function, we change our print statements to logging statements. You will note that we are passing the logger into the doubler function when we create the thread. The reason we do this is that if you instantiated the logging object in each thread, you would end up with multiple logging singletons and your log would have a lot of duplicate lines in it.
Lastly, we name our threads by creating a list of names and then setting each thread to a specific name using the name parameter. When you run this code, you should get a log file with the following contents:
2016-07-24 20:39:50,055 - Mike - DEBUG - doubler function executing 2016-07-24 20:39:50,055 - Mike - DEBUG - doubler function ended with: 0 2016-07-24 20:39:50,055 - George - DEBUG - doubler function executing 2016-07-24 20:39:50,056 - George - DEBUG - doubler function ended with: 2 2016-07-24 20:39:50,056 - Wanda - DEBUG - doubler function executing 2016-07-24 20:39:50,056 - Wanda - DEBUG - doubler function ended with: 4 2016-07-24 20:39:50,056 - Dingbat - DEBUG - doubler function executing 2016-07-24 20:39:50,057 - Dingbat - DEBUG - doubler function ended with: 6 2016-07-24 20:39:50,057 - Nina - DEBUG - doubler function executing 2016-07-24 20:39:50,057 - Nina - DEBUG - doubler function ended with: 8
That output is pretty self-explanatory, so let’s move on. I want to cover one more topic in this section. Namely, subclassing threading.Thread. Let’s take this last example and instead of calling Thread directly, we’ll create our own custom subclass. Here is the updated code:
import logging import threading class MyThread(threading.Thread): def __init__(self, number, logger): threading.Thread.__init__(self) self.number = number self.logger = logger def run(self): """ Run the thread """ logger.debug('Calling doubler') doubler(self.number, self.logger) def get_logger(): logger = logging.getLogger("threading_example") logger.setLevel(logging.DEBUG) fh = logging.FileHandler("threading_class.log") fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s' formatter = logging.Formatter(fmt) fh.setFormatter(formatter) logger.addHandler(fh) return logger def doubler(number, logger): """ A function that can be used by a thread """ logger.debug('doubler function executing') result = number * 2 logger.debug('doubler function ended with: {}'.format( result)) if __name__ == '__main__': logger = get_logger() thread_names = ['Mike', 'George', 'Wanda', 'Dingbat', 'Nina'] for i in range(5): thread = MyThread(i, logger) thread.setName(thread_names[i]) thread.start()
In this example, we just subclassed threading.Thread. We pass in the number that we want to double and the logging object as before. But this time, we set the name of the thread differently by calling setName on the thread object. We still need to call start on each thread, but you will notice that we didn’t need to define that in our subclass. When you call start, it will run your thread by calling the run method. In our class, we call the doubler function to do our processing. The output is pretty much the same except that I added an extra line of output. Go ahead and run it to see what you get.
Locks and Synchronization
When you have more than one thread, then you may find yourself needing to consider how to avoid conflicts. What I mean by this is that you may have a use case where more than one thread will need to access the same resource at the same time. If you don’t think about these issues and plan accordingly, then you will end up with some issues that always happen at the worst of times and usually in production.
The solution is to use locks. A lock is provided by Python’s threading module and can be held by either a single thread or no thread at all. Should a thread try to acquire a lock on a resource that is already locked, that thread will basically pause until the lock is released. Let’s look at a fairly typical example of some code that doesn’t have any locking functionality but that should have it added:
import threading total = 0 def update_total(amount): """ Updates the total by the given amount """ global total total += amount print (total) if __name__ == '__main__': for i in range(10): my_thread = threading.Thread( target=update_total, args=(5,)) my_thread.start()
What would make this an even more interesting example would be to add a time.sleep call that is of varying length. Regardless, the issue here is that one thread might call update_total and before it’s done updating it, another thread might call it and attempt to update it too. Depending on the order of operations, the value might only get added to once.
Let’s add a lock to the function. There are two ways to do this. The first way would be to use a try/finally as we want to ensure that the lock is always released. Here’s an example:
import threading total = 0 lock = threading.Lock() def update_total(amount): """ Updates the total by the given amount """ global total lock.acquire() try: total += amount finally: lock.release() print (total) if __name__ == '__main__': for i in range(10): my_thread = threading.Thread( target=update_total, args=(5,)) my_thread.start()
Here we just acquire the lock before we do anything else. Then we attempt to update the total and finally, we release the lock and print the current total. We can actually eliminate a lot of this boilerplate using Python’s with statement:
import threading total = 0 lock = threading.Lock() def update_total(amount): """ Updates the total by the given amount """ global total with lock: total += amount print (total) if __name__ == '__main__': for i in range(10): my_thread = threading.Thread( target=update_total, args=(5,)) my_thread.start()
As you can see, we no longer need the try/finally as the context manager that is provided by the with statement does all of that for us.
Of course you will also find yourself writing code where you need multiple threads accessing multiple functions. When you first start writing concurrent code, you might do something like this:
import threading total = 0 lock = threading.Lock() def do_something(): lock.acquire() try: print('Lock acquired in the do_something function') finally: lock.release() print('Lock released in the do_something function') return "Done doing something" def do_something_else(): lock.acquire() try: print('Lock acquired in the do_something_else function') finally: lock.release() print('Lock released in the do_something_else function') return "Finished something else" if __name__ == '__main__': result_one = do_something() result_two = do_something_else()
This works alright in this circumstance, but suppose you have multiple threads calling both of these functions. While one thread is running over the functions, another one could be modifying the data too and you’ll end up with some incorrect results. The problem is that you might not even notice the results are wrong immediately. What’s the solution? Let’s try to figure that out.
A common first thought would be to add a lock around the two function calls. Let’s try modifying the example above to look like the following:
import threading total = 0 lock = threading.RLock() def do_something(): with lock: print('Lock acquired in the do_something function') print('Lock released in the do_something function') return "Done doing something" def do_something_else(): with lock: print('Lock acquired in the do_something_else function') print('Lock released in the do_something_else function') return "Finished something else" def main(): with lock: result_one = do_something() result_two = do_something_else() print (result_one) print (result_two) if __name__ == '__main__': main()
When you actually go to run this code, you will find that it just hangs. The reason is that we just told the threading module to acquire the lock. So when we call the first function, it finds that the lock is already held and blocks. It will continue to block until the lock is released, which will never happen.
The real solution here is to use a Re-Entrant Lock. Python’s threading module provides one via the RLock function. Just change the line lock = threading.Lock() to lock = threading.RLock() and try re-running the code. Your code should work now!
If you want to try the code above with actual threads, then we can replace the call to main with the following:
if __name__ == '__main__': for i in range(10): my_thread = threading.Thread( target=main) my_thread.start()
This will run the main function in each thread, which will in turn call the other two functions. You’ll end up with 10 sets of output too.
Timers
The threading module has a neat class called Timer that you can use to represent an action that should take place after a specified amount of time. They actually spin up their own custom thread and are started using the same start() method that a regular thread uses. You can also stop a timer using its cancel method. It should be noted that you can even cancel the timer before it’s even started.
The other day I ran into a use case where I needed to communicate with a subprocess I had started but I needed it to timeout. While there are lots of different approaches to this particular problem, my favorite solution was using the threading module’s Timer class.
For this example, we will look at using the ping command. In Linux, the ping command will run until you kill it. So the Timer class becomes especially handy in Linux-land. Here’s an example:
import subprocess from threading import Timer kill = lambda process: process.kill() cmd = ['ping', 'www.google.com'] ping = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) my_timer = Timer(5, kill, [ping]) try: my_timer.start() stdout, stderr = ping.communicate() finally: my_timer.cancel() print (str(stdout))
Here we just set up a lambda that we can use to kill the process. Then we start our ping job and create a Timer object. You will note that the first argument is the time in seconds to wait, then the function to call and the argument to pass to the function. In this case, our function is a lambda and we pass it a list of arguments where the list happens to only have one element. If you run this code, it should run for about 5 seconds and then print out the results of the ping.
Other Thread Components
The threading module includes support for other items too. For example, you can create a Semaphore which is one of the oldest synchronization primitives in computer science. Basically, a Semaphore manages an internal counter which will be decremented whenever you call acquire on it and incremented when you call release. The counter is designed in such a way that it cannot go below zero. So if you happen to call acquire when it’s zero, then it will block.
Another useful tool that’s included is the Event. It will allow you to communicate between threads using signals. We will be looking at an example that uses an Event in the next section.
Finally in Python 3.2, the Barrier object was added. The Barrier is a primitive that basically manages a thread pool wherein the threads have to wait for each other. To pass the barrier, the thread needs to call the wait() method which will block until all the threads have made the call. Then it will release all the threads simultaneously.
Thread Communication
There are some use cases where you will want to have your threads communicate with each other. As we mentioned earlier, you can use create an Event for this purpose. But a more common method is to use a Queue. For our example, we’ll actually use both! Let’s see what that looks like:
import threading from queue import Queue def creator(data, q): """ Creates data to be consumed and waits for the consumer to finish processing """ print('Creating data and putting it on the queue') for item in data: evt = threading.Event() q.put((item, evt)) print('Waiting for data to be doubled') evt.wait() def my_consumer(q): """ Consumes some data and works on it In this case, all it does is double the input """ while True: data, evt = q.get() print('data found to be processed: {}'.format(data)) processed = data * 2 print(processed) evt.set() q.task_done() if __name__ == '__main__': q = Queue() data = [5, 10, 13, -1] thread_one = threading.Thread(target=creator, args=(data, q)) thread_two = threading.Thread(target=my_consumer, args=(q,)) thread_one.start() thread_two.start() q.join()
Let’s break this down a bit. First off, we have a creator (AKA a producer) function that we use to create data that we want to work on (or consume). Then we have another function that we use for processing the data that we are calling my_consumer. The creator function will use the Queue’s put method to put the data into the Queue and the consumer will continually check for more data and process it when it becomes available. The Queue handles all the acquires and releases of the locks so you don’t have to.
In this example, we create a list of values that we want to double. Then we create two threads, one for the creator / producer and one for the consumer. You will note that we pass a Queue object to each thread which is the magic behind how the locks get handled. The queue will have the first thread feed data to the second. When the first puts some data into the queue, it also passes in an Event and then waits for the event to finish. Then in the consumer, the data is processed and when it’s done, it calls the set method of the Event which tells the first thread that the second is done processing and it can continue.
The very last line of code call’s the Queue object’s join method which tells the Queue to wait for the threads to finish. The first thread ends when it runs out of items to put into the Queue.
Wrapping Up
We covered a lot of material here. You have learned the following:
- The basics of threading
- How locking works
- What Events are and how they can be used
- How to use a Timer
- Inter-Thread Communication using Queues / Events
Now that you know how threads are used and what they are good for, I hope you will find many good uses for them in your own code.
Related Reading
- Python documentation on the threading module
- Eli Bendersky – Python threads: communication and stopping
Pingback: Python 201: A Multiprocessing Tutorial – scribdbook
Pingback: Python 201: A multiprocessing tutorial - The Mouse Vs. The Python