Shared Data Parallel Processing in Python

The Python multiprocessing library exposes an interface that simplifies distributing tasks to multiple cores. The multiprocessing.Pool class provides access to a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation. Leveraging multiprocessing.Pool is straightforward. To demonstrate, we will solve Project Euler Problem #14 in a distributed fashion. The problem states:
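Before turning to the problem, here is a minimal sketch of the Pool interface (square is a placeholder function of our own, not part of the library):

import multiprocessing


def square(x):
    return x * x


if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        # Parallel map: blocks until every result is available.
        print(pool.map(square, range(10)))

        # Asynchronous submission; get() accepts an optional timeout.
        async_result = pool.apply_async(square, (7,))
        print(async_result.get(timeout=5))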

The following iterative sequence is defined for the set of positive integers:

n ->    n/2 (n is even)
n -> 3n + 1 (n is odd)

Using the rule above and starting with 13, we generate the following sequence:

13 -> 40 -> 20 -> 10 -> 5 -> 16 -> 8 -> 4 -> 2 -> 1

It can be seen that this sequence (starting at 13 and finishing at 1) contains 10 terms. Although it has not been proved yet (Collatz Problem), it is thought that all starting numbers finish at 1.

Which starting number, under one million, produces the longest chain?

NOTE: Once the chain starts the terms are allowed to go above one million.

To start, we define two functions: collatz_test and chain_length. collatz_test applies the iteration rule, dividing the input by 2 (if even) or multiplying it by 3 and adding 1 (if odd). chain_length returns a tuple consisting of the initial integer along with the number of steps in its Collatz chain:

def collatz_test(n):
    """
    If n is even, return n // 2, else return 3n + 1.
    """
    return (n // 2) if n % 2 == 0 else (3 * n + 1)


def chain_length(n):
    """
    Return a tuple of the input value n and the number
    of steps in its Collatz chain.
    """
    if n <= 0:
        return None
    cntr, tstint = 0, n
    while tstint != 1:
        cntr += 1
        tstint = collatz_test(tstint)
    return (n, cntr)
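Note that chain_length counts steps rather than terms: the example chain starting at 13 contains 10 terms but 9 steps, so the function returns (13, 9). Since the starting number that maximizes one count also maximizes the other, the final answer is unaffected. A quick sanity check:

print(chain_length(13))  # (13, 9): 9 steps in the 10-term chain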

One thing to keep in mind when using the multiprocessing library is that on platforms which spawn worker processes rather than fork them (Windows, and macOS by default in recent Python versions), instances of the Pool and Process classes must be created under an if __name__ == "__main__" guard; as a consequence, Pool generally cannot be used from within an interactive Python session.

Next we present our declarations from earlier along with the distributed logic, which dispatches chain_length to the worker pool in parallel:

"""
Parallel solution to Project Euler Problem # 14.
"""
import multiprocessing


def collatz_test(n):
    """
    If n is even, return n // 2, else return 3n + 1.
    """
    return (n // 2) if n % 2 == 0 else (3 * n + 1)


def chain_length(n):
    """
    Return a tuple of the input value `n` and the number
    of steps in its Collatz chain.
    """
    if n <= 0:
        return None
    cntr, tstint = 0, n
    while tstint != 1:
        cntr += 1
        tstint = collatz_test(tstint)
    return (n, cntr)


if __name__ == "__main__":

    # Initialize array of values to test.
    arr = multiprocessing.Array('L', range(1, 1000000))
    pool = multiprocessing.Pool()
    all_lengths = pool.map(chain_length, arr, chunksize=1000)
    pool.close()
    pool.join()

    # Search for the longest chain.
    longest_chain = max(all_lengths, key=lambda x: x[1])
    print(longest_chain)  # (837799, 524)

We first declare our sequence of test values as a multiprocessing.Array, which prevents the 999,999-element sequence from being replicated in each process (only an issue on Windows, where there is no fork system call). Instead, the array is created once, and all worker processes have access to it. The “L” typecode comes from the array module in the Python Standard Library and indicates the datatype of the elements in the sequence (here, unsigned long). We then initialize the Pool instance and call its map method, which works like the built-in map function, only in parallel. Within pool.map, we set chunksize=1000 due to the following commentary in multiprocessing’s documentation:

For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.
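To see the effect on this workload, one can time pool.map at a few chunk sizes (a sketch, reusing chain_length from above; absolute timings will vary by machine):

import multiprocessing
import time

if __name__ == "__main__":
    for cs in (1, 100, 1000):
        pool = multiprocessing.Pool()
        t0 = time.time()
        pool.map(chain_length, range(1, 1000000), chunksize=cs)
        pool.close()
        pool.join()
        print(f"chunksize={cs}: {time.time() - t0:.1f}s")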

Upon execution, we find that 837,799 produces the longest sequence, at 524 steps. By distributing the tasks across four cores, the script completes in 25 seconds, whereas the sequential implementation requires approximately 55 seconds. This disparity only grows as the range of evaluation increases from 1M to 5M or 10M.
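For reference, the sequential baseline corresponds to a single-process version along these lines (a sketch reusing chain_length from above; the original benchmark code is not shown in this post):

longest_chain = max(map(chain_length, range(1, 1000000)), key=lambda x: x[1])
print(longest_chain)  # (837799, 524)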

For more information on the multiprocessing module, be sure to check out the documentation. In addition, the Python Standard Library includes the concurrent.futures module, which exposes an even higher-level interface that facilitates both thread and process-based parallelism via Executor objects.
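As a sketch of what that might look like here (assuming the same chain_length logic; ProcessPoolExecutor.map accepts a chunksize argument as of Python 3.5):

import concurrent.futures


def chain_length(n):
    """Return (n, number of Collatz steps from n down to 1)."""
    cntr, tstint = 0, n
    while tstint != 1:
        cntr += 1
        tstint = (tstint // 2) if tstint % 2 == 0 else (3 * tstint + 1)
    return (n, cntr)


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        all_lengths = executor.map(chain_length, range(1, 1000000), chunksize=1000)
        longest_chain = max(all_lengths, key=lambda x: x[1])
    print(longest_chain)  # (837799, 524)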
