Solving embarrassingly parallel problems using Python multiprocessing

How does one use multiprocessing to tackle embarrassingly parallel problems?

Embarrassingly parallel problems typically consist of three basic parts:

  • Read input data (from a file, database, TCP connection, etc.).
  • Perform calculations on the input data, where each calculation is independent of other calculations.
  • Write the results of the calculations (to a file, database, TCP connection, etc.).

We can parallelize a program in two dimensions:

  • Part 2 can run on multiple cores, since each calculation is independent; the order of processing does not matter.
  • Each part can run independently. Part 1 can place data on an input queue, part 2 can pull data off the input queue and put results onto an output queue, and part 3 can pull results off the output queue and write them out.

This seems like the most basic pattern in concurrent programming, but I am still lost in trying to solve it, so let's write a canonical example to illustrate how this is done using multiprocessing.

Here is the example problem: given a CSV file with rows of integers as input, compute their sums. Separate the problem into three parts, which can all run in parallel:

  • Process the input file into raw data (lists/iterables of integers)
  • Calculate the sums of the data, in parallel
  • Output the sums

The following is a traditional single-process Python program that solves these three tasks:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# basicsums.py
"""A program that reads integer values from a CSV file and writes out
their sums to another CSV file.

"""

import csv
import optparse
import sys

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    return cli_parser

def parse_input_csv(csvfile):
    """Parses the input CSV and yields tuples with the index of the row
    as the first element, and the integers of the row as the second
    element.  The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.reader` instance

    """
    for i, row in enumerate(csvfile):
        row = [int(entry) for entry in row]
        yield i, row

def sum_rows(rows):
    """Yields a tuple with the index of each input list of integers as
    the first element, and the sum of the list of integers as the
    second element.  The index is zero-index based.

    :Parameters:
    - `rows`: an iterable of tuples, with the index of the original row
      as the first element, and a list of integers as the second
      element

    """
    for i, row in rows:
        yield i, sum(row)

def write_results(csvfile, results):
    """Writes a series of results to an outfile, where the first column
    is the index of the original row of data, and the second column is
    the result of the calculation.  The index is zero-index based.

    :Parameters:
    - `csvfile`: a `csv.writer` instance to which to write results
    - `results`: an iterable of tuples, with the index (zero-based) of
      the original row as the first element, and the calculated result
      from that row as the second element

    """
    for result_row in results:
        csvfile.writerow(result_row)

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)
    # gets an iterable of rows that's not yet evaluated
    input_rows = parse_input_csv(in_csvfile)
    # sends the rows iterable to sum_rows() for a results iterable, but
    # still not evaluated
    result_rows = sum_rows(input_rows)
    # finally evaluation takes place as a chain in write_results()
    write_results(out_csvfile, result_rows)
    infile.close()
    outfile.close()

if __name__ == '__main__':
    main(sys.argv[1:])
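Running the single-process version is straightforward; for example (the file names here are just placeholders):

python basicsums.py input.csv sums.csv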

The task is to take this program and rewrite it to use multiprocessing to parallelize the three parts outlined above. Below is a skeleton of the new, parallelized program, to be fleshed out to address the parts described in the comments:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out
their sums to another CSV file, using multiple processes if desired.

"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    infile = open(args[0])
    in_csvfile = csv.reader(infile)
    outfile = open(args[1], 'w')
    out_csvfile = csv.writer(outfile)

    # Parse the input file and add the parsed data to a queue for
    # processing, possibly chunking to decrease communication between
    # processes.

    # Process the parsed data as soon as any (chunks) appear on the
    # queue, using as many processes as allotted by the user
    # (opts.numprocs); place results on a queue for output.
    #
    # Terminate processes when the parser stops putting data in the
    # input queue.

    # Write the results to disk as soon as they appear on the output
    # queue.

    # Ensure all child processes have terminated.

    # Clean up files.
    infile.close()
    outfile.close()

if __name__ == '__main__':
    main(sys.argv[1:])

These code snippets, as well as another piece of code that generates sample CSV files for testing purposes, can be found on GitHub.
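That generator script isn't reproduced in this post; a minimal stand-in for producing test input might look like the following (the file name, row count, and value range are arbitrary choices for illustration, not the code from the repository):

#!/usr/bin/env python
# make_test_csv.py -- write a small CSV of random integers for testing.
import csv
import random

NUM_ROWS = 1000   # arbitrary sizes for a quick test
NUM_COLS = 10

with open("input.csv", "w") as outfile:
    writer = csv.writer(outfile)
    for _ in range(NUM_ROWS):
        writer.writerow([random.randint(0, 100) for _ in range(NUM_COLS)])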

I would greatly appreciate any insight into how you concurrency gurus would approach this problem.




Here are some questions I had when thinking about this problem. Bonus points for addressing any or all of them:

  • Should I have child processes for reading in the data and placing it into the queue, or can the main process do this without blocking until all the input has been read?
  • Likewise, should I have a child process for writing the results out from the processed queue, or can the main process do this without having to wait for all the results?
  • Should I use a process pool for the sum operations?
  • Suppose we didn't need to siphon off the input and output queues as data entered them, but could wait until all input was parsed and all results were calculated (for example, because we know all the input and output will fit in system memory). Should we change the algorithm in any way (for example, not run any processes concurrently with I/O)? (A Pool-based sketch for this case follows the list.)
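For that last case in particular, a plain multiprocessing.Pool may be the simplest option. Here is a rough sketch, assuming basicsums.py above is importable and that the whole input really does fit in memory (the function and file names are illustrative):

import csv
import multiprocessing

from basicsums import parse_input_csv, write_results  # helpers shown above

def sum_row(indexed_row):
    # indexed_row is an (index, [ints]) tuple produced by parse_input_csv()
    i, row = indexed_row
    return i, sum(row)

def main_pooled(input_path, output_path, numprocs):
    with open(input_path) as infile:
        rows = list(parse_input_csv(csv.reader(infile)))  # read everything up front
    pool = multiprocessing.Pool(processes=numprocs)
    # map() blocks until every result is in and preserves input order.
    results = pool.map(sum_row, rows)
    pool.close()
    pool.join()
    with open(output_path, 'w') as outfile:
        write_results(csv.writer(outfile), results)

if __name__ == '__main__':
    main_pooled('input.csv', 'sums.csv', multiprocessing.cpu_count())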
+71 · python · concurrency · multiprocessing · embarrassingly-parallel · Mar 01 '10 at 21:38
5 answers

My solution has extra bells and whistles to make sure that the order of the output matches the order of the input. I use multiprocessing.Queue to send data between processes, sending "STOP" messages so each process knows when to quit checking the queue. I think the comments in the source should make it clear what's going on, but let me know if not.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# multiproc_sums.py
"""A program that reads integer values from a CSV file and writes out
their sums to another CSV file, using multiple processes if desired.

"""

import csv
import multiprocessing
import optparse
import sys

NUM_PROCS = multiprocessing.cpu_count()

def make_cli_parser():
    """Make the command line interface parser."""
    usage = "\n\n".join(["python %prog INPUT_CSV OUTPUT_CSV",
            __doc__,
            """
ARGUMENTS:
    INPUT_CSV: an input CSV file with rows of numbers
    OUTPUT_CSV: an output file that will contain the sums\
"""])
    cli_parser = optparse.OptionParser(usage)
    cli_parser.add_option('-n', '--numprocs', type='int',
            default=NUM_PROCS,
            help="Number of processes to launch [DEFAULT: %default]")
    return cli_parser

class CSVWorker(object):
    def __init__(self, numprocs, infile, outfile):
        self.numprocs = numprocs
        self.infile = open(infile)
        self.outfile = outfile
        self.in_csvfile = csv.reader(self.infile)
        self.inq = multiprocessing.Queue()
        self.outq = multiprocessing.Queue()

        self.pin = multiprocessing.Process(target=self.parse_input_csv, args=())
        self.pout = multiprocessing.Process(target=self.write_output_csv, args=())
        self.ps = [ multiprocessing.Process(target=self.sum_row, args=())
                        for i in range(self.numprocs)]

        self.pin.start()
        self.pout.start()
        for p in self.ps:
            p.start()

        self.pin.join()
        i = 0
        for p in self.ps:
            p.join()
            print "Done", i
            i += 1

        self.pout.join()
        self.infile.close()

    def parse_input_csv(self):
        """Parses the input CSV and yields tuples with the index of the
        row as the first element, and the integers of the row as the
        second element.  The index is zero-index based.

        The data is then sent over inqueue for the workers to do their
        thing.  At the end the input process sends a 'STOP' message for
        each worker.
        """
        for i, row in enumerate(self.in_csvfile):
            row = [ int(entry) for entry in row ]
            self.inq.put( (i, row) )

        for i in range(self.numprocs):
            self.inq.put("STOP")

    def sum_row(self):
        """
        Workers. Consume inq and produce answers on outq
        """
        tot = 0
        for i, row in iter(self.inq.get, "STOP"):
            self.outq.put( (i, sum(row)) )
        self.outq.put("STOP")

    def write_output_csv(self):
        """
        Open outgoing csv file then start reading outq for answers.
        Since I chose to make sure output was synchronized to the input
        there are some extra goodies to do that.

        Obviously your output has the original row number, so this is
        not required.
        """
        cur = 0
        stop = 0
        buffer = {}
        # For some reason csv.writer works badly across processes, so
        # open/close and use it all in the same process or else you'll
        # have the last several rows missing
        outfile = open(self.outfile, "w")
        self.out_csvfile = csv.writer(outfile)

        # Keep running until we see numprocs STOP messages
        for works in range(self.numprocs):
            for i, val in iter(self.outq.get, "STOP"):
                # verify rows are in order, if not save in buffer
                if i != cur:
                    buffer[i] = val
                else:
                    # if so, write it out and make sure no waiting rows exist
                    self.out_csvfile.writerow( [i, val] )
                    cur += 1
                    while cur in buffer:
                        self.out_csvfile.writerow([ cur, buffer[cur] ])
                        del buffer[cur]
                        cur += 1

        outfile.close()

def main(argv):
    cli_parser = make_cli_parser()
    opts, args = cli_parser.parse_args(argv)
    if len(args) != 2:
        cli_parser.error("Please provide an input file and output file.")
    c = CSVWorker(opts.numprocs, args[0], args[1])

if __name__ == '__main__':
    main(sys.argv[1:])
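Usage mirrors the skeleton's command-line interface, with an optional process count (the file names here are just placeholders):

python multiproc_sums.py -n 4 input.csv sums.csv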
+59 · Mar 02 '10 at 16:16

Coming late to the party...

joblib has a layer on top of multiprocessing to help with making parallel loops. It gives you features like lazy dispatching of jobs and better error reporting, in addition to a very simple syntax.

As a disclaimer, I am the original author of joblib.
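As a sketch of what the sums example might look like with joblib (the n_jobs value and file handling are illustrative; Parallel returns results in the same order the inputs were submitted):

import csv
from joblib import Parallel, delayed

def sum_csv(input_path, output_path, n_jobs=2):
    with open(input_path) as infile:
        rows = [[int(entry) for entry in row] for row in csv.reader(infile)]
    # Dispatch one sum() call per row across n_jobs worker processes.
    sums = Parallel(n_jobs=n_jobs)(delayed(sum)(row) for row in rows)
    with open(output_path, 'w') as outfile:
        writer = csv.writer(outfile)
        for i, total in enumerate(sums):
            writer.writerow([i, total])

if __name__ == '__main__':
    sum_csv('input.csv', 'sums.csv')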

+5 · Jan 15 '14 at 8:13

I realize I'm a bit late to the party, but I recently discovered GNU parallel and want to show how easy it is to accomplish this typical task with it.

 cat input.csv | parallel ./sum.py --pipe > sums 

Something like this will do for sum.py :

#!/usr/bin/python

from sys import argv

if __name__ == '__main__':
    row = argv[-1]
    values = (int(value) for value in row.split(','))
    print row, ':', sum(values)

parallel will run sum.py for every line in input.csv (in parallel, of course) and then output the results to sums. Clearly better than the multiprocessing hassle.
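If you would rather let parallel split the input itself and keep the output in input order, something along these lines should also work (a sketch: -k keeps output in submission order, --pipe feeds blocks of stdin lines to each job, and sum_stdin.py is a hypothetical variant of sum.py that reads rows from stdin):

cat input.csv | parallel -k --pipe python sum_stdin.py > sums

#!/usr/bin/python
# sum_stdin.py -- sum each comma-separated row arriving on stdin.
import sys

for line in sys.stdin:
    line = line.strip()
    if line:
        print(sum(int(value) for value in line.split(',')))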

+4 · Aug 23 '13 at 11:00

Old school.

p1.py

import csv
import pickle
import sys

with open( "someFile", "rb" ) as source:
    rdr = csv.reader( source )
    for line in enumerate( rdr ):
        pickle.dump( line, sys.stdout )

p2.py

import pickle
import sys

while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    pickle.dump( (i, sum(row)), sys.stdout )

p3.py

import pickle
import sys

while True:
    try:
        i, row = pickle.load( sys.stdin )
    except EOFError:
        break
    print i, row

Here is the final multi-processing structure:

 python p1.py | python p2.py | python p3.py 

Yes, the shell has knit these together at the OS level. That seems simpler to me, and it works very nicely.

Yes, there's a bit more overhead in using pickle (or cPickle). The simplification, however, seems worth the effort.

If you want the file name to be an argument to p1.py, that's an easy change, as sketched below.
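For instance, a sketch of that change (keeping the Python 2 style of the original; Python 3 would pickle to sys.stdout.buffer instead):

# p1.py, taking the input CSV path on the command line instead of
# hard-coding "someFile".
import csv
import pickle
import sys

with open( sys.argv[1], "rb" ) as source:
    rdr = csv.reader( source )
    for line in enumerate( rdr ):
        pickle.dump( line, sys.stdout )

The pipeline then becomes:

python p1.py input.csv | python p2.py | python p3.py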

More importantly, a helper function like the following is very handy.

def get_stdin():
    while True:
        try:
            yield pickle.load( sys.stdin )
        except EOFError:
            return

This allows you to do this:

for item in get_stdin():
    process(item)   # whatever per-item work applies



That's very simple, but it does not easily allow you to have multiple copies of P2.py running.

You have two problems: fan-out and fan-in. P1.py must somehow fan out to multiple P2.py's, and the P2.py's must somehow merge their results into a single P3.py.

The old-school approach to fan-out is a "push" architecture, which is very effective.

Theoretically, multiple P2.py's pulling from a common queue is the optimal allocation of resources. That is often ideal, but it is also a fair amount of programming. Is the programming really necessary? Or will round-robin processing be good enough?

Practically, you'll find that having P1.py do a simple round-robin deal among several P2.py's can be quite good. You'd have P1.py configured to deal out to n copies of P2.py via named pipes, with each P2.py reading from its appropriate pipe.

What if one P2.py gets all the "worst case" data and lags far behind? Yes, round-robin isn't perfect. But it's better than only one P2.py, and you can address that bias with simple randomization.
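A rough sketch of that round-robin fan-out, assuming Unix named pipes (the fifo names and worker count are made up; each P2.py would open and read from its own fifo instead of sys.stdin):

# p1_fanout.py -- deal rows of the input CSV round-robin across n named pipes.
import csv
import os
import pickle
import sys

NUM_WORKERS = 4
PIPE_NAMES = ["p2_pipe_%d" % n for n in range(NUM_WORKERS)]

for name in PIPE_NAMES:
    if not os.path.exists(name):
        os.mkfifo(name)

# Opening a fifo for writing blocks until a reader opens the other end,
# so the P2.py workers should be started first.
pipes = [open(name, "wb") for name in PIPE_NAMES]

with open(sys.argv[1], "rb") as source:
    for i, row in enumerate(csv.reader(source)):
        pickle.dump((i, row), pipes[i % NUM_WORKERS])   # simple round robin

for pipe in pipes:
    pipe.close()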

Fan-in from several P2.py's into one P3.py is a bit trickier. At this point, the old-school approach stops being advantageous: P3.py needs to read from multiple named pipes, using the select library, to interleave the reads.
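A sketch of that select-based fan-in, assuming for simplicity that each P2.py writes one plain-text "index,sum" line per result to its own fifo rather than a pickle (the fifo names are made up):

# p3_fanin.py -- interleave results arriving on several named pipes.
import os
import select

PIPE_NAMES = ["p3_pipe_%d" % n for n in range(4)]

# Opening a fifo read-only blocks until a writer appears, so the P2.py
# processes should already be running.
fds = [os.open(name, os.O_RDONLY) for name in PIPE_NAMES]
buffers = dict((fd, b"") for fd in fds)
open_fds = list(fds)

while open_fds:
    readable, _, _ = select.select(open_fds, [], [])
    for fd in readable:
        chunk = os.read(fd, 4096)
        if not chunk:                  # the writer on this pipe is done
            open_fds.remove(fd)
            os.close(fd)
            continue
        buffers[fd] += chunk
        while b"\n" in buffers[fd]:    # emit each complete result line
            line, buffers[fd] = buffers[fd].split(b"\n", 1)
            print(line.decode())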

+3 · Mar 01 '10 at 21:55

It's probably possible to introduce a bit of parallelism into part 1 as well. That's probably not an issue with a format as simple as CSV, but if processing the input data is noticeably slower than reading it, you could read larger chunks and then keep reading until you find a "row separator" (a newline in the CSV case, though again that depends on the format being read; it doesn't work if the format is sufficiently complicated).

Those chunks, each probably containing multiple records, can then be farmed off to a crowd of parallel processes reading jobs from a queue, where they are parsed and split before being placed on the in-queue for stage 2.
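A sketch of that chunked reading for stage 1 (the block size and the queue hand-off are illustrative):

def read_chunks(infile, chunk_size=1024 * 1024):
    """Yield blocks of whole CSV lines, roughly chunk_size bytes each."""
    while True:
        chunk = infile.read(chunk_size)
        if not chunk:
            return
        if not chunk.endswith("\n"):
            # Read on to the next row separator so no record is split
            # across two chunks.
            chunk += infile.readline()
        yield chunk

# Stage 1 would then put whole chunks on the input queue, e.g.:
#
#     for chunk in read_chunks(open("input.csv")):
#         inq.put(chunk)   # workers csv-parse each chunk in stage 2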

0 · Mar 10 '10 at 16:39


