When to use ProcessPoolExecutor

Using the ProcessPoolExecutor in concurrent.futures is a quick way to divide your workload over multiple processes. This is useful if you have a couple of tasks that you want to run in parallel to save time. Compared to the ThreadPoolExecutor, the process pool is a bit more primitive, basically, the whole process is forked into multiple copies that each do their own business, with the concurrent.futures.ProcessPoolExecutor taking care of cleaning up and basic communication between the tasks.

  ProcessPoolExecutor ThreadPoolExecutor
strength Parallel CPU bound tasks Parellel IO bound tasks
weakness Memory usage Limited to single CPU due to GIL

You should use the ProcessPoolExecutor over the ThreadPoolExecutor if your tasks are CPU bound. The weakness of copying the process is that you also copy it's memory which may add up, there is a bit more overhead when compared to splitting into threads, but, the big advantage is that multiple processes each can use up to 100% of a single CPU core, while, due to the limitations of the Global Interpreter Lock, multiple threads will not saturate multiple CPU's. There's a couple of holes in this simplified model, for example, python code can sometimes release the GIL, notably numpy code, in which case threads can also effectively use multiple GPU's. But for now, let's not worry about those details too much and learn how to use multiprocessing easily in python.

Basic multiprocessing with os.fork

First, to get started, have a look at this demonstration of os.fork. In practical terms, this duplicates the running process. Two copies of the same program will run, basically identical, except for their pid, their process id.

cat ./src/fork.py
import os

os.fork()

print(os.getpid())
python3 ./src/fork.py
22896
22897

It is definitely possible to write some multiprocessing code directly on this primitive system.

cat ./src/fork_mp.py
import os
import time
import sys

tasks = list(range(10))

part_a = tasks[:5]
part_b = tasks[5:]

res = os.fork()

if res == 0:
    # main process
    for task in part_a:
        print(f"I am process {os.getpid()} working on task {task}")
        time.sleep(.2)
        sys.stdout.flush()
else:
    # child process
    for task in part_b:
        print(f"I am process {os.getpid()} working on task {task}")
        time.sleep(.2)
        sys.stdout.flush()
        

This program divides the tasks between the two processes. I added a time.sleep(0.2) for every task, so the tasks in total take two seconds. The script however takes approximately 1 second to run, saving exactly one second over running the tasks in a single process.

We use the output of os.fork to determine which of the processes we are, if the result is 0, we are the main process, and if the result is different, we know we are in the child process.

python3 ./src/fork_mp.py
I am process 22901 working on task 5
I am process 22902 working on task 0
I am process 22901 working on task 6
I am process 22902 working on task 1
I am process 22902 working on task 2
I am process 22901 working on task 7
I am process 22901 working on task 8
I am process 22902 working on task 3
I am process 22901 working on task 9
I am process 22902 working on task 4

These examples show at a lower level than the ProcessPoolExecutor how multiprocessing works. However, if you want to extend the latter approach into working code that deals with failures, passes the tasks to the processes consistently and also collects the results, you can see it'll get quite a bit more complicated. Enter the ProcessPoolExecutor, which does all those hard things for you!

cat ./src/ppool_demo.py
from concurrent.futures import ProcessPoolExecutor
import time
import os

tasks = range(10)
start = time.time()


def do_work(task):
    print(f"I am process {os.getpid()} working on task {task}")
    time.sleep(.2)


with ProcessPoolExecutor(max_workers=4) as pool:
    for task in tasks:
        pool.submit(do_work, task)

print(f"Main process done after {time.time() - start:.2f}s")

In only three lines you can start 4 workers that divide the work as evenly as possible.

  • pool.submit sends a task to one of the workers
  • The context manager (with block) waits until all the workers are done before proceeding

This example takes 0.6 seconds. We have four workers, 10 tasks, so some workers get 2 tasks and some get 3 tasks, and the workers with 3 tasks take 3 * 0.2 seconds to finish.

python ./src/ppool_demo.py
I am process 22908 working on task 1
I am process 22908 working on task 6
I am process 22910 working on task 3
I am process 22910 working on task 7
I am process 22907 working on task 0
I am process 22907 working on task 4
I am process 22907 working on task 8
I am process 22909 working on task 2
I am process 22909 working on task 5
I am process 22909 working on task 9
Main process done after 0.61s

Note that in the output, the processes seem ordered. However, this is a subtle effect of buffering of the stdout. Each process has its own buffer and they flush all their output in one go, making it look like they run in succession. You can surpress this behaviour by manually triggering the flushes as I showed earlier with sys.stdout.flush().

Here, it is crucial that pool.submit is non-blocking, if you call the function the main process doesn't wait until the worker is done. This allows us to schedule all the work to the workers quickly.

There are three things I want to explain in this post

  • How you can collect return values of the tasks (using concurrent.futures.Future)
  • What happens if workers run into an exception and how you can deal with it

Collecting return values

In the examples above, I simply fired off the tasks and showed that they were doing something by printing statements to stdout. But in a practical situation, you typically want to collect the results of the work. To do that with a ProcessPoolExecutor, you will need to deal with concurrent.futures.Future. If you have experience with Javascript for example, dealing with futures is very common, but you can write quite a bit of python code without encountering these.

from concurrent.futures import ProcessPoolExecutor

def work(word):
    print(word)
    return len(word)

with ProcessPoolExecutor(max_workers=1) as pool:
    result = pool.submit(work, "hello")
    print(result)

The result of a pool.submit is an instance of Future. A Future is a reference to work in progress. Its most fundamental method is Future.result. From the official documentation

Return the value returned by the call. If the call hasn’t yet completed then this method will wait up to timeout seconds. If the call hasn’t completed in timeout seconds, then a concurrent.futures.TimeoutError will be raised. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

If the future is cancelled before completing then CancelledError will be raised.

If the call raised an exception, this method will raise the same exception.

It is important to understand that this method is blocking, as opposed to the pool.submit method I used earlier. A mistake I have seen often is the following:

cat ./src/block_mistake.py
from concurrent.futures import ProcessPoolExecutor
import time

tasks = range(10)
results = list()
start = time.time()


def do_work(task):
    time.sleep(0.1)
    return task ** 2


with ProcessPoolExecutor(max_workers=10) as pool:
    for task in tasks:
        future = pool.submit(do_work, task)
        results.append(future.result())  # collect results

print(f"Done after {time.time() - start}")

Can you spot the mistake? The problem is that before scheduling the next task, the main process waits the result of the task just scheduled. This script takes 1 second to run, because in effect, all tasks are run in succession.

python ./src/block_mistake.py
Done after 1.0320849418640137

Instead, the results should be collected after the process pool context is done scheduling the tasks.

cat ./src/block_fixed.py
from concurrent.futures import ProcessPoolExecutor
import time

start = time.time()

tasks = range(10)
futures = list()


def do_work(task):
    time.sleep(0.1)
    return task ** 2


with ProcessPoolExecutor(max_workers=10) as pool:
    for task in tasks:
        futures.append(pool.submit(do_work, task))

results = [future.result() for future in futures]

print(results)
print(f"Done after {time.time() - start}")
python ./src/block_fixed.py
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
Done after 0.1123654842376709

Dealing with exceptions in child processes

If a child process raises a unhandled Exception, this exception is passed to the main process when calling Future.result. In the example below, you can see how to catch those errors when unpacking the results of the process workers.

cat ./src/error_example.py
from concurrent.futures import ProcessPoolExecutor
from random import random

tasks = range(10)
futures = list()


def do_work(task):
    if random() > .5:
        return task ** 2
    else:
        raise Exception("OW!")


with ProcessPoolExecutor(max_workers=10) as pool:
    for task in tasks:
        futures.append(pool.submit(do_work, task))

results = list()

for future in futures:
    try:
        results.append(future.result())
    except Exception as e:
        results.append(f"Failed with {e}!")

print(results)
python ./src/error_example.py
[0, 'Failed with OW!!', 'Failed with OW!!', 9, 'Failed with OW!!', 25, 36, 'Failed with OW!!', 'Failed with OW!!', 81]

Thanks for reading! If you want to reach out, post an issue to the Github repository of this website or contact me on Twitter!