
Using Multiprocessing in Python


One way to achieve parallelism is multiprocessing, where we execute tasks on different CPU cores to reduce the total processing time. Python provides the multiprocessing package to facilitate this.

This post summarizes some of the questions I had when learning to use multiprocessing in Python.

How many processes should we use?

For CPU-bound tasks, setting the number of processes to the number of CPU cores is usually a reasonable choice. To get the number of CPUs in your system:

import multiprocessing as mp

print(mp.cpu_count())

However, the time to complete our task stops decreasing once the number of processes reaches a certain point, due to other factors such as inter-process communication overhead. So it is a good idea to do some benchmarking to find the optimal number of processes to use.
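As a rough sketch of such a benchmark (the task and workload here are made-up placeholders; substitute your own), we can time the same job with different pool sizes:

```python
import multiprocessing as mp
import time

def cpu_task(n):
    # A toy CPU-bound task: sum of squares up to n.
    return sum(i * i for i in range(n))

def benchmark(num_procs, workload):
    # Time how long a pool of the given size takes to finish the workload.
    start = time.perf_counter()
    with mp.Pool(num_procs) as p:
        p.map(cpu_task, workload)
    return time.perf_counter() - start

if __name__ == "__main__":
    workload = [200_000] * 16
    for n in (1, 2, 4, mp.cpu_count()):
        print(f"{n:2d} processes: {benchmark(n, workload):.3f}s")
```

On a typical machine the timings improve up to roughly the core count and then flatten out or regress.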


Do I need to use pool.close() and pool.join() after finishing my tasks?

pool.close() makes sure that the process pool does not accept new tasks, and pool.join() waits for the worker processes to properly finish their work and exit. So it is a good idea to call pool.close() and pool.join() explicitly.

Otherwise, the worker processes may not be released properly. In our application, we have seen memory-related errors such as:

OSError: [Errno 12] Cannot allocate memory
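A minimal sketch of the explicit cleanup (the square function here is just a placeholder task):

```python
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == "__main__":
    p = mp.Pool()
    try:
        print(p.map(square, range(5)))  # [0, 1, 4, 9, 16]
    finally:
        p.close()  # stop accepting new tasks
        p.join()   # wait for the workers to finish and exit
```

Since Python 3.3 the pool can also be used as a context manager, but note that `with mp.Pool() as p:` calls p.terminate() on exit rather than close() followed by join().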


Difference between pool.map() and pool.map_async()?

pool.map() is blocking: it will block the execution of the following code until it gets all the results back from the worker processes. pool.map_async(), on the other hand, is non-blocking and returns immediately with an AsyncResult object; to get the actual results, we call result.get(). To illustrate the difference:

import multiprocessing as mp

def toy(x):
    print("function gets executed.")
    return x*x

def main():
    p = mp.Pool()

    result = p.map(toy, [1])
    print("after p.map()")
    print(result)

    res = p.map_async(toy, [1])
    print("after p.map_async()")
    print(res.get())

    p.close()
    p.join()


if __name__ == "__main__":
    main()

If you use p.map(), you will see the following result:

function gets executed.
after p.map()
[1]

This is because p.map() is blocking: the statement following it won't execute until p.map() has got all the results back.

Unlike p.map(), p.map_async() is non-blocking. The output for the second part of the code is:

after p.map_async()
function gets executed.
[1]

That is because p.map_async() does not wait for the function to finish executing and return. So you see the output after p.map_async() first, and only then function gets executed..
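Since p.map_async() returns immediately, the parent process is free to do other work while the pool runs; we can also register a callback that fires in the parent once all results are ready. A small sketch, reusing a toy function like the one above:

```python
import multiprocessing as mp

def toy(x):
    return x * x

def on_done(results):
    # Runs in the parent process once all results are ready.
    print("callback got:", results)

if __name__ == "__main__":
    with mp.Pool(2) as p:
        res = p.map_async(toy, [1, 2, 3], callback=on_done)
        # The parent could do other work here while the pool computes.
        res.wait()        # block until the results are in
        print(res.get())  # [1, 4, 9]
```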

pool.map() without argument?

The function pool.map() feeds the elements of an iterable to a function one by one, so we cannot use it to run a function that takes no arguments. However, we can change the function to accept an argument and ignore it, or write a wrapper function that accepts an argument and invokes the original function.

import multiprocessing as mp

def f():
    print("function without argument")

def wrap_func(x):
    # Accept and ignore the argument so f can be used with pool.map().
    f()

if __name__ == "__main__":
    p = mp.Pool()
    p.map(wrap_func, range(10))
    p.close()
    p.join()


Functions with multiple arguments?

pool.map() can only execute functions that accept a single argument. To run a function that accepts multiple arguments, we can use pool.starmap():

import multiprocessing as mp


def func(x, y):
    return x * y

p = mp.Pool()

l1 = range(1, 10)
l2 = range(10, 19)

res = p.starmap(func, zip(l1, l2))
print(res)

If the other parameters of the function are constant, it may be more convenient to use functools.partial instead:

from functools import partial

l1 = range(1, 10)
# Fix the constant parameter y, leaving x to be supplied by p.map().
partial_func = partial(func, y=2)
res = p.map(partial_func, l1)
print(res)

p.close()
p.join()
