Getting NullPointerException using spark-csv with DataFrames

The spark-csv README contains a sample Java snippet like this:

    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.*;

    SQLContext sqlContext = new SQLContext(sc);
    StructType customSchema = new StructType(
        new StructField("year", IntegerType, true),
        new StructField("make", StringType, true),
        new StructField("model", StringType, true),
        new StructField("comment", StringType, true),
        new StructField("blank", StringType, true));

    DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("cars.csv");

    df.select("year", "model").write()
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .save("newcars.csv");

It did not compile out of the box. After some fiddling I got it to compile by changing the incorrect FooType syntax to DataTypes.FooType and passing the StructFields as new StructField[]. The compiler then demanded a fourth metadata argument in the StructField constructor, but I could not find documentation on what it means (the javadocs describe its use cases, but not how to decide what to pass when constructing a StructField). With that change the following code works until the first action such as collect():

    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // Read features.
    System.out.println("Reading features from " + args[0]);
    StructType featuresSchema = new StructType(new StructField[] {
        new StructField("case_id", DataTypes.StringType, false, null),
        new StructField("foo", DataTypes.DoubleType, false, null) });

    DataFrame features = sqlContext.read()
        .format("com.databricks.spark.csv")
        .schema(featuresSchema)
        .load(args[0]);

    for (Row r : features.collect()) {
        System.out.println("Row: " + r);
    }

I get the following exception:

    Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
        at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
        at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
        at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
        at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
        at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
        at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
        at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
        at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
        at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
        at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
        at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
        at scala.collection.SetLike$class.map(SetLike.scala:93)
        at scala.collection.AbstractSet.map(Set.scala:47)
        at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
        at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
        at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
        ...

Any idea what is wrong?

2 answers

The README appears to be quite outdated, and the Java example needs significant editing. I tracked down the JIRA that added the metadata field; it indicates that the Scala version uses Map.empty as the default value, and whoever wrote the documentation seems to have translated the Scala example directly to Java, even though Java has no default value for that parameter.

In the Spark SQL 1.5 branch we can see that the code calls metadata.hashCode() without a null check, which is what causes the NullPointerException. The existence of the Metadata.empty() method, combined with the discussion about defaulting to an empty map in Scala, suggests that the right approach is simply to pass Metadata.empty() if you do not care about column metadata. The revised example should be:

    SQLContext sqlContext = new SQLContext(sc);
    StructType customSchema = new StructType(new StructField[] {
        new StructField("year", DataTypes.IntegerType, true, Metadata.empty()),
        new StructField("make", DataTypes.StringType, true, Metadata.empty()),
        new StructField("model", DataTypes.StringType, true, Metadata.empty()),
        new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
        new StructField("blank", DataTypes.StringType, true, Metadata.empty())
    });

    DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .schema(customSchema)
        .option("header", "true")
        .load("cars.csv");

    df.select("year", "model").write()
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .save("newcars.csv");
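
As an aside, on the part of the question about what the metadata argument is actually for: if you do want to attach metadata to a column rather than pass Metadata.empty(), Spark provides a MetadataBuilder. Below is a minimal sketch; the keys used ("description", "minYear") are made-up examples for illustration, not anything Spark or spark-csv requires:

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.MetadataBuilder;
    import org.apache.spark.sql.types.StructField;

    // Build a Metadata object with arbitrary key/value pairs instead of Metadata.empty().
    Metadata yearMetadata = new MetadataBuilder()
        .putString("description", "model year of the car")  // example key, not a Spark convention
        .putLong("minYear", 1900L)                           // example key, not a Spark convention
        .build();

    // Attach it to the column; everything else stays the same as above.
    StructField yearField =
        new StructField("year", DataTypes.IntegerType, true, yearMetadata);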

I got the same exception and fixed it by providing the metadata argument.

So change the code like this:

    StructType customSchema = new StructType(new StructField[] {
        new StructField("year", DataTypes.IntegerType, true, Metadata.empty()),
        new StructField("make", DataTypes.StringType, true, Metadata.empty()),
        new StructField("model", DataTypes.StringType, true, Metadata.empty()),
        new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
        new StructField("blank", DataTypes.StringType, true, Metadata.empty())
    });

This will fix the problem.
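
For completeness, this assumes the usual Spark SQL classes are imported (a sketch for the Spark 1.x API used throughout this question):

    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;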

