Basic terminal emulation in Python

I am trying to write a basic terminal emulation script because, for some reason, I do not have access to the terminal on my Mac. But for writing scripts for the game engine in Blender, the console is important; it normally appears in the terminal from which you launched Blender. To perform simple actions such as deleting, renaming, etc., I ran commands using stream = os.popen(command) and then print(stream.read()). This works great for most things, but not for anything interactive.
Soon I discovered a new way:
sp = subprocess.Popen(["/bin/bash", "-i"], stdout = subprocess.PIPE, stdin = subprocess.PIPE, stderr = subprocess.PIPE) , and then print(sp.communicate(command.encode())) . That should spawn an interactive shell that I can use as a terminal, right?

In any case, I cannot keep the connection open. Using the last example, I can call sp.communicate only once, which gives me the following output (in this case for "ls /") along with some errors:
(b'Applications\n[...]usr\nvar\n', b'bash: no job control in this shell\nbash-3.2$ ls /\nbash-3.2$ exit\n') . The second time it gives me ValueError: I/O operation on closed file. Sometimes (for example, for "ls") I get only this error: b'ls\nbash-3.2$ exit\n' .

What does this mean? How can I emulate a terminal in Python in a way that lets me control an interactive shell, or launch Blender and communicate with its console?

+4
source share
3 answers

Assuming you want the interactive shell to continue to request input, you can try the following:

 import shlex
 import subprocess

 # Minimal read-run-print loop: prompt for a command line, run it as a child
 # process, and print whatever it wrote to stdout. Type "exit" to quit.
 # NOTE: this snippet targets Python 2 (it relies on raw_input).
 while True:
     # Ctrl-D / Ctrl-C at the prompt ends the session quietly instead of
     # dumping a traceback. Only these two are caught so that genuine
     # programming errors are not silently swallowed.
     try:
         s = raw_input('> ')
     except (EOFError, KeyboardInterrupt):
         break

     # check if you should exit
     if s.strip().lower() == 'exit':
         break

     # Tokenize the way a POSIX shell would, so quoted arguments stay in one
     # piece and leading/trailing whitespace does not produce empty tokens.
     try:
         args = shlex.split(s)
     except ValueError:
         # e.g. an unbalanced quote
         print('Invalid command')
         continue

     # Nothing typed: just show the prompt again.
     if not args:
         continue

     # try to run command
     try:
         cmd = subprocess.Popen(args, stdout=subprocess.PIPE)
     except OSError:
         print('Invalid command')
         continue

     # communicate() reads stdout to EOF *and* waits for the child, so
     # finished processes are reaped instead of lingering as zombies.
     cmd_out = cmd.communicate()[0]

     # Process output
     print(cmd_out)
+9
source

Here is what I did to achieve what you want on Windows. It is much more complicated, because Windows does not comply with any standard other than its own. A slight modification to this code should give you exactly what you are looking for.

 '''
 Created on Mar 2, 2013

 @author: rweber
 '''
 # NOTE: this snippet targets Python 2 (Queue module, raw_input).
 import subprocess
 import Queue
 from Queue import Empty
 import threading
 import time


 class Process_Communicator(object):
     '''Collect a subprocess's stdout/stderr on background threads.

     Reader threads push every line the child writes onto queues; an
     aggregator thread moves those lines into string buffers and forwards
     anything placed on ``stdin_queue`` to the child's stdin. The caller can
     therefore poll for output without blocking in subprocess.communicate().
     '''

     # Seconds the polling loops sleep between passes, so they do not spin
     # a CPU core at 100% while the child is idle.
     POLL_INTERVAL = 0.05

     def __init__(self, subp):
         '''Start the reader and aggregator threads for *subp*.

         subp -- a subprocess.Popen object created with stdin, stdout and
                 stderr set to subprocess.PIPE.
         '''
         self.p = subp
         self.unbblocked_out = ""
         self.unbblocked_err = ""
         self.running = True
         # Guards the two string buffers: the aggregator appends to them
         # while the caller reads and clears them from another thread.
         self._lock = threading.Lock()

         # All queues exist before any thread that uses them is started.
         self.qo = Queue.Queue()
         self.qe = Queue.Queue()
         self.stdin_queue = Queue.Queue()

         self.to = threading.Thread(name="out_read",
                                    target=self.enqueue_output, args=())
         self.to.daemon = True  # thread dies with the program
         self.to.start()

         self.te = threading.Thread(name="err_read",
                                    target=self.enqueue_err, args=())
         self.te.daemon = True  # thread dies with the program
         self.te.start()

         self.aggregator = threading.Thread(name="aggregate",
                                            target=self.aggregate, args=())
         self.aggregator.daemon = True  # thread dies with the program
         self.aggregator.start()

     def join(self):
         '''Wait for the reader threads to finish, then stop the aggregator.

         The reader threads end when the child's pipes reach EOF. The
         aggregator performs one final update() before it exits, so all
         remaining output is in the buffers once this method returns.
         '''
         self.te.join()
         self.to.join()
         self.running = False
         self.aggregator.join()

     def _send_pending_stdin(self):
         '''Write every string waiting on stdin_queue to the child's stdin.'''
         try:
             while True:
                 # get_nowait() cannot block even if another thread drains
                 # the queue between an emptiness check and the get.
                 s = self.stdin_queue.get_nowait()
                 # NOTE(review): '\n\r' is kept from the original; it looks
                 # like a reversed CRLF -- confirm what cmd.exe expects.
                 self.p.stdin.write(str(s) + '\n\r')
                 self.p.stdin.flush()
         except Empty:
             pass

     def enqueue_in(self):
         '''Forward queued input to the child for as long as we are running.'''
         while self.running and self.p.stdin is not None:
             self._send_pending_stdin()
             time.sleep(self.POLL_INTERVAL)

     def _pump(self, stream, q):
         '''Copy lines from *stream* onto queue *q* until EOF.'''
         if not stream or stream.closed:
             return
         for line in iter(stream.readline, b''):
             q.put(line)

     def enqueue_output(self):
         '''Reader-thread body: queue each line the child writes to stdout.'''
         self._pump(self.p.stdout, self.qo)

     def enqueue_err(self):
         '''Reader-thread body: queue each line the child writes to stderr.'''
         self._pump(self.p.stderr, self.qe)

     def aggregate(self):
         '''Aggregator-thread body: call update() until join() stops us.'''
         while self.running:
             self.update()
             time.sleep(self.POLL_INTERVAL)
         # Final pass picks up lines queued just before shutdown.
         self.update()

     def _drain(self, q):
         '''Return everything currently on queue *q* as one string.'''
         chunks = []
         try:
             while True:
                 chunks.append(q.get_nowait())  # or q.get(timeout=.1)
         except Empty:
             pass
         return "".join(chunks)

     def update(self):
         '''Move queued output into the buffers and send queued input.'''
         err = self._drain(self.qe)
         out = self._drain(self.qo)
         if err or out:
             with self._lock:
                 self.unbblocked_err += err
                 self.unbblocked_out += out
         self._send_pending_stdin()

     def get_stdout(self, clear=True):
         '''Return buffered stdout text; empty the buffer unless clear=False.'''
         with self._lock:
             ret = self.unbblocked_out
             if clear:
                 self.unbblocked_out = ""
         return ret

     def has_stdout(self):
         '''Return the buffered stdout text, or None if there is none.'''
         ret = self.get_stdout(False)
         if ret == '':
             return None
         else:
             return ret

     def get_stderr(self, clear=True):
         '''Return buffered stderr text; empty the buffer unless clear=False.'''
         with self._lock:
             ret = self.unbblocked_err
             if clear:
                 self.unbblocked_err = ""
         return ret

     def has_stderr(self):
         '''Return the buffered stderr text, or None if there is none.'''
         ret = self.get_stderr(False)
         if ret == '':
             return None
         else:
             return ret


 def write_stdin(p, c):
     '''Prompt the user for lines and queue them for process *p* via *c*.

     p -- the subprocess.Popen object being driven
     c -- the Process_Communicator wrapping p
     '''
     while p.poll() is None:
         try:
             i = raw_input("send to process:")
         except EOFError:
             # Our own stdin was closed; nothing more to forward.
             return
         if i is not None:
             c.stdin_queue.put(i)


 def _print_pending(c):
     '''Print whatever stdout/stderr text *c* has buffered so far.'''
     if c.has_stdout():
         print(c.get_stdout())
     if c.has_stderr():
         print(c.get_stderr())


 p = subprocess.Popen("cmd.exe", shell=True, stdout=subprocess.PIPE,
                      stderr=subprocess.PIPE, stdin=subprocess.PIPE)
 c = Process_Communicator(p)
 stdin = threading.Thread(name="write_stdin", target=write_stdin, args=(p, c))
 stdin.daemon = True  # thread dies with the program
 stdin.start()

 while p.poll() is None:
     _print_pending(c)
     # Sleep briefly rather than spinning while the process is idle.
     time.sleep(Process_Communicator.POLL_INTERVAL)

 c.join()
 # join() triggers a last update(), so print what the process wrote right
 # before it exited instead of dropping it.
 _print_pending(c)
 print("Exit")
+4
source

It seems that you should run it on a new controlling terminal allocated with "forkpty". To suppress the "no job control ..." warning, you need to call "setsid".

+1
source

All Articles