Python namedtuple as apply_async (..) callback argument

I am writing a short program where I want to call a function asynchronously so that it does not block the caller. For this, I use Pool from the python multiprocessing module.

In a function called asynchronously, I want to return namedtuple to match the logic of the rest of my program, but I found that namedtuple not a supported type for moving from the generated callback process (possibly because it cannot be pickled) . Here is a minimal registry problem.

 from multiprocessing import Pool from collections import namedtuple logEntry = namedtuple("LogEntry", ['logLev', 'msg']) def doSomething(x): # Do actual work here logCode = 1 statusStr = "Message Here" return logEntry(logLev=logCode, msg=statusStr) def callbackFunc(result): print(result.logLev) print(result.msg) def userAsyncCall(): pool = Pool() pool.apply_async(doSomething, [1,2], callback=callbackFunc) if __name__ == "__main__": userAsyncCall() # Nothing is printed # If this is uncommented, the logLev and status are printed as expected: # y = logEntry(logLev=2, msg="Hello World") # callbackFunc(y) 

Does anyone know if there is a way to pass the namedtuple return value from the async process for a callback? Is there a better / more pythonic approach to what I'm doing?

+8
python python-multithreading
source share
2 answers

The problem is that the case is different from the return value of namedtuple() and its typename parameter. That is, there is a mismatch between the class definition of the named tuple and the variable name that you gave it. You need two to match:

 LogEntry = namedtuple("LogEntry", ['logLev', 'msg']) 

And update the return in doSomething() accordingly.

Full code:

 from multiprocessing import Pool from collections import namedtuple LogEntry = namedtuple("LogEntry", ['logLev', 'msg']) def doSomething(x): # Do actual work here logCode = 1 statusStr = "Message Here" return LogEntry(logLev=logCode, msg=statusStr) def callbackFunc(result): print(result.logLev) print(result.msg) def userAsyncCall(): pool = Pool() return pool.apply_async(doSomething, [1], callback=callbackFunc) if __name__ == "__main__": c = userAsyncCall() # To see whether there was an exception, you can attempt to get() the AsyncResult object. # print c.get() 

(To see the class definition, add verbose=True to namedtuple() .)

+3
source share

The reason that nothing is printed is because apply_async not working. By the way, I think this is bad behavior that just makes people embarrassed. You can pass error_callback to handle errors.

 def errorCallback(exception): print(exception) def userAsyncCall(): pool = Pool() pool.apply_async(doSomething, [1], callback=callbackFunc, error_callback=errorCallback) # You passed wrong arguments. doSomething() takes 1 positional argument. # I replace [1,2] with [1]. if __name__ == "__main__": userAsyncCall() import time time.sleep(3) # You need this, otherwise you will never see the output. 

When you come here, the exit will be

 Error sending result: 'LogEntry(logLev=1, msg='Message Here')'. Reason: 'PicklingError("Can't pickle <class '__mp_main__.LogEntry'>: attribute lookup LogEntry on __mp_main__ failed",)' 

PicklingError! You are right, namedtuple cannot be passed from a spawned process to a callback.

This may not be a more affordable way, but you can send a dict as the result instead of namedtuple.

As fixed by Dag Høidahl, namedtuple can be passed. The next line works.

 LogEntry = namedtuple("LogEntry", ['logLev', 'msg']) 
+3
source share

All Articles