How can I feed the standard input of a subprocess from a Python iterator?

I am trying to use the subprocess module in Python to communicate with a process that reads standard input and writes standard output in a streaming fashion. I want the subprocess's input lines to come from an iterator that generates them, while I read the output lines from the subprocess. There need not be a one-to-one correspondence between input and output lines. How can I feed a subprocess from an arbitrary iterator that yields strings?

Here is some sample code that gives a simple test case, along with some approaches I tried that do not work for one reason or another:

```python
#!/usr/bin/python
from subprocess import *

# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

# I thought that stdin could be any iterable, but it actually wants a
# filehandle, so this fails with an error.
subproc = Popen("cat", stdin=input_iterator, stdout=PIPE)

# This works, but it first sends *all* the input at once, then returns
# *all* the output as a string, rather than giving me an iterator over
# the output. This uses up all my memory, because the input is several
# hundred million lines.
subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
output, error = subproc.communicate("".join(input_iterator))
output_lines = output.split("\n")
```

So, how can I have my subprocess read from the iterator line by line while I read its stdout in turn?

python subprocess io
3 answers

A simple way seems to be to fork and feed the input handle from a child process. Can anyone clarify any possible disadvantages of this? Or are there Python modules that make it simpler and safer?

```python
#!/usr/bin/python
from subprocess import *
import os

def fork_and_input(input, handle):
    """Send input to handle in a child process."""
    # Make sure input is iterable before forking
    input = iter(input)
    if os.fork():
        # Parent
        handle.close()
    else:
        # Child
        try:
            handle.writelines(input)
            handle.close()
        # An IOError here means some *other* part of the program
        # crashed, so don't complain here.
        except IOError:
            pass
        os._exit(0)  # os._exit requires an exit status

# A really big iterator
input_iterator = ("hello %s\n" % x for x in xrange(100000000))

subproc = Popen("cat", stdin=PIPE, stdout=PIPE)
fork_and_input(input_iterator, subproc.stdin)

for line in subproc.stdout:
    print line,
```
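As for a simpler and safer alternative: a background thread from the standard-library `threading` module can do the same feeding without `fork()`. This is a sketch, not from the original answer; the `feed` helper name is my own, and `cat` stands in for the real subprocess:

```python
#!/usr/bin/env python3
from subprocess import Popen, PIPE
from threading import Thread

def feed(iterable, pipe):
    """Write each chunk of iterable to pipe, then close the pipe."""
    try:
        for chunk in iterable:
            pipe.write(chunk)
    except BrokenPipeError:
        pass  # the subprocess exited early; nothing more to send
    finally:
        pipe.close()

input_iterator = (b"hello %d\n" % x for x in range(10000))

subproc = Popen(["cat"], stdin=PIPE, stdout=PIPE)
# The daemon thread feeds stdin while the main thread reads stdout,
# so neither pipe can fill up and deadlock the other.
Thread(target=feed, args=(input_iterator, subproc.stdin), daemon=True).start()

count = 0
for line in subproc.stdout:
    count += 1  # consume output as it arrives
subproc.wait()
```

Unlike the fork version, this avoids duplicating the parent's whole address space and needs no `os._exit` bookkeeping in a child process.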

To feed a subprocess's standard input from a Python iterator:

```python
#!/usr/bin/env python3
from subprocess import Popen, PIPE

with Popen("sink", stdin=PIPE, bufsize=-1) as process:
    for chunk in input_iterator:
        process.stdin.write(chunk)
```
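The snippet above assumes `input_iterator` and a `sink` command are defined elsewhere. A self-contained variant of the same pattern (my own sketch, substituting `cat` for the subprocess and discarding its output so the unread stdout pipe cannot block it) looks like this:

```python
#!/usr/bin/env python3
from subprocess import Popen, PIPE, DEVNULL

input_iterator = (b"hello %d\n" % x for x in range(10000))

# Write-only streaming: stdout goes to the null device, so nothing
# we fail to read can fill a pipe and stall the subprocess.
with Popen(["cat"], stdin=PIPE, stdout=DEVNULL) as process:
    for chunk in input_iterator:
        process.stdin.write(chunk)
    process.stdin.close()  # signal EOF so the subprocess can finish
    returncode = process.wait()
```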

If you want to read the output at the same time, you need threads or asyncio:

```python
#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def writelines(writer, lines):
    # NOTE: can't use writer.writelines(lines) here because it tries to write
    # all at once
    with closing(writer):
        for line in lines:
            writer.write(line)
            await writer.drain()

async def main():
    input_iterator = (b"hello %d\n" % x for x in range(100000000))
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(writelines(process.stdin, input_iterator))
    async for line in process.stdout:
        sys.stdout.buffer.write(line)
    return await process.wait()

if sys.platform == 'win32':
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(main()))
```
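On Python 3.8+, where `asyncio.run()` manages the event loop and the proactor loop is already the Windows default, the loop boilerplate at the bottom can be dropped. The following is my own sketch of the same approach under that assumption, not the answer's original code, with a line counter in place of echoing the output:

```python
#!/usr/bin/env python3
import asyncio
from asyncio.subprocess import PIPE

async def writelines(writer, lines):
    try:
        for line in lines:
            writer.write(line)
            await writer.drain()  # respect flow control instead of buffering everything
    finally:
        writer.close()
        await writer.wait_closed()

async def main():
    input_iterator = (b"hello %d\n" % x for x in range(10000))
    process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE)
    feeder = asyncio.ensure_future(writelines(process.stdin, input_iterator))
    count = 0
    async for line in process.stdout:
        count += 1  # consume output concurrently with feeding input
    await feeder
    return await process.wait(), count

rc, count = asyncio.run(main())
```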

Follow this recipe; it is an add-on to the subprocess module that supports asynchronous I/O. It still requires your subprocess to respond to each input line (or group of lines) with part of its output, though.

