How can I process xml asynchronously in python?

I have a large XML data file (>160 MB) to process, and it seems that SAX/expat/pulldom parsing is the way to go. I'd like to have one thread that sweeps through the nodes and pushes them onto a queue, while other worker threads pull the next available node from the queue and process it.

I have the following (I know it should have locks; I'll add them later):

import sys, time
import xml.parsers.expat
import threading

q = []

def start_handler(name, attrs):
    q.append(name)

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    print(q)
    time.sleep(1)

The problem is that the body of the while loop runs only once, and then I can't even interrupt it with Ctrl-C. On smaller files the output looks as expected, but with large ones it appears the handlers are only called once the document has been completely parsed, which seems to defeat the purpose of using a SAX parser.

I am sure this is my own ignorance, but I do not see where I am making a mistake.

PS: I also tried changing start_handler like this:

def start_handler(name, attrs):
    def app():
        q.append(name)
    u = threading.Thread(group=None, target=app)
    u.start()

No love, however.

python multithreading xml sax

4 answers

ParseFile, as you noticed, just "swallows everything" at once, which is no use for the incremental parsing you want to do. So, feed the file to the parser one chunk at a time, making sure to conditionally yield control to other threads as you go, for example:

while True:
    data = f.read(BUFSIZE)
    if not data:
        p.Parse('', True)
        break
    p.Parse(data, False)
    time.sleep(0.0)

Calling time.sleep(0.0) is Python's way of saying "yield to other threads if any are ready and waiting"; the Parse method is documented in the xml.parsers.expat docs.
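To see that handlers really do fire as each chunk arrives, here is a minimal self-contained sketch of the same technique, using an in-memory document and a deliberately tiny buffer size (the document and names are made up for illustration):

```python
import xml.parsers.expat

XML = b"<root><a/><b/><c/></root>"
BUFSIZE = 8  # tiny on purpose, so several Parse() calls are needed

seen = []

p = xml.parsers.expat.ParserCreate()
p.StartElementHandler = lambda name, attrs: seen.append(name)

for i in range(0, len(XML), BUFSIZE):
    p.Parse(XML[i:i + BUFSIZE], False)  # handlers run as data arrives
p.Parse(b"", True)                      # signal end of document

print(seen)  # ['root', 'a', 'b', 'c']
```

With a real file you would replace the slicing loop with `f.read(BUFSIZE)` as shown above.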

Secondly, forget locks for this use case! Use Queue.Queue instead (renamed queue.Queue in Python 3): it is intrinsically thread-safe and almost always the best and simplest way to coordinate multiple threads in Python. Just create a Queue instance q, have the parsing thread q.put(name) onto it, and have the worker threads block on q.get(), each call waiting until another item of work is available. It's that easy!

(There are several auxiliary strategies you can use to coordinate the termination of the worker threads when there is no more work for them, but the simplest, absent special requirements, is to make them daemon threads, so they will all stop when the main thread does; see the docs.)
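Putting the two suggestions together, a minimal sketch of the Queue-plus-daemon-workers pattern might look like this (the element names are stand-ins for whatever the parser would push; note the module is `queue`, lowercase, in Python 3):

```python
import queue      # this module was named "Queue" in Python 2
import threading

q = queue.Queue()
results = []      # stand-in for real per-node processing output

def worker():
    while True:
        name = q.get()        # blocks until an item is available
        results.append(name)  # process the node here
        q.task_done()

# Daemon threads are killed automatically when the main thread exits,
# so this sketch needs no explicit shutdown protocol.
for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for name in ["root", "a", "b", "c"]:  # stand-in for parsed element names
    q.put(name)

q.join()  # block until every queued item has been marked task_done()
print(sorted(results))  # ['a', 'b', 'c', 'root']
```

`q.join()` is handy here because it lets the main thread wait for all queued work to finish without tracking the workers individually.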


I am not too sure about this problem. I assume the ParseFile call blocks, and only the parsing thread gets to run, because of the GIL. A way around that is to use multiprocessing; in any case, it is designed to work with queues.

You create a Process and pass it a Queue:

import sys, time
import xml.parsers.expat
import multiprocessing
import queue  # named "Queue" in Python 2

def do_expat(q):
    p = xml.parsers.expat.ParserCreate()

    def start_handler(name, attrs):
        q.put(name)

    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1], 'rb') as f:  # ParseFile wants a binary file object
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

if __name__ == '__main__':
    q = multiprocessing.Queue()
    process = multiprocessing.Process(target=do_expat, args=(q,))
    process.start()

    elements = []
    while True:
        while True:
            try:
                elements.append(q.get_nowait())
            except queue.Empty:
                break
        print(elements)
        time.sleep(1)

I have included the elements list only to mimic your original script. Your final solution will probably use get_nowait and a Pool or something similar.
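The inner drain loop above can be factored into a small helper, here named `drain_queue` for illustration. This sketch uses a thread-safe queue.Queue so it is self-contained, but the same pattern works with multiprocessing.Queue, whose get_nowait raises the same queue.Empty exception:

```python
import queue

def drain_queue(q):
    """Collect every immediately available item without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items

q = queue.Queue()
for name in ["root", "a", "b"]:
    q.put(name)

print(drain_queue(q))  # ['root', 'a', 'b']
print(drain_queue(q))  # [] -- queue already drained
```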


The only thing I see wrong is that you are accessing q from different threads at the same time with no locking, while one thread is writing to it and another is reading. That is asking for trouble, and you are probably only being saved so far by the Python interpreter's internal locking. :)

Try adding a lock; it really isn't very difficult:

import sys, time
import xml.parsers.expat
import threading

q = []
q_lock = threading.Lock()       # <---

def start_handler(name, attrs):
    q_lock.acquire()            # <---
    q.append(name)
    q_lock.release()            # <---

def do_expat():
    p = xml.parsers.expat.ParserCreate()
    p.StartElementHandler = start_handler
    p.buffer_text = True
    print("opening {0}".format(sys.argv[1]))
    with open(sys.argv[1]) as f:
        print("file is open")
        p.ParseFile(f)
        print("parsing complete")

t = threading.Thread(group=None, target=do_expat)
t.start()

while True:
    q_lock.acquire()            # <---
    print(q)
    q_lock.release()            # <---
    time.sleep(1)

See, it was very simple: we just created a lock variable to protect our object, acquire the lock every time before we use the object, and release it every time after we finish with it. This way we guarantee that q.append(name) will never overlap with print(q).


(With newer versions of Python there is also the "with" statement syntax, which helps you not to forget to release locks, close files, or do other cleanup.)
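For instance, the handler above can be rewritten with the "with" statement, which acquires the lock on entry and releases it automatically even if an exception is raised (the call at the bottom is just a demo):

```python
import threading

q = []
q_lock = threading.Lock()

def start_handler(name, attrs):
    with q_lock:        # acquired here, released automatically on exit
        q.append(name)

start_handler("root", {})  # demo call, as the parser would make
with q_lock:
    print(q)  # ['root']
```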


I don't know much about the implementation, but if the parsing is C code that runs to completion, other Python threads will not run. If the parser calls back into Python code, the GIL may be released so other threads can run, but I'm not sure about that. You may want to check those details.

