In PySpark, how can I log to log4j from inside a transformation?

I want to log to the standard logger inside an executor during a transformation, with log levels and formatting respected. Unfortunately, I can't get access to the log4j logger object inside the method, because it is not serializable, and the Spark context is not available inside the transformation. I could log, outside the transformation, all the objects I am going to touch, but that does not really help with debugging or monitoring code execution.

    def slow_row_contents_fetch(row):
        rows = fetch_id_row_contents(row)  # API fetch, DB fetch, etc
        # This shows up, but not controllable by log level
        print "Processed slow row with {} results".format(len(rows))
        return rows

    sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)

Outside the transformation, I can get the logger via:

    logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()
    logger.warn('This will show up as expected')

But sc is not available inside the transformation, for good reasons. You see the following message if you try to call sc directly inside the transformation:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

I can just print, but that output is not easily filterable and only gets tracked as unformatted error messages in the log4j logger.

Serializing the logger itself, as expected, fails when the logger is called inside the transformation function:

    ...
    File "/usr/lib/python2.7/pickle.py", line 306, in save
      rv = reduce(self.proto)
    File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
    File "/usr/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
    File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
    py4j.protocol.Py4JError: An error occurred while calling o36.__getnewargs__. Trace:
    py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

Is there a way to get access to the executor logger during transformations in PySpark?

+8
apache-spark pyspark
2 answers

After a few hours of digging into the Spark repository, it seems that this is currently impossible to achieve. The Python worker process on the executor does not actually have a JVM instance attached to it; the data is just streamed over a socket, with no native JVM binding to use.

Here is the worker creation code, which redirects the worker's stdout and stderr to the JVM's stderr:

    private def createSimpleWorker(): Socket = {
      ...
      val worker = pb.start()

      // Redirect worker stdout and stderr
      redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
      ...
    }

    /**
     * Redirect the given streams to our stderr in separate threads.
     */
    private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream) {
      try {
        new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start()
        new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start()
      } catch {
        case e: Exception =>
          logError("Exception in redirecting streams", e)
      }
    }
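Since the worker's stderr is forwarded to the JVM's stderr, one possible workaround (not a log4j integration) is to use Python's standard `logging` module inside the transformation and write to stderr. The sketch below is only an illustration under that assumption: `get_worker_logger` is a hypothetical helper, and `fetch_id_row_contents` is a stand-in for the slow fetch from the question. The logger is looked up inside the function, so no unpicklable object is captured by the closure.

```python
import logging
import sys


def get_worker_logger(name="worker", level=logging.INFO):
    """Return a stderr logger, configuring it only once per Python process."""
    logger = logging.getLogger(name)
    if not logger.handlers:  # avoid stacking handlers if the process is reused
        handler = logging.StreamHandler(sys.stderr)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s %(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
        logger.propagate = False
    logger.setLevel(level)
    return logger


def fetch_id_row_contents(row):
    # Hypothetical stand-in for the slow API/DB fetch in the question.
    return [row, row]


def slow_row_contents_fetch(row):
    # Obtain the logger here, on the worker, instead of capturing it
    # from the driver (which would require pickling it).
    logger = get_worker_logger()
    rows = fetch_id_row_contents(row)
    logger.info("Processed slow row with %d results", len(rows))
    return rows
```

The function could then be passed to `flatMap` as in the question. The messages get Python-side levels and formatting, but they still arrive in the executor's stderr as plain text, so log4j does not see their level.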

And here is the worker.py code for communicating the job processing. There is no place to emit log messages, and no message type that indicates a log event.

    try:
        ...
        command = pickleSer._read_with_length(infile)
        if isinstance(command, Broadcast):
            command = pickleSer.loads(command.value)
        func, profiler, deserializer, serializer = command
        init_time = time.time()

        def process():
            iterator = deserializer.load_stream(infile)
            serializer.dump_stream(func(split_index, iterator), outfile)

        if profiler:
            profiler.profile(process)
        else:
            process()
    except Exception:
        try:
            write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile)
            write_with_length(traceback.format_exc().encode("utf-8"), outfile)
        except IOError:
            # JVM close the socket
            pass
        except Exception:
            # Write the error to stderr if it happened while serializing
            print("PySpark worker failed with exception:", file=sys.stderr)
            print(traceback.format_exc(), file=sys.stderr)
        exit(-1)
    finish_time = time.time()
    report_times(outfile, boot_time, init_time, finish_time)
    write_long(shuffle.MemoryBytesSpilled, outfile)
    write_long(shuffle.DiskBytesSpilled, outfile)

    # Mark the beginning of the accumulators section of the output
    write_int(SpecialLengths.END_OF_DATA_SECTION, outfile)
    write_int(len(_accumulatorRegistry), outfile)
    for (aid, accum) in _accumulatorRegistry.items():
        pickleSer._write_with_length((aid, accum._value), outfile)
    ...

And finally, the available message types:

    class SpecialLengths(object):
        END_OF_DATA_SECTION = -1
        PYTHON_EXCEPTION_THROWN = -2
        TIMING_DATA = -3
        END_OF_STREAM = -4
        NULL = -5
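To make the point concrete, here is a simplified model of this kind of length-prefixed framing, where non-negative lengths introduce data and negative lengths act as control codes. It is not Spark's actual reader: `write_int`, `write_with_length`, `read_int` and `read_frames` below are small re-implementations written for illustration, and the 4-byte big-endian encoding is an assumption. The thing to notice is that none of the codes means "log record".

```python
import io
import struct


class SpecialLengths(object):
    END_OF_DATA_SECTION = -1
    PYTHON_EXCEPTION_THROWN = -2
    TIMING_DATA = -3
    END_OF_STREAM = -4
    NULL = -5


def write_int(value, stream):
    # Assumed encoding: 4-byte big-endian signed integer.
    stream.write(struct.pack("!i", value))


def read_int(stream):
    return struct.unpack("!i", stream.read(4))[0]


def write_with_length(payload, stream):
    write_int(len(payload), stream)
    stream.write(payload)


def read_frames(stream):
    """Yield (kind, value) pairs until END_OF_STREAM is seen."""
    while True:
        length = read_int(stream)
        if length >= 0:
            yield ("data", stream.read(length))
        elif length == SpecialLengths.PYTHON_EXCEPTION_THROWN:
            # The traceback follows as a length-prefixed payload.
            yield ("exception", stream.read(read_int(stream)))
        else:
            yield ("control", length)
            if length == SpecialLengths.END_OF_STREAM:
                return


buf = io.BytesIO()
write_with_length(b"row-1", buf)
write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, buf)
write_with_length(b"Traceback ...", buf)
write_int(SpecialLengths.END_OF_STREAM, buf)
buf.seek(0)
frames = list(read_frames(buf))
```

In this model the only way for text to travel from the worker to the JVM through the socket is as a data record or as the payload of an exception, which matches the observation that there is no channel for log events.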
+7

Have a look at this question, which covers a similar situation.

You can have your map function return an object that contains either a stack trace string or the real result, plus a bool flag indicating whether there was an error. This can be useful for debugging a task that has side effects, or when specific data conditions cause failures.
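A minimal sketch of that idea, in plain Python so it can be tried without a cluster. The names `capture_errors` and `parse_positive` are hypothetical, and the `(ok, payload)` tuple is just one possible shape for the returned object.

```python
import traceback


def capture_errors(func):
    """Wrap func so it returns (ok, payload) instead of raising."""
    def wrapper(item):
        try:
            return (True, func(item))
        except Exception:
            # On failure, the payload is the formatted stack trace.
            return (False, traceback.format_exc())
    return wrapper


def parse_positive(value):
    # Hypothetical task that fails on some inputs.
    number = int(value)
    if number <= 0:
        raise ValueError("expected a positive number, got %d" % number)
    return number


safe_parse = capture_errors(parse_positive)
results = [safe_parse(v) for v in ["3", "-1", "abc"]]
successes = [payload for ok, payload in results if ok]
failures = [payload for ok, payload in results if not ok]
```

With an RDD, the wrapped function would be passed to `map`, and the flag used to filter the successes from the failures. The stack traces can then be collected on the driver, where the log4j logger from the question is available.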

+1
