Going by the spark-csv README, there is sample Java code like this:

    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.*;
    SQLContext sqlContext = new SQLContext(sc);
    StructType customSchema = new StructType(
        new StructField("year", IntegerType, true),
        new StructField("make", StringType, true),
        new StructField("model", StringType, true),
        new StructField("comment", StringType, true),
        new StructField("blank", StringType, true));

    DataFrame df = sqlContext.read()
        .format("com.databricks.spark.csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("cars.csv");

    df.select("year", "model").write()
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .save("newcars.csv");
It did not compile out of the box. With some adjustments I got it to compile by changing the incorrect FooType syntax to DataTypes.FooType and by passing the StructFields as a new StructField[]. The compiler also requested a fourth argument, metadata, in the StructField constructor, but I could not find documentation of what it means (the javadocs describe its use cases, but not how to decide what to pass when constructing a StructField; a sketch of an alternative way to build the schema is at the end of this post). The following code works until an action such as collect() is invoked:
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);

    // Read features.
    System.out.println("Reading features from " + args[0]);
    StructType featuresSchema = new StructType(new StructField[] {
        new StructField("case_id", DataTypes.StringType, false, null),
        new StructField("foo", DataTypes.DoubleType, false, null) });
    DataFrame features = sqlContext.read()
        .format("com.databricks.spark.csv")
        .schema(featuresSchema)
        .load(args[0]);
    for (Row r : features.collect()) {
        System.out.println("Row: " + r);
    }
I get the following exception:
    Exception in thread "main" java.lang.NullPointerException
        at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
        at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
        at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
        at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
        at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
        at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
        at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
        at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
        at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
        at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
        at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
        at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
        at scala.collection.SetLike$class.map(SetLike.scala:93)
        at scala.collection.AbstractSet.map(Set.scala:47)
        at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
        at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
        at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
        at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
        at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
        ...
Any idea what is wrong?
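
For reference, regarding the fourth (metadata) constructor argument mentioned above: the Java API also provides Metadata.empty() and the DataTypes.createStructField(name, type, nullable) factory methods, which do not take a metadata argument at all. Below is a minimal sketch of the same schema built both ways (same column names as my code above; I do not know whether this is related to the problem):

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // Factory methods: no explicit metadata argument is required.
    StructType viaFactory = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField("case_id", DataTypes.StringType, false),
        DataTypes.createStructField("foo", DataTypes.DoubleType, false) });

    // Constructor form, passing Metadata.empty() instead of null.
    StructType viaConstructor = new StructType(new StructField[] {
        new StructField("case_id", DataTypes.StringType, false, Metadata.empty()),
        new StructField("foo", DataTypes.DoubleType, false, Metadata.empty()) });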