I want to use the standard logger inside the executor during a transformation, with log levels and formatting respected. Unfortunately, I can't get access to the log4j logger object inside the method, since it isn't serializable, and the spark context isn't available inside the transformation. I could just log, outside of the transformation, all of the objects I'm going to touch, but that doesn't really help with debugging or monitoring code execution.
def slow_row_contents_fetch(row):
    rows = fetch_id_row_contents(row)  # API fetch, DB fetch, etc
    # This shows up, but isn't controllable by log level
    print("Processed slow row with {} results".format(len(rows)))
    return rows

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True)
Outside the transformation, I can get the logger via:
logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger()
logger.warn('This will show up as expected')
But sc isn't available inside the transformation, for good reasons. You get the following message if you try to call sc directly inside the transformation:
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it runs on workers. For more information, see SPARK-5063.
I can just print instead, but that's not easily filterable, and the output just gets tracked as unformatted messages rather than going through the log4j logger.
Serializing the logger itself fails, as expected, when the logger is referenced inside the transformation function:
...
  File "/usr/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/usr/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco
  File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling o36.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
Is there a way to access the executor's log4j logger during transformations in pyspark?