Fill numpy array through concurrent.futures multiprocessing

I am trying to populate a large numpy array using multiprocessing. I have worked through the concurrent.futures examples in the documentation, but I did not understand them well enough to adapt them to my use case.

Here is a simplified version of what I would like to do:

import numpy
import concurrent.futures

# Result buffer with 20 rows and 2 columns; numpy.empty does not
# initialise the contents.
squares = numpy.empty((20, 2))

def make_square(i, squares):
    """Log the iteration, then store ``i`` and ``i ** 2`` in row ``i``.

    ``squares`` is modified in place (columns 0 and 1 of row ``i``);
    nothing is returned.
    """
    print('iteration', i)
    squared = i ** 2
    squares[i, 0] = i
    squares[i, 1] = squared

# Submit one make_square task per row to a pool of two worker processes.
# NOTE(review): the futures returned by submit() are discarded, so an
# exception raised inside a worker would go unnoticed here.
# NOTE(review): per the concurrent.futures documentation, arguments are
# pickled for the worker processes, so each task presumably writes to its
# own copy of `squares` rather than to the array defined in this process.
with concurrent.futures.ProcessPoolExecutor(2) as executor: 
    for i in range(20):
        executor.submit(make_square, i, squares)

The output looks something like this:

iteration 1
iteration 0
iteration 2
iteration 3
iteration 5
iteration 4
iteration 6
iteration 7
iteration 8
iteration 9
iteration 10
iteration 11
iteration 12
iteration 13
iteration 15
iteration 14
iteration 16
iteration 17
iteration 18
iteration 19

which nicely demonstrates that the function is running concurrently. But the squares array is still empty.

What is the correct syntax for filling an array of squares?

Secondly, would using .map be a better implementation?

Thanks in advance!

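Edit 8/2/17: Thank you for the answers here on stackoverflow, @ilia w495 and @donkopotamus. Here is what I posted on reddit, which explains the background in more detail.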

The posted code is an analogy of what I'm trying to do, which is populating 
a numpy array with a relatively simple calculation (dot product) involving 
two other arrays. The algorithm depends on a value N which can be anything 
from 1 on up, though we won't likely use a value larger than 24.

I'm currently running the algorithm on a distributed computing system and  
the N = 20 versions take longer than 10 days to complete. I'm using dozens 
of cores to obtain the required memory, but gaining none of the benefits of 
multiple CPUs. I've rewritten the code using numba which makes lower N 
variants superfast on my own laptop which can't handle the memory 
requirements for larger Ns, but alas, our distributed computing environment 
is not currently able to install numba. So I'm attempting concurrent.futures 
to take advantage of the multiple CPUs in our computing environment in the 
hopes of speeding things up.

, , 16 + . N x 2 ** N, .. (16777216) .

, .

+4
2

The problem here is that a ProcessPoolExecutor runs the function in a separate process.

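Since these are separate processes, each with its own memory space, you cannot expect any changes they make to your array (squares) to be reflected in the parent process. Hence your original array is unchanged (as you have discovered).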

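You need to do one of the following:

  • use a ThreadPoolExecutor, but be aware that, in the general case, you still should not modify global variables from multiple threads;
  • restructure your code so that the process/thread performs the (expensive) computation and returns the result.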

The latter would look like this:

# Result buffer with 20 rows and 2 columns, zero-filled.
squares = numpy.zeros((20, 2))

def make_square(i):
    """Compute the data for row ``i`` and hand it back to the caller.

    Returns a ``(row, values)`` tuple in which ``values`` is the list
    ``[i, i ** 2]``; nothing outside the function is modified.
    """
    print('iteration', i)

    # the expensive computation would go here ...
    row_values = [i, i ** 2]

    # pair the row number with its data so the caller knows where to store it
    return i, row_values

# Run make_square for every row index on a pool of two worker processes.
# Each call hands back (row, values); the assignment into `squares`
# happens here, in the process that owns the array.
# NOTE(review): per the multiprocessing guidelines, platforms that start
# workers by spawning need this code under `if __name__ == '__main__':`.
with concurrent.futures.ProcessPoolExecutor(2) as executor: 
    for row, result in executor.map(make_square, range(20)):
        squares[row] = result

This results in:

[[   0.    0.]
 [   1.    1.]
 [   2.    4.]
 ...
 [  18.  324.]
 [  19.  361.]]
+1

, , :

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def return_after_5_secs(message):
    """Sleep for five seconds, then return ``message`` unchanged."""
    sleep(5)
    return message

# Submit one slow task to a three-worker process pool and poll its future.
# The `with` block shuts the pool down once the demo is finished, so the
# worker processes are released instead of lingering until interpreter exit.
# NOTE(review): per the multiprocessing guidelines, platforms that start
# workers by spawning need this code under `if __name__ == '__main__':`.
with ProcessPoolExecutor(3) as pool:
    future = pool.submit(return_after_5_secs, "hello")

    # Three polls, two seconds apart. The task sleeps for five seconds,
    # so the early polls normally report False.
    for _ in range(3):
        print(future.done())
        sleep(2)

    # Six seconds have passed by now; result() would block if the task
    # were somehow still running.
    print(future.done())
    print("Result: " + future.result())

- - . :

import concurrent.futures
import itertools
import os
import time

import numpy

# Number of rows in the array, and therefore the number of tasks submitted.
SQUARE_LIST_SIZE = 20


def main():
    """
        Runs the demo and returns the collected task results.

        Allocates a `(SQUARE_LIST_SIZE, 2)` array, passes it through
        `make_future_seq` and `make_square_seq`, and returns everything
        the latter yields as a list.
    """
    # Scratch array handed to every task; numpy.empty leaves it uninitialised.
    scratch = numpy.empty((SQUARE_LIST_SIZE, 2))

    # Both calls only build generators; no task has been submitted yet.
    pending = make_future_seq(scratch)
    resolved = make_square_seq(pending)

    # Consuming the outer generator drives submission and gathers the results.
    return list(resolved)


def make_future_seq(squares):
    """
        Generates one future per row index, submitting tasks lazily.

        A pool of four worker processes is opened when iteration starts.
        Each step submits `make_one_square(index, squares)`, prints the
        new future and yields it. The pool is closed when the generator
        leaves the `with` block.
    """

    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as pool:
        for index in range(SQUARE_LIST_SIZE):
            # submit() only schedules the call; it does not wait for it.
            promise = pool.submit(make_one_square, index, squares)
            print('future ', index, '= >', promise)
            yield promise


def make_square_seq(future_seq):
    """
        Yields the result of every future in `future_seq`, in order.

        When iteration starts, the `done()` state of every future is
        printed; the same report is printed again after the last result
        has been yielded. The object given to `return` only surfaces as
        `StopIteration.value`.
    """

    def report(label, futures):
        # Prints the position and completion state of each future.
        for position, item in enumerate(futures):
            print('future ', position, label, item.done())

    # Three independent iterators over the same futures: one for each
    # reporting pass and one for collecting the results.
    before, after, to_resolve = itertools.tee(future_seq, 3)

    # First report; this pass consumes the whole input sequence.
    report('done [1] =>', before)

    for item in to_resolve:
        # Blocks until this particular future has finished.
        yield item.result()

    # Second report, once every result has been handed out.
    report('done [2] =>', after)

    return to_resolve


def make_one_square(i, squares, delay=1):
    """
        Fills row `i` of `squares` with `i` and `i ** 2`, then returns `squares`.

        `delay` is the number of seconds to sleep between the two progress
        messages; it stands in for a long computation. The default of 1
        keeps the original behaviour, and 0 skips the wait.

        The whole `squares` object is returned, not just row `i`.
    """
    print('inside [1] = >', i, 'pid = ', os.getpid())
    squares[i, 0], squares[i, 1] = i, i ** 2

    time.sleep(delay)  # Stands in for a long and hard computation.

    print('inside [2]= >', i, 'pid = ', os.getpid())
    return squares


# Run the demo only when this file is executed as a script.
if __name__ == '__main__':
    main()

To obtain the value computed by a task, call future.result(). See the documentation: concurrent.futures.html

The output looks something like this:

$ python test_futures_1.py 
future  0 = > <Future at 0x7fc0dc758278 state=running>
future  0 done [1] => False
future  1 = > <Future at 0x7fc0dc758da0 state=pending>
inside [1] = > 0 pid =  19364
future  1 done [1] => False
inside [1] = > 1 pid =  19365
future  2 = > <Future at 0x7fc0dc758e10 state=pending>
future  2 done [1] => False
future  3 = > <Future at 0x7fc0dc758cc0 state=pending>
inside [1] = > 2 pid =  19366
future  3 done [1] => False
future  4 = > <Future at 0x7fc0dc769048 state=pending>
future  4 done [1] => False
inside [1] = > 3 pid =  19367
future  5 = > <Future at 0x7fc0dc758f60 state=running>
future  5 done [1] => False
future  6 = > <Future at 0x7fc0dc758fd0 state=pending>
future  6 done [1] => False
future  7 = > <Future at 0x7fc0dc7691d0 state=pending>
future  7 done [1] => False
future  8 = > <Future at 0x7fc0dc769198 state=pending>
future  8 done [1] => False
future  9 = > <Future at 0x7fc0dc7690f0 state=pending>
future  9 done [1] => False
future  10 = > <Future at 0x7fc0dc769438 state=pending>
future  10 done [1] => False
future  11 = > <Future at 0x7fc0dc7694a8 state=pending>
future  11 done [1] => False
future  12 = > <Future at 0x7fc0dc769550 state=pending>
future  12 done [1] => False
future  13 = > <Future at 0x7fc0dc7695f8 state=pending>
future  13 done [1] => False
future  14 = > <Future at 0x7fc0dc7696a0 state=pending>
future  14 done [1] => False
future  15 = > <Future at 0x7fc0dc769748 state=pending>
future  15 done [1] => False
future  16 = > <Future at 0x7fc0dc7697f0 state=pending>
future  16 done [1] => False
future  17 = > <Future at 0x7fc0dc769898 state=pending>
future  17 done [1] => False
future  18 = > <Future at 0x7fc0dc769940 state=pending>
future  18 done [1] => False
future  19 = > <Future at 0x7fc0dc7699e8 state=pending>
future  19 done [1] => False
inside [2]= > 0 pid =  19364
inside [2]= > 1 pid =  19365
inside [1] = > 4 pid =  19364
inside [2]= > 2 pid =  19366
inside [1] = > 5 pid =  19365
inside [1] = > 6 pid =  19366
inside [2]= > 3 pid =  19367
inside [1] = > 7 pid =  19367
inside [2]= > 4 pid =  19364
inside [2]= > 5 pid =  19365
inside [2]= > 6 pid =  19366
inside [1] = > 8 pid =  19364
inside [1] = > 9 pid =  19365
inside [1] = > 10 pid =  19366
inside [2]= > 7 pid =  19367
inside [1] = > 11 pid =  19367
inside [2]= > 8 pid =  19364
inside [2]= > 9 pid =  19365
inside [2]= > 10 pid =  19366
inside [2]= > 11 pid =  19367
inside [1] = > 13 pid =  19366
inside [1] = > 12 pid =  19364
inside [1] = > 14 pid =  19365
inside [1] = > 15 pid =  19367
inside [2]= > 14 pid =  19365
inside [2]= > 13 pid =  19366
inside [2]= > 12 pid =  19364
inside [2]= > 15 pid =  19367
inside [1] = > 16 pid =  19365
inside [1] = > 17 pid =  19364
inside [1] = > 18 pid =  19367
inside [1] = > 19 pid =  19366
inside [2]= > 16 pid =  19365
inside [2]= > 18 pid =  19367
inside [2]= > 17 pid =  19364
inside [2]= > 19 pid =  19366
future  0 done [2] => True
future  1 done [2] => True
future  2 done [2] => True
future  3 done [2] => True
future  4 done [2] => True
future  5 done [2] => True
future  6 done [2] => True
future  7 done [2] => True
future  8 done [2] => True
future  9 done [2] => True
future  10 done [2] => True
future  11 done [2] => True
future  12 done [2] => True
future  13 done [2] => True
future  14 done [2] => True
future  15 done [2] => True
future  16 done [2] => True
future  17 done [2] => True
future  18 done [2] => True
future  19 done [2] => True
-1

All Articles