Using Futures and the ProcessPoolExecutor in Python
When to use ProcessPoolExecutor
Using the `ProcessPoolExecutor` in `concurrent.futures` is a quick way
to divide your workload over multiple processes. This is useful if you
have a couple of tasks that you want to run in parallel to save
time. Compared to the `ThreadPoolExecutor`, the process pool is a bit
more primitive: basically, the whole process is forked into multiple
copies that each do their own business, with the
`concurrent.futures.ProcessPoolExecutor` taking care of cleaning up
and basic communication between the tasks.
|          | ProcessPoolExecutor      | ThreadPoolExecutor               |
|----------|--------------------------|----------------------------------|
| strength | Parallel CPU bound tasks | Parallel IO bound tasks          |
| weakness | Memory usage             | Limited to single CPU due to GIL |
You should use the `ProcessPoolExecutor` over the `ThreadPoolExecutor`
if your tasks are CPU bound. The weakness of copying the process is
that you also copy its memory, which may add up, so there is a bit more
overhead compared to splitting into threads. But the big
advantage is that multiple processes can each use up to 100% of a
single CPU core, while, due to the limitations of the Global
Interpreter Lock, multiple threads will not saturate multiple
CPUs. There are a couple of holes in this simplified model; for
example, Python code can sometimes release the GIL, notably numpy
code, in which case threads can also effectively use multiple
CPUs. But for now, let's not worry about those details too much and
learn how to use multiprocessing easily in Python.
Basic multiprocessing with os.fork
First, to get started, have a look at this demonstration of
os.fork
. In practical terms, this duplicates the running
process. Two copies of the same program will run, basically identical,
except for their pid
, their process id.
cat ./src/fork.py
```python
import os

os.fork()
print(os.getpid())
```
python3 ./src/fork.py
```
18359
18360
```
It is definitely possible to write some multiprocessing code directly on this primitive system.
cat ./src/fork_mp.py
```python
import os
import sys
import time

tasks = list(range(10))
part_a = tasks[:5]
part_b = tasks[5:]

res = os.fork()
if res == 0:
    # child process: os.fork() returns 0 here
    for task in part_a:
        print(f"I am process {os.getpid()} working on task {task}")
        time.sleep(.2)
        sys.stdout.flush()
else:
    # parent process: os.fork() returns the child's pid here
    for task in part_b:
        print(f"I am process {os.getpid()} working on task {task}")
        time.sleep(.2)
        sys.stdout.flush()
```
This program divides the tasks between the two processes. I added a
`time.sleep(0.2)` for every task, so the tasks in total take two
seconds. The script however takes approximately one second to run,
saving exactly one second over running the tasks in a single process.
We use the return value of `os.fork` to determine which of the
processes we are: if the result is 0, we are in the child process, and
if it is non-zero (the child's pid), we are in the parent process.
python3 ./src/fork_mp.py
```
I am process 18377 working on task 0
I am process 18376 working on task 5
I am process 18377 working on task 1
I am process 18376 working on task 6
I am process 18377 working on task 2
I am process 18376 working on task 7
I am process 18376 working on task 8
I am process 18377 working on task 3
I am process 18376 working on task 9
I am process 18377 working on task 4
```
These examples show at a lower level than the ProcessPoolExecutor
how multiprocessing works. However, if you want to extend the latter
approach into working code that deals with failures, passes the tasks
to the processes consistently and also collects the results, you can
see it'll get quite a bit more complicated. Enter the
ProcessPoolExecutor
, which does all those hard things for you!
cat ./src/ppool_demo.py
```python
from concurrent.futures import ProcessPoolExecutor
import time
import os

tasks = range(10)
start = time.time()

def do_work(task):
    print(f"I am process {os.getpid()} working on task {task}")
    time.sleep(.2)

with ProcessPoolExecutor(max_workers=4) as pool:
    for task in tasks:
        pool.submit(do_work, task)

print(f"Main process done after {time.time() - start:.2f}s")
```
In only three lines you can start 4 workers that divide the work as evenly as possible.

- `pool.submit` sends a task to one of the workers
- The context manager (`with` block) waits until all the workers are done before proceeding

This example takes 0.6 seconds. We have four workers and 10 tasks, so some workers get 2 tasks and some get 3 tasks, and the workers with 3 tasks take 3 * 0.2 = 0.6 seconds to finish.
python ./src/ppool_demo.py
Note that in the output, the processes seem ordered. However, this is
a subtle effect of the buffering of `stdout`. Each process has its own
buffer and flushes all its output in one go, making it look like
they run in succession. You can suppress this behaviour by manually
triggering the flushes, as I showed earlier with `sys.stdout.flush()`.
Here, it is crucial that `pool.submit` is non-blocking: when you call
the function, the main process doesn't wait until the worker is
done. This allows us to schedule all the work to the workers quickly.
There are two things I want to explain in this post:

- How you can collect return values of the tasks (using `concurrent.futures.Future`)
- What happens if workers run into an exception and how you can deal with it
Collecting return values
In the examples above, I simply fired off the tasks and showed that
they were doing something by printing statements to stdout. But in a
practical situation, you typically want to collect the results of the
work. To do that with a ProcessPoolExecutor
, you will need to deal
with concurrent.futures.Future
. If you have experience with
Javascript for example, dealing with futures is very common, but you
can write quite a bit of python code without encountering these.
```python
from concurrent.futures import ProcessPoolExecutor

def work(word):
    print(word)
    return len(word)

with ProcessPoolExecutor(max_workers=1) as pool:
    result = pool.submit(work, "hello")
    print(result)
```
The result of a `pool.submit` is an instance of `Future`. A `Future` is a
reference to work in progress. Its most fundamental method is
`Future.result`. From the official documentation:
> Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a concurrent.futures.TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.
>
> If the future is cancelled before completing then CancelledError will be raised.
>
> If the call raised an exception, this method will raise the same exception.
It is important to understand that this method is blocking, as
opposed to the pool.submit
method I used earlier. A mistake I have
seen often is the following:
cat ./src/block_mistake.py
```python
from concurrent.futures import ProcessPoolExecutor
import time

tasks = range(10)
results = list()
start = time.time()

def do_work(task):
    time.sleep(0.1)
    return task ** 2

with ProcessPoolExecutor(max_workers=10) as pool:
    for task in tasks:
        future = pool.submit(do_work, task)
        results.append(future.result())  # collect results

print(f"Done after {time.time() - start}")
Can you spot the mistake? The problem is that before scheduling the next task, the main process waits for the result of the task it just scheduled. This script takes one second to run, because in effect, all tasks run in succession.
python ./src/block_mistake.py
Instead, the results should only be collected after all tasks have been scheduled.
cat ./src/block_fixed.py
```python
from concurrent.futures import ProcessPoolExecutor
import time

start = time.time()
tasks = range(10)
futures = list()

def do_work(task):
    time.sleep(0.1)
    return task ** 2

with ProcessPoolExecutor(max_workers=10) as pool:
    for task in tasks:
        futures.append(pool.submit(do_work, task))

results = [future.result() for future in futures]
print(results)
print(f"Done after {time.time() - start}")
```
python ./src/block_fixed.py
Dealing with exceptions in child processes
If a child process raises an unhandled exception, this exception is
passed to the main process when calling `Future.result`. In the
example below, you can see how to catch those errors when unpacking
the results of the process workers.
cat ./src/error_example.py
```python
from concurrent.futures import ProcessPoolExecutor
from random import random

tasks = range(10)
futures = list()

def do_work(task):
    if random() > .5:
        return task ** 2
    else:
        raise Exception("OW!")

with ProcessPoolExecutor(max_workers=10) as pool:
    for task in tasks:
        futures.append(pool.submit(do_work, task))

results = list()
for future in futures:
    try:
        results.append(future.result())
    except Exception as e:
        results.append(f"Failed with {e}!")

print(results)
```
python ./src/error_example.py
[0, 1, 4, 9, 16, 'Failed with OW!!', 'Failed with OW!!', 'Failed with OW!!', 64, 81]
Thanks for reading! If you want to reach out, post an issue to the Github repository of this website or contact me on Twitter!