'Taxi Dog' swimming by Wayan Vota on Flickr

Pool.map is Handy.

Jan 27, 2014 | Developer Blog

Processes and queues are the way to go when you need to scale software performance. I also recently found out about pool.map, which lets you write less code (you don’t have to do anything with Queue) and get similar results.

The basic usage is pool.map(some_func, some_iterable), and it works like map, returning a list of the results. So, pool.map(is_prime, [2, 3, 4, 5]) would return [True, True, False, True]. There’s also pool.imap, the pool equivalent of itertools.imap, which returns an iterator rather than a list and lets you work on an iterable of any size.
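As a minimal sketch of that basic usage (my own illustration, not the benchmark code from this post): the is_prime below is a simplified stand-in for the one in the full listing, and the pool calls sit behind a __main__ guard, which multiprocessing needs on platforms that start workers by launching a fresh interpreter.

```python
import multiprocessing


def is_prime(n):
    # Simplified stand-in for the is_prime in the full listing: trial division.
    if n < 2:
        return False
    return all(n % i for i in range(2, int(n ** 0.5) + 1))


if __name__ == '__main__':
    numbers = [2, 3, 4, 5]
    pool = multiprocessing.Pool()
    try:
        # pool.map blocks until every result is ready and returns a list
        # in the same order as the input.
        print(pool.map(is_prime, numbers))
        # pool.imap returns an iterator instead, yielding results in input
        # order as they become available.
        for n, flag in zip(numbers, pool.imap(is_prime, numbers)):
            print('%s -> %s' % (n, flag))
    finally:
        pool.close()
        pool.join()
```

The function passed to the pool has to be defined at module level so the worker processes can find it by name.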

Suppose we have a primes_for_list function that returns a list containing the primes in the supplied list, and a chunks function that splits a list into sublists of size n:

Calculating Primes With pool.map

def use_pool_for_primes(l, chunk_size=1000):
    """
    Finds primes in the provided list and returns a list of
    those primes in the order provided.
    Uses pool.map
    """
    pool = multiprocessing.Pool()
    parts = pool.map(primes_for_list, chunks(l, chunk_size))
    return list(itertools.chain(*parts))
$ python use_of_pool.py multi
78498 primes < 1000000. Calculated in 2.426864s
148933 primes < 2000000. Calculated in 6.764274s
283146 primes < 4000000. Calculated in 19.203458s
412849 primes < 6000000. Calculated in 38.047622s
539777 primes < 8000000. Calculated in 60.967163s
664579 primes < 10000000. Calculated in 87.283632s
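Pool.map also accepts a chunksize argument that batches the input before handing it to the workers, so a similar effect may be achievable without a separate chunks helper. The sketch below is an alternative under my own assumptions, not the code that produced the timings above: primes_with_chunksize and select_primes are hypothetical names, is_prime is a simplified stand-in, and I have not benchmarked it against the chunked version.

```python
import multiprocessing


def is_prime(n):
    # Simplified stand-in for the is_prime in the full listing.
    if n < 2:
        return False
    return all(n % i for i in range(2, int(n ** 0.5) + 1))


def select_primes(numbers, flags):
    # Keep each number whose matching flag is True, preserving input order.
    return [n for n, flag in zip(numbers, flags) if flag]


def primes_with_chunksize(numbers, chunk_size=1000):
    # Hypothetical helper: lets pool.map do the batching via chunksize
    # instead of slicing the list by hand.
    pool = multiprocessing.Pool()
    try:
        flags = pool.map(is_prime, numbers, chunksize=chunk_size)
    finally:
        pool.close()
        pool.join()
    return select_primes(numbers, flags)


if __name__ == '__main__':
    print(primes_with_chunksize(list(range(30)), chunk_size=10))
```

The trade-off is that this version sends one boolean back per input number rather than only the primes, so it may move more data between processes than the chunked approach.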

Using Unordered Map

Using imap_unordered and then sorting the results seems to squeeze out a little more performance:

def use_pool_unordered_for_primes(l, chunk_size=1000):
    """
    Finds primes in the provided list and returns an ordered list
    of those primes.
    Uses pool.imap_unordered
    """
    pool = multiprocessing.Pool()
    parts = pool.imap_unordered(primes_for_list,
        chunks(l, chunk_size))
    return sorted(list(itertools.chain(*parts)))
$ python use_of_pool.py unordered
78498 primes < 1000000. Calculated in 2.222123s
148933 primes < 2000000. Calculated in 6.13618s
283146 primes < 4000000. Calculated in 17.588184s
412849 primes < 6000000. Calculated in 35.159868s
539777 primes < 8000000. Calculated in 56.629323s
664579 primes < 10000000. Calculated in 75.582824s
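The sort is needed because imap_unordered hands back each chunk's result as soon as a worker finishes it, not in submission order. The sketch below imitates that without a pool: it shuffles the per-chunk results as a stand-in for an arbitrary completion order, then checks that flattening and sorting recovers the ordered list. The shuffle and the name flatten_sorted are assumptions for illustration only; real arrival order depends on scheduling.

```python
import itertools
import random


def is_prime(n):
    # Simplified stand-in for the is_prime in the full listing.
    if n < 2:
        return False
    return all(n % i for i in range(2, int(n ** 0.5) + 1))


def flatten_sorted(parts):
    # Same final step as use_pool_unordered_for_primes: chain the
    # per-chunk lists together, then sort.
    return sorted(itertools.chain.from_iterable(parts))


# Per-chunk results for range(40) split into chunks of 10, in submission order.
parts = [[n for n in range(start, start + 10) if is_prime(n)]
         for start in range(0, 40, 10)]
in_order = list(itertools.chain.from_iterable(parts))

arrived = list(parts)
random.shuffle(arrived)  # stand-in for whatever order the workers finish in

assert flatten_sorted(arrived) == in_order
print(flatten_sorted(arrived))
```

Sorting only reproduces the original order because the input here is already ascending, as range(n) is in the benchmark.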

How Fast Would It Be In A Single Process?

def primes_for_list(l):
    return [i for i in l if is_prime(i)]
78498 primes < 1000000. Calculated in 9.630087s
148933 primes < 2000000. Calculated in 26.764138s
283146 primes < 4000000. Calculated in 76.268092s
412849 primes < 6000000. Calculated in 139.234455s
539777 primes < 8000000. Calculated in 217.067968s
664579 primes < 10000000. Calculated in 302.099861s
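Dividing the single-process times by the pool times gives the speedup. The numbers below are copied from the first and last rows of the three runs above; only the arithmetic is added, and the speedup helper is a name I've made up for the sketch.

```python
# Timings in seconds, copied from the runs above, keyed by the upper bound n.
single = {1000000: 9.630087, 10000000: 302.099861}
pool_map = {1000000: 2.426864, 10000000: 87.283632}
unordered = {1000000: 2.222123, 10000000: 75.582824}


def speedup(baseline, parallel):
    # How many times faster the parallel run was than the baseline.
    return baseline / parallel


for n in sorted(single):
    print('n=%d: pool.map %.2fx, imap_unordered %.2fx' % (
        n, speedup(single[n], pool_map[n]), speedup(single[n], unordered[n])))
```

On these runs that works out to somewhere between roughly 3.5x and 4.3x. The post does not say how many cores the machine had, so treat the ratios as specific to that machine.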

The Full Code

from datetime import datetime
import itertools
import multiprocessing
import sys


def is_prime(n):
    if n < 2:
        return False
    if n < 4:
        return True
    # Trial division only needs to check factors up to sqrt(n).
    check_factors_max = int(n ** 0.5) + 1
    for i in range(2, check_factors_max):
        is_factor = n % i == 0
        if is_factor:
            return False
    return True


def chunks(l, chunk_size=1000):
    i = 0
    keep_going = True
    while keep_going:
        start = i
        end = i + chunk_size
        # >= rather than > so that a list whose length is an exact multiple
        # of chunk_size does not yield a trailing empty chunk.
        if end >= len(l):
            end = len(l)
            keep_going = False
        yield l[start:end]
        i += chunk_size


def primes_for_list(l):
    return [i for i in l if is_prime(i)]


def use_pool_for_primes(l, chunk_size=1000):
    """
    Finds primes in the provided list and returns a list of those primes in the
    order provided.
    Uses pool.map
    """
    pool = multiprocessing.Pool()
    parts = pool.map(primes_for_list, chunks(l, chunk_size))
    return list(itertools.chain(*parts))


def use_pool_unordered_for_primes(l, chunk_size=1000):
    """
    Finds primes in the provided list and returns an ordered list of those
    primes.
    Uses pool.imap_unordered
    """
    pool = multiprocessing.Pool()
    parts = pool.imap_unordered(primes_for_list, chunks(l, chunk_size))
    return sorted(list(itertools.chain(*parts)))


if __name__ == '__main__':
    # Usage: python use_of_pool.py one|multi|unordered
    method = sys.argv[1]
    method_dict = {
        'one': primes_for_list,
        'multi': use_pool_for_primes,
        'unordered': use_pool_unordered_for_primes,
    }
    for n in range(1000000, 10000001, 1000000):
        before = datetime.now()
        primes = method_dict[method](range(n))
        seconds_elapsed = (datetime.now() - before).total_seconds()
        # Parenthesized so the line works as a print statement or a function.
        print('%s primes < %s. Calculated in %ss' % (len(primes), n,
                seconds_elapsed))
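One thing the listing leaves open is cleanup: each call creates a new Pool and never closes it, so the benchmark loop may leave worker processes from earlier iterations lying around. A possible variant, sketched below under my own assumptions (use_pool_for_primes_closing is a hypothetical name, and the helpers are compact stand-ins for those in the listing), closes and joins the pool once the results are in.

```python
import itertools
import multiprocessing


def is_prime(n):
    # Simplified stand-in for the is_prime in the listing.
    if n < 2:
        return False
    return all(n % i for i in range(2, int(n ** 0.5) + 1))


def primes_for_list(l):
    return [i for i in l if is_prime(i)]


def chunks(l, chunk_size=1000):
    # Compact stand-in for the chunks generator in the listing.
    for start in range(0, len(l), chunk_size):
        yield l[start:start + chunk_size]


def use_pool_for_primes_closing(l, chunk_size=1000):
    # Hypothetical variant of use_pool_for_primes that releases its workers.
    pool = multiprocessing.Pool()
    try:
        parts = pool.map(primes_for_list, chunks(l, chunk_size))
    finally:
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for the worker processes to exit
    return list(itertools.chain(*parts))


if __name__ == '__main__':
    print(use_pool_for_primes_closing(list(range(100)), chunk_size=10))
```

On Python 3.3 and later, Pool can also be used as a context manager (with multiprocessing.Pool() as pool:), which terminates the pool when the block exits.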

I tested it with these tests:

class TestPrimes(object):
    primes_lt_1000 = [2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47,
            53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113,
            127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191,
            193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263,
            269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347,
            349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421,
            431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499,
            503, 509, 521, 523, 541, 547, 557, 563, 569, 571, 577, 587, 593,
            599, 601, 607, 613, 617, 619, 631, 641, 643, 647, 653, 659, 661,
            673, 677, 683, 691, 701, 709, 719, 727, 733, 739, 743, 751, 757,
            761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829, 839, 853,
            857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937, 941,
            947, 953, 967, 971, 977, 983, 991, 997,]

    def test_primes_is_accurate(self):
        primes = primes_for_list(range(1000))
        assert primes == TestPrimes.primes_lt_1000

    def test_use_pool_for_primes(self):
        primes = use_pool_for_primes(range(1000), 100)
        assert primes == TestPrimes.primes_lt_1000

    def test_use_pool_unordered_for_primes(self):
        primes = use_pool_unordered_for_primes(range(1000), 100)
        assert primes == TestPrimes.primes_lt_1000

    def test_chunks(self):
        l = [1, 2, 3, 4, 5, 6, 7]
        assert [[1, 2], [3, 4], [5, 6], [7]] == [c for c in chunks(l, 2)]
        assert [[1, 2, 3, 4, 5, 6, 7]] == [c for c in chunks(l, 8)]

So there you have it: pool.map is a simple way to use all those cores. This almost devolved into a "how big a game of life can we compute" exercise, but maybe I’ll do that next time.
