Intro
Per the standard library documentation:
"The concurrent.futures module provides a high-level interface for asynchronously executing callables. The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class."
Threads or Processes
Currently, Python uses something called the Global Interpreter Lock (GIL). Only the thread holding the GIL can execute Python bytecode, so whenever a Python object is accessed in a process, the accessing thread effectively has exclusive access to all Python objects. This is a heavy-handed design choice that prevents the data corruption (race conditions etc.) that could otherwise arise when multiple threads access the interpreter's memory simultaneously.
The disadvantage is that pure Python code often cannot benefit from using multiple threads, since only one thread can access Python objects and variables at a time.
So what is the point? Some tasks do not involve significant amounts of computation on Python objects, for example:
- Code using compiled extensions: If the computation is done in compiled code that spends most of its time not interacting with Python data structures, then the GIL is less relevant and multithreading can achieve parallel execution.
- Code waiting on IO: If most of the time is spent on IO, such as reading/writing to disk or, in particular, making requests over the network, then the code can be sped up by issuing the requests concurrently.
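The IO case can be illustrated with a minimal sketch. It assumes that time.sleep is a fair stand-in for a network request (sleeping releases the GIL, as real IO does); fake_request and the delays are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(delay: float) -> float:
    """Hypothetical stand-in for a network call: just waits, then returns."""
    time.sleep(delay)
    return delay


delays = [0.2] * 5

# Sequential: the waits add up (roughly 5 * 0.2s).
start = time.perf_counter()
sequential = [fake_request(d) for d in delays]
sequential_time = time.perf_counter() - start

# Threaded: the waits overlap, so the total is close to a single wait.
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(delays)) as pool:
    futures = [pool.submit(fake_request, d) for d in delays]
    threaded = [f.result() for f in futures]
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

Replacing the sleep with a pure Python loop would be expected to remove most of the speed-up, since the threads would then compete for the GIL.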
For everything else, we need to rely on multiprocessing, via ProcessPoolExecutor. This method spawns entire new Python interpreters, which allows true parallel execution. That said, the multiprocessing approach has a few disadvantages:
- Spawning a new interpreter takes time and RAM, so there is significant overhead
- All the required data needs to be sent to and from the new processes. This can add further overhead and limits usage to handling objects that can be pickled.
- Care must be taken with code using mutable singleton classes and global variables: each process works on its own copy, so mutations made in one process are not reflected in the others. (I think mutable singleton classes may be a bad idea in general)
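The pickling constraint can be checked without starting any processes. The sketch below uses a hypothetical helper, can_pickle, to test a few objects with the standard pickle module, on the assumption that anything failing here could not be sent to a worker process either.

```python
import pickle
import threading


def can_pickle(obj: object) -> bool:
    """Hypothetical helper: report whether pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, TypeError, AttributeError):
        return False
    return True


# Plain data structures are fine.
plain_data_ok = can_pickle({"x": [1, 2.5, "text"]})
# Lambdas have no importable name, so they cannot be pickled.
lambda_ok = can_pickle(lambda x: x * x)
# Objects wrapping OS-level resources, such as locks, cannot be pickled.
lock_ok = can_pickle(threading.Lock())

print(plain_data_ok, lambda_ok, lock_ok)  # True False False
```

In practice this means task functions for ProcessPoolExecutor are usually defined at module level, and their arguments and return values are kept to plain data.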
API
The two key objects are Executor, which comes in two flavours (ThreadPoolExecutor and ProcessPoolExecutor), and Future.
Executors should usually be created via a context manager. When the context scope is exited, the executor runs the shutdown method, which waits for any submitted tasks to complete, then releases the resources managed by the executor (i.e. processes or threads).
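To make the role of the context manager concrete, here is a small sketch that runs the same task twice: once inside a with block and once with an explicit call to shutdown. The function double is made up for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def double(x: int) -> int:
    return 2 * x


# Context manager: shutdown(wait=True) runs automatically on exit.
with ThreadPoolExecutor(max_workers=2) as pool:
    managed = pool.submit(double, 21)

# Manual equivalent: we are responsible for calling shutdown ourselves.
manual_pool = ThreadPoolExecutor(max_workers=2)
try:
    manual = manual_pool.submit(double, 21)
finally:
    manual_pool.shutdown(wait=True)

# After shutdown, all submitted tasks have completed.
print(managed.done(), manual.done())
print(managed.result(), manual.result())

# A pool that has been shut down refuses new work.
try:
    manual_pool.submit(double, 1)
    rejected = False
except RuntimeError:
    rejected = True
print("rejected after shutdown:", rejected)
```

The with form is shorter and cannot forget the shutdown call, which is why it is usually preferred.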
In order to run a task (encapsulated by a callable plus args and kwargs), you use the submit method of the executor instance.
The submit method returns a Future object, which is like a ticket or receipt that can be used to collect the result of the task when it is ready.
The result of the computation must be extracted using the result method of the future. This method will block or wait until the task is complete and then return the result.
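Two further details of result are not covered in the text but are worth knowing: if the task raised an exception, result re-raises it in the calling thread, and an optional timeout argument limits how long the call blocks. A minimal sketch, with reciprocal as a made-up task:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError


def reciprocal(x: float) -> float:
    return 1 / x


outcomes = {}
with ThreadPoolExecutor(max_workers=3) as pool:
    good = pool.submit(reciprocal, 4)
    bad = pool.submit(reciprocal, 0)
    slow = pool.submit(time.sleep, 0.3)

    # Normal case: result() blocks until the value is available.
    outcomes["good"] = good.result()

    # The exception raised inside the task surfaces when result() is called.
    try:
        bad.result()
    except ZeroDivisionError as err:
        outcomes["bad"] = repr(err)

    # With a timeout, result() gives up waiting if the task is not done.
    try:
        slow.result(timeout=0.01)
    except FutureTimeoutError:
        outcomes["slow"] = "not ready yet"

print(outcomes)
```

A timeout only stops the wait; the task itself keeps running, and the pool still waits for it when the context exits.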
Let's put this together with an example:
import time
from concurrent.futures import ThreadPoolExecutor
def long_computation(x: float) -> float:
    time.sleep(x / 2)
    return x * x
with ThreadPoolExecutor() as pool:
    future10 = pool.submit(long_computation, 10)
    future11 = pool.submit(long_computation, 11)
    future12 = pool.submit(long_computation, 12)
    print("10:", future10.result())
    print("11:", future11.result())
    print("12:", future12.result())
The code simulates a long computation or request using sleep in the long_computation function. The tasks are submitted to the pool and the results are collected via result.
You can try editing the code so that the results are collected outside the executor context, i.e. at zero indentation. It will still work, but recall what happens when the context exits: we wait for all submitted tasks to complete. This means you will see all the results at once at t=6s, when the longest-running task is complete. If the results are collected within the pool context instead, we can collect the earlier ones before everything is finished: in our case we will see future10 at the 5s mark, future11 at 5.5s and future12 at 6s. You might prefer the latter if you are impatient!
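The effect of leaving the context can be checked directly. The sketch below uses a scaled-down, made-up task (short_computation) so that it runs quickly, and collects the results at zero indentation:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def short_computation(x: float) -> float:
    time.sleep(x / 20)  # scaled-down sleep so the sketch finishes quickly
    return x * x


start = time.perf_counter()
with ThreadPoolExecutor() as pool:
    futures = [pool.submit(short_computation, x) for x in (1, 2, 3)]
# Exiting the block waited for the slowest task (about 0.15s here).
elapsed_at_exit = time.perf_counter() - start

# Outside the block every future is already done, so result() returns at once.
all_done = all(f.done() for f in futures)
values = [f.result() for f in futures]
print(all_done, values, f"{elapsed_at_exit:.2f}s")
```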
Recipe: Storing Results in a Dictionary
A common pattern is to build a dictionary mapping inputs to results. This is how it would work in synchronous code.
results = {}
for i in range(10):
    results[i] = long_computation(i)
print(results)
The problem with concurrent.futures is that submitting a task is separated from collecting its result, so the input is no longer at hand when the result arrives. There are two ways to tackle this:
- Write the task function so that the output also contains the input
- Use a dictionary mapping futures to the corresponding inputs
This is how the pass-through approach could work:
def long_computation_pass(x: float) -> tuple[float, float]:
    return x, long_computation(x)
results = {}
futures = []
with ThreadPoolExecutor() as pool:
    for i in range(10):
        futures.append(pool.submit(long_computation_pass, i))
    for future in futures:
        i, res = future.result()
        results[i] = res
This is an example of the futures dictionary approach:
results = {}
futures = {}
with ThreadPoolExecutor() as pool:
    for i in range(10):
        futures[pool.submit(long_computation, i)] = i
    for future in futures:
        res = future.result()
        i = futures[future]
        results[i] = res
The second approach is a bit strange because you might expect inputs to be mapped to futures in the futures dictionary, but actually the futures are the keys. It has the advantage that you don't have to define another function.
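For comparison, here is a sketch of the "expected" direction, with inputs as keys and futures as values. It works as long as the results are collected by looping over the dictionary itself; the reverse mapping only becomes necessary when you are handed a bare future and need to look up its input. The task slow_square is made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: float) -> float:
    time.sleep(0.01)
    return x * x


with ThreadPoolExecutor() as pool:
    # Inputs are the keys, futures are the values.
    futures_by_input = {i: pool.submit(slow_square, i) for i in range(10)}
    # Iterating over items() gives us the input and its future together.
    results = {i: future.result() for i, future in futures_by_input.items()}

print(results)
```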
Progress Bars
Since concurrent processing is used to make things faster, we often want a progress bar. I will focus on tqdm.
The code below shows how tqdm works with synchronous code: you wrap an iterable (here: range(10)) in tqdm and loop through it as usual. We use a dictionary to map the inputs to results for later use.
from tqdm.auto import tqdm
results = {}
for i in tqdm(range(10)):
    results[i] = long_computation(i)
print(results)
This will take ages of course!
When translating the code to concurrent mode, remember that submitting a task is near-instantaneous, but getting the result takes time. Therefore, tqdm should be applied to the loop that collects the results.
Let's adapt the pass-through approach:
results = {}
futures = []
with ThreadPoolExecutor() as pool:
    for i in range(10):
        futures.append(pool.submit(long_computation_pass, i))
    for future in tqdm(futures):
        i, res = future.result()
        results[i] = res
Now we have a progress bar!
One issue with this setup as a measure of progress is that the futures are always collected in order of submission. If the first task takes a very long time and the others are quick, the progress bar will be stuck on the first task; by the time that task is complete, the others have already finished, so the bar sits at 0% and then jumps to 100% instantly.
as_completed comes to the rescue! It is a function in concurrent.futures which applies to a collection of futures and yields each one as it completes.
from concurrent.futures import ThreadPoolExecutor, as_completed
results = {}
futures = []
with ThreadPoolExecutor() as pool:
    for i in range(10):
        futures.append(pool.submit(long_computation_pass, i))
    for future in tqdm(as_completed(futures), total=len(futures)):
        i, res = future.result()
        results[i] = res
The snag is that as_completed returns an iterator with no known length, so tqdm cannot show a proper progress bar by default. To fix this, use the total keyword argument of tqdm to tell it how many items to expect.
Note that as_completed is also compatible with the futures dictionary approach, because iterating through a dictionary means iterating through keys, which is where we are storing the futures.
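A minimal sketch of that combination, without the progress bar. The task slow_square is made up, and its sleep is chosen so that larger inputs tend to finish first, which makes the completion order differ from the submission order.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x: float) -> float:
    time.sleep((5 - x) / 50)  # larger inputs sleep less
    return x * x


results = {}
completion_order = []
with ThreadPoolExecutor(max_workers=5) as pool:
    # Futures are the keys, inputs are the values.
    futures = {pool.submit(slow_square, i): i for i in range(5)}
    # as_completed iterates over the keys and yields each future when done.
    for future in as_completed(futures):
        i = futures[future]  # look up the input that produced this future
        results[i] = future.result()
        completion_order.append(i)

print(completion_order)  # typically starts with the largest input
print(results)
```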
Advanced Progress Bars
The tqdm package also allows creating a progress bar manually, rather than via wrapping an iterator. Why do this?
- You want to change the description on the bar e.g. with the last processed filename
- You want to update the bar by different amounts each iteration. For example, you iterate through files but update the row count or total file size
Once again, you should use a context manager for the progress bar:
results = {}
futures = []
n_tasks = 10
with tqdm(total=n_tasks) as pbar, ThreadPoolExecutor() as pool:
    for i in range(n_tasks):
        futures.append(pool.submit(long_computation_pass, i))
    for future in as_completed(futures):
        i, res = future.result()
        results[i] = res
        pbar.set_description(f"Calculated {i}")
        pbar.update()
Two things to note:
- The progress bar definition comes before the pool definition, which means it gets cleaned up after. This ensures that the progress bar only gets shut down after the pool is finished with its tasks. It doesn't matter here, but is relevant with the callback recipe.
- We need to call pbar.update() manually.
Finally, here is another recipe that gives maximum flexibility: callbacks. A callback is a function that is supplied as an argument to another function, which, in turn, calls the callback.
In our context, the callback is a function that takes a future as its only argument. If we supply this function to the add_done_callback method of a future, then that function will be called on that future when it is completed.
The code below shows an example:
from concurrent.futures import Future
results = {}
n_tasks = 10
with tqdm(total=n_tasks) as pbar, ThreadPoolExecutor() as pool:
    def _callback(future: Future[tuple[float, float]]) -> None:
        i, res = future.result()
        results[i] = res
        pbar.set_description(f"Calculated {i}")
        pbar.update()
    for i in range(n_tasks):
        future = pool.submit(long_computation_pass, i)
        future.add_done_callback(_callback)
The callback in our example adds the result to the dictionary and updates the progress bar. Note that we eliminated a loop, and we also don't need to keep a list of futures around any more.
The callback approach also lets us get around having to make a pass-through function, by adding the input to the _callback function and using functools.partial to supply the input.
from functools import partial
results = {}
n_tasks = 10
with tqdm(total=n_tasks) as pbar, ThreadPoolExecutor() as pool:
    def _callback(future: Future[float], x: float) -> None:
        res = future.result()
        results[x] = res
        pbar.set_description(f"Calculated {x}")
        pbar.update()
    for i in range(n_tasks):
        future = pool.submit(long_computation, i)
        future.add_done_callback(partial(_callback, x=i))
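Note that a plain lambda that closes over the loop variable i won't do because of late binding: the lambda looks up i only when the callback runs, by which time the loop may have moved on to a later value.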
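The late binding problem can be reproduced deterministically. In the sketch below, a threading.Event (gate, a name chosen for illustration) holds all tasks back until the submission loop has finished, so every callback runs after i has reached its final value. A lambda with a default argument is shown as one possible workaround, since defaults are evaluated when the lambda is created.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()


def wait_then_square(x: int) -> int:
    gate.wait()  # block until the main thread opens the gate
    return x * x


late = []   # filled by lambdas that look up i when they run
bound = []  # filled by lambdas that captured i when they were created

with ThreadPoolExecutor(max_workers=3) as pool:
    for i in range(3):
        future = pool.submit(wait_then_square, i)
        # Late binding: i is read at call time, after the loop has ended.
        future.add_done_callback(lambda f: late.append(i))
        # Early binding: the default argument freezes the current value of i.
        future.add_done_callback(lambda f, x=i: bound.append(x))
    gate.set()  # let the tasks finish only now

print(late)           # [2, 2, 2]
print(sorted(bound))  # [0, 1, 2]
```

functools.partial achieves the same early binding as the default argument and is arguably easier to read.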
The most elaborate solution, if you don't like that _callback has to be defined inside the context block so that it can access pbar and results, is to write a class that is initialized with that data and pass one of its methods to add_done_callback. Here is an example, though it is definitely overkill:
class Callbacker:
    def __init__(self, results: dict[float, float], pbar: tqdm, x: float) -> None:
        self.results = results
        self.pbar = pbar
        self.x = x
    def update(self, future: Future[float]) -> None:
        res = future.result()
        self.results[self.x] = res
        self.pbar.set_description(f"Calculated {self.x}")
        self.pbar.update()
results = {}
n_tasks = 10
with tqdm(total=n_tasks) as pbar, ThreadPoolExecutor() as pool:
    for i in range(n_tasks):
        callbacker = Callbacker(results, pbar, i)
        future = pool.submit(long_computation, i)
        future.add_done_callback(callbacker.update)
Caveats with tqdm
By default, tqdm uses heuristics to decide when it wants to actually redraw the progress bar to reflect the progress.
If you use set_description, that short-circuits this logic and redraws the bar immediately, unless you set refresh=False. If iterations are fast, and you want to set the description on every iteration, you should set refresh=False or call set_description less often.
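A minimal sketch of the first option, assuming tqdm is installed. The loop body is empty on purpose, to mimic very fast iterations; n_items is an arbitrary number chosen for illustration.

```python
from tqdm import tqdm

n_items = 1000
with tqdm(total=n_items) as pbar:
    for i in range(n_items):
        # refresh=False only stores the new description; the bar is redrawn
        # later, on tqdm's usual schedule, rather than on every call.
        pbar.set_description(f"Item {i}", refresh=False)
        pbar.update()

print(pbar.n)  # number of completed updates
```

The alternative is to keep the default refresh and call set_description only every so often, for example every hundredth iteration.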