How can I multithread a function that reads a list of objects in Python? Astrophysics Example Code

This is my first post here. I will try to include all the necessary information, but please let me know if there is additional information that I can provide to clarify my question.

I am trying to parallelize an expensive function in an astrophysics code in Python using pool.map. The function accepts a list of objects as input. The basic structure of the code is as follows:

There is a class of Stars with physical properties:

class Stars:
    """Simple record holding the mass, metals, positions and age given at construction."""

    def __init__(self,mass,metals,positions,age):
        self.mass = mass
        self.metals = metals
        self.positions = positions
        self.age = age

    def info(self):
        """Return the stored values as the tuple (mass, metals, positions, age)."""
        return (self.mass, self.metals, self.positions, self.age)

and a list of these objects:

# One Stars object per index, built from the parallel mass / metals /
# positions / age sequences.
stars_list = [
    Stars(mass[i], metals[i], positions[i], age[i])
    for i in range(nstars)
]

(where mass, metals, positions and age are known from another script).

There is an expensive function that I run with these stellar objects that returns a spectrum for each of them:

def newstars_gen(stars_list):
   # Receives the list of Stars objects; the expensive spectrum calculation
   # itself is elided ("....") in this excerpt.
   ....
   # Both returned values are numpy arrays according to the surrounding text.
   return stellar_nu,stellar_fnu

where stellar_nu and stellar_fnu are numpy arrays

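What I would like to do is split the list of objects (stars_list) into chunks and run newstars_gen on those chunks in parallel to speed things up. To do this, I use pool.map: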

p = Pool(processes = 3)
nchunks = 3

# Floor division keeps the chunk size an integer, so it remains a valid slice
# bound whether "/" means integer division (Python 2) or true division (Python 3).
delta_chunk_indices = nstars // nchunks

# Start index of every chunk; the first chunk starts at 0.
chunk_start_indices = [n * delta_chunk_indices for n in range(nchunks)]

for n in range(nchunks):
    start = chunk_start_indices[n]
    if n == nchunks - 1:
        # The last chunk absorbs any remainder. An open-ended slice is used
        # because slicing up to -1 would silently drop the final star.
        stars_list_chunk = stars_list[start:]
    else:
        stars_list_chunk = stars_list[start:start + delta_chunk_indices]

    # NOTE: Pool.map calls newstars_gen once for EACH element of the iterable,
    # so every call receives a single Stars object rather than a list. This is
    # the call shown in the traceback that follows; presumably newstars_gen
    # indexes its argument, which a lone Stars instance does not support.
    chunk_sol = p.map(newstars_gen,stars_list_chunk)

However, this fails with the following error:

File "/Users/[username]/python2.7/multiprocessing/pool.py", line 250, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/[username]/python2.7/multiprocessing/pool.py", line 554, in get
    raise self._value
AttributeError: Stars instance has no attribute '__getitem__'

, , Stars. , __getitem__ . - ( python).

!

0
2

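From what you have posted, the overall strategy looks fine; the problem is a misunderstanding of what multiprocessing.Pool.map does. It takes the iterable you give it and calls the function once for every element. newstars_gen expects a list, but because of the way p.map is called, each call receives a single element of the chunk, i.e. one Star object. When newstars_gen then tries to index its argument, the lookup fails with the error above. Either change newstars_gen so that it works on a single Stars object (and returns the spectrum for that one star), or pass p.map a list of chunks so that each call receives a whole list.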

, . , (, set, dict) , .

+1

(, ) , , , .

from multiprocessing import Process, cpu_count, Lock
from sys import stdout 
from time import clock

def _still_running(children):
    """Return the processes from *children* whose is_alive() is still true."""
    return [child for child in children if child.is_alive()]


def run_multicore_function(iterable, function, func_args = None, max_processes = 0):
    """Call *function* once per item of *iterable*, each call in its own process.

    iterable      -- sized collection of work items (len() is taken for the
                     progress display). For a dict, the values rather than
                     the keys are passed to *function*.
    function      -- callable used as the target of each Process; its return
                     value is not collected.
    func_args     -- optional extra arguments. When non-empty, the whole
                     sequence is passed to *function* as ONE second argument.
    max_processes -- upper bound on simultaneously running processes;
                     0 derives a value from cpu_count(), leaving cores free.

    Progress is written to stdout. Returns None.
    """
    # Imported locally so the block is self-contained: wall-clock time for
    # the elapsed-seconds display (time.clock measured CPU time on Unix and
    # no longer exists in Python 3.8+), and sleep() so that the polling loops
    # below do not spin a full core while waiting.
    from time import sleep, time

    # None replaces the former mutable [] default.
    if func_args is None:
        func_args = []

    if max_processes == 0:
        cpus = cpu_count()
        if cpus > 7:
            max_processes = cpus - 3
        elif cpus > 3:
            max_processes = cpus - 2
        elif cpus > 1:
            max_processes = cpus - 1
        else:
            max_processes = 1

    poll_interval = 0.05    # seconds between checks for finished children
    total_jobs = len(iterable)
    child_list = []
    start_time = time()
    elapsed = 0
    counter = 0

    print("Running function %s() on %s cores" % (function.__name__, max_processes))
    #fire up the multi-core!!
    stdout.write("\tJob 0 of %s" % total_jobs)
    stdout.flush()

    for next_iter in iterable:
        # isinstance also covers dict subclasses such as OrderedDict
        if isinstance(iterable, dict):
            next_iter = iterable[next_iter]

        # Only fork a new process when there is a free slot.
        child_list = _still_running(child_list)
        while len(child_list) >= max_processes:
            sleep(poll_interval)
            child_list = _still_running(child_list)
            now = int(time() - start_time)
            if now > elapsed:
                # refresh the progress line at most once per elapsed second
                elapsed = now
                stdout.write("\r\tJob %s of %s (%i sec)" % (counter, total_jobs, elapsed))
                stdout.flush()

        #Start new process
        stdout.write("\r\tJob %s of %s (%i sec)" % (counter, total_jobs, elapsed))
        stdout.flush()
        if len(func_args) == 0:
            p = Process(target=function, args=(next_iter,))
        else:
            p = Process(target=function, args=(next_iter, func_args))
        p.start()
        child_list.append(p)
        counter += 1

    #wait for remaining processes to complete
    child_list = _still_running(child_list)
    while child_list:
        sleep(poll_interval)
        child_list = _still_running(child_list)
        now = int(time() - start_time)
        if now > elapsed:
            elapsed = now
            stdout.write("\r\tRunning job %s of %s (%i sec)" % (counter, total_jobs, elapsed))
            stdout.flush()

    print(" --> DONE\n")
    return

, star_list newstars_gen . , .

   # Build the list under the same name (star_list) that is later handed to
   # run_multicore_function; appending to a differently named list would
   # leave star_list empty.
   star_list = []
   for i in range(nstars):
        star_list.append(Stars(mass[i],metals[i],positions[i],age[i]))

   outfile = "some/where/output.txt"
   # Lock passed to every worker so that writes to outfile can be serialized
   file_lock = Lock()

:

def newstars_gen(stars_list_item,args):   #args = [outfile,file_lock]
    # args is unpacked into the output file path and the lock guarding it
    outfile,file_lock = args

        ....

    # Hold the lock while appending so that only one process writes at a time
    with file_lock:
        with open(outfile,"a") as handle:
             # NOTE(review): write() accepts a single string argument; these two
             # values presumably need to be formatted into one string first —
             # confirm the intended output format.
             handle.write(stellar_nu,stellar_fnu)

run_multicore_function()

# Runs newstars_gen once per element of star_list, each in its own process;
# the list [outfile,file_lock] is passed to every call as its second argument.
run_multicore_function(star_list, newstars_gen, [outfile,file_lock])

, , , . , , . . , , , , , , , - , .

, !
,
-

0

All Articles