Python strace function

Is it possible to strace a Python function for opened files, and to distinguish whether they were opened by Python itself or by a subprocess?

read_python, read_external = [], [] @strace_read(read_python, read_external) function test(): file = open("foo.txt", "r") subprocess.call(["cat", "bar.txt"]) for file in read_python: print("python: ", file) for file in read_external: print("external: ", file) 

So the output looks like this:

 >>> python: foo.txt >>> external: bar.txt 

Ideally I would like to use a decorator. Distinguishing between the two cases is not a priority.

Conceptually, my best guess is to replace instances of load_function(open) with wrappers ... but honestly I have no idea; there are too many ways to access open.

+6
source share
2 answers

This is the solution I used:

 #!/usr/bin/env python3
"""Record which files are opened while a decorated function runs.

A helper process attaches strace(1) to the current Python process (and, via
``-f``, to every child it forks) for the duration of the call, then reports
the traced open()/openat() calls back to the caller.
"""
import array
import codecs
import collections
import fcntl
import io
import locale
import multiprocessing
import os
import re
import selectors
import signal
import subprocess
import termios

import decorator


def strace(function):
    """Decorate *function* so each call is traced with strace.

    The decorated function returns a ``StraceReturn`` named tuple:

    * ``return_data`` -- whatever *function* returned,
    * ``pid``         -- the pid of the Python process that ran *function*,
    * ``strace_data`` -- a list of ``(pid, path)`` tuples, one per traced
      open()/openat() call; ``pid`` is an int, so entries whose pid equals
      ``StraceReturn.pid`` were issued by Python itself and all others by
      subprocesses.

    Requires the ``strace`` executable and permission to attach to the
    current process.
    """
    StraceReturn = collections.namedtuple(
        "StraceReturn", ["return_data", "pid", "strace_data"]
    )

    def strace_filter(stracefile, pid, exclude_system=False):
        """Yield ``(pid, path)`` for every open/openat line in *stracefile*.

        *stracefile* is an iterable of strace output lines produced with
        ``-xx`` (paths fully hex-escaped).  Lines without a ``[pid N]``
        prefix are attributed to *pid*.  With *exclude_system* true, paths
        below the usual system directories are skipped.
        """
        system = (
            "/bin", "/boot", "/dev", "/etc", "/lib", "/proc", "/root",
            "/run", "/sbin", "/srv", "/sys", "/tmp", "/usr", "/var",
        )
        encoding = locale.getpreferredencoding(False)
        # Matches both `open("...",` and `openat(AT_FDCWD, "...",`; current
        # C libraries implement open() on top of the openat syscall.
        open_call = re.compile(
            r'^(?:\[pid\s+(\d+)\]\s+)?open(?:at)?\((?:[^",]+,\s*)?'
            r'\"((?:\\x[0-9a-f]{2})+)\",',
            re.IGNORECASE,
        )
        for line in stracefile:
            match = open_call.search(line)
            if not match:
                continue
            raw_pid, raw_path = match.groups()
            # Normalise to int so callers can compare against os.getpid().
            caller_pid = pid if raw_pid is None else int(raw_pid)
            path = codecs.escape_decode(raw_path.encode("ascii"))[0].decode(encoding)
            if exclude_system and path.startswith(system):
                continue
            yield (caller_pid, path)

    def strace_reader(conn_parent, conn_child, barrier, pid):
        """Helper-process body: run strace on *pid* and report the results.

        Collects strace's stderr until the parent sends a stop message on
        *conn_child*, then terminates strace, drains the remaining output
        and sends every ``(pid, path)`` tuple back over *conn_child*.
        """
        conn_parent.close()
        encoding = locale.getpreferredencoding(False)
        strace_args = [
            "strace", "-e", "trace=open,openat", "-f",
            "-s", "512", "-xx", "-p", str(pid),
        ]
        process_data = io.StringIO()
        process = subprocess.Popen(
            strace_args,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        selector = selectors.DefaultSelector()
        selector.register(process.stderr, selectors.EVENT_READ)
        # Block until strace has written something (its attach banner or an
        # error) before letting the traced function start.
        selector.select()
        barrier.wait()
        selector.register(conn_child, selectors.EVENT_READ)
        while len(selector.get_map()):
            for key, _mask in selector.select():
                if key.fd == conn_child.fileno():
                    # Stop message from the parent: shut strace down.  Its
                    # stderr stays registered until EOF so nothing is lost.
                    conn_child.recv()
                    selector.unregister(key.fd)
                    process.terminate()
                    try:
                        process.wait(5)
                    except subprocess.TimeoutExpired:
                        process.kill()
                        process.wait()
                    continue
                # strace output: read at least 1 KiB, or everything pending.
                ioctl_buffer = array.array("i", [0])
                try:
                    fcntl.ioctl(key.fd, termios.FIONREAD, ioctl_buffer)
                except OSError:
                    read_bytes = 1024
                else:
                    read_bytes = max(1024, ioctl_buffer[0])
                data = os.read(key.fd, read_bytes)
                if data:
                    # Store all output and filter it afterwards: simpler,
                    # though less memory-efficient, than filtering each chunk
                    # while carrying over the trailing partial line.
                    process_data.write(data.decode(encoding))
                else:
                    selector.unregister(key.fd)
        selector.close()
        process_data.seek(0, io.SEEK_SET)
        for pidfile in strace_filter(process_data, pid):
            conn_child.send(pidfile)
        conn_child.close()

    def strace_wrapper(function, *args, **kw):
        """Call ``function(*args, **kw)`` under strace; return StraceReturn."""
        strace_data = []
        barrier = multiprocessing.Barrier(2)
        conn_parent, conn_child = multiprocessing.Pipe(duplex=True)
        process = multiprocessing.Process(
            target=strace_reader,
            args=(conn_parent, conn_child, barrier, os.getpid()),
        )
        process.start()
        conn_child.close()
        barrier.wait()
        try:
            function_return = function(*args, **kw)
        finally:
            # Always stop and reap the tracer, even if the function raised.
            conn_parent.send(None)
            while True:
                try:
                    strace_data.append(conn_parent.recv())
                except EOFError:
                    break
            process.join(5)
            if process.is_alive():
                process.terminate()
                process.join(5)
                if process.is_alive():
                    os.kill(process.pid, signal.SIGKILL)
                    process.join()
            conn_parent.close()
        return StraceReturn(function_return, os.getpid(), strace_data)

    return decorator.decorator(strace_wrapper, function)


@strace
def test():
    """Demo: open one file via a shell subprocess and one from Python."""
    print("Entering test()")
    process = subprocess.Popen("cat +μυρτιὲς.txt", shell=True)
    f = open("test\"test", "r")
    f.close()
    process.wait()
    print("Exiting test()")
    return 5


print(test())

Note that any output strace produces after the termination event is sent is still collected. To avoid this, use a `while not signaled` loop and terminate the subprocess after the loop (the FIONREAD ioctl is a holdover from that approach; I did not see any reason to remove it).

Looking back, the decorator could have been greatly simplified if I had used a temporary file rather than multiprocessing and a Pipe.

A child process is forked, which in turn forks strace - in other words, strace is tracing its grandparent. Some Linux distributions only allow strace to trace its own children. I'm not sure how to work around this restriction - having the main program continue executing in the child fork (while the parent execs strace) is probably a bad idea - the program would trade PIDs like hot potatoes if decorated functions are called too often.

0
source

I would solve this in a much simpler way, with a similar result. Instead of figuring out how to enable strace for just a single function:

  • Create a decorator as follows: (untested)

-

 # Decorator that brackets each call of `f` with recognisable open() attempts.
def strace_mark(f):
    """Wrap *f* so every call is delimited by marker open() attempts.

    Before *f* runs, the wrapper tries to open ``function-<name>-start``;
    afterwards (also when *f* raises) it tries ``function-<name>-end``.
    The files are not expected to exist: the attempts only need to show up
    in an strace log so the calls made in between can be cut out.

    Returns the wrapping function, which forwards all arguments to *f* and
    returns *f*'s result.
    """
    import functools

    def emit_marker(suffix):
        # The open attempt itself is the marker; failure is the normal case.
        try:
            with open('function-%s-%s' % (f.__name__, suffix), 'r'):
                pass
        except OSError:
            pass

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        emit_marker('start')
        try:
            return f(*args, **kwargs)
        finally:
            # Close the marked region even if f raised.
            emit_marker('end')

    return wrapper
  1. Run the entire application under strace -e file .
  2. Get only the parts between calls to open(function-something-start) and open(function-something-end) .

If you run strace -f, you get the Python / external split for free. Just look at which PID makes each call.

+1
source

All Articles