I am trying to load a CSV file into a Spark DataFrame using spark-csv [1] from an Apache Zeppelin notebook. When a numeric field has no value, the parser fails for that line and the line gets skipped.
I would expect the row to be loaded with the missing value as NULL in the DataFrame, so that aggregations simply ignore it.
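For reference, here is the behaviour I am after. A minimal sketch (the DataFrame is built by hand here, just for comparison): if Santa's height is an explicit null, describe() keeps all three rows and simply skips the null in the statistics.

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

val sqlContext = new SQLContext(sc)

// Same schema as my CSV, but Santa's height is an explicit null
val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) ::
    Nil)

val rows = sc.parallelize(Seq(
    Row("1", "sam", 184.0),
    Row("2", "cath", 180.0),
    Row("3", "santa", null)))   // the missing height stays NULL

val expected = sqlContext.createDataFrame(rows, schema)
expected.describe("height").show()   // count is 2, mean/stddev use sam and cath only

That is exactly what I want from the CSV load. Here is my actual Zeppelin code: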
%dep
z.reset()
z.addRepo("my-nexus").url("<my_local_nexus_repo_that_is_a_proxy_of_public_repos>")
z.load("com.databricks:spark-csv_2.10:1.1.0")

%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) ::
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .load("file:///home/spark_user/data.csv")

df.describe("height").show()
Here are the contents of the data file /home/spark_user/data.csv:
identifier,name,height
1,sam,184
2,cath,180
3,santa,        <-- note that there is no height recorded for Santa!
Here is the result:
+-------+------+
|summary|height|
+-------+------+
|  count|     2|   <-- 2 of 3 lines loaded, i.e. sam and cath
|   mean| 182.0|
| stddev|   2.0|
|    min| 180.0|
|    max| 184.0|
+-------+------+
In the Zeppelin logs, I see the following error while parsing Santa's line:
ERROR [2015-07-21 16:42:09,940] ({Executor task launch worker-45} CsvRelation.scala[apply]:209) - Exception while parsing line: 3,santa,.
java.lang.NumberFormatException: empty String
    at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
    at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
    at java.lang.Double.parseDouble(Double.java:538)
    at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
    at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31)
    at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:42)
    at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:198)
    at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:180)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
    at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
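The root cause is visible in the trace: spark-csv ends up calling Scala's toDouble on the empty field, and toDouble on an empty string throws exactly this exception:

scala> "".toDouble
java.lang.NumberFormatException: empty String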
So far, so good, you might tell me... and you would be right ;)
Now I want to add an extra column, say age, for which I always have data.
identifier,name,height,age
1,sam,184,30
2,cath,180,32
3,santa,,70
Now let's kindly ask for some statistics about age:
%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) ::
    StructField("age", DoubleType, true) ::
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .load("file:///home/spark_user/data2.csv")

df.describe("age").show()
Results:
+-------+----+
|summary| age|
+-------+----+
|  count|   2|
|   mean|31.0|
| stddev| 1.0|
|    min|30.0|
|    max|32.0|
+-------+----+
ALL WRONG! Since Santa's height is unknown, the entire line is lost, and the age statistics are computed from Sam and Cath only, even though Santa has a perfectly valid age.
My question is: what value do I need to plug in for Santa's height so that the CSV loads with a NULL there? I tried setting the schema to all StringType, but then the empty field comes through as an empty string rather than as a NULL.
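Just to illustrate why a sentinel value is not an answer (my own illustration, not something from the spark-csv docs): any placeholder that parses as a Double pollutes the statistics.

// Sketch: plug 0.0 into the CSV for Santa's height, i.e. the line becomes 3,santa,0.0
val heights = Seq(184.0, 180.0, 0.0)
heights.sum / heights.size   // 121.33..., but the mean of the known heights is 182.0
// Double.NaN parses too ("NaN".toDouble), but then mean and stddev become NaN
// instead of the row simply being ignored.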
My next question is more about the cleanup approach. I found in the API that Spark can handle N/A values (the DataFrameNaFunctions available as df.na). So I thought maybe I could load my data with all columns set to StringType, do some cleanup, and only then cast the columns to the proper types, as follows:
%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", StringType, true) ::
    StructField("age", StringType, true) ::
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .load("file:///home/spark_user/data2.csv")

// e.g. for each column of my dataframe, replace empty string by null
df.na.replace("*", Map("" -> null))

val toDouble = udf[Double, String](_.toDouble)
val df2 = df.withColumn("age", toDouble(df("age")))
df2.describe("age").show()
But df.na.replace() throws an exception and aborts:
java.lang.IllegalArgumentException: Unsupported value type java.lang.String ().
    at org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$convertToDouble(DataFrameNaFunctions.scala:417)
    at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
    at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.DataFrameNaFunctions.replace0(DataFrameNaFunctions.scala:337)
    at org.apache.spark.sql.DataFrameNaFunctions.replace(DataFrameNaFunctions.scala:304)
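The closest I have come to a workaround is to avoid na.replace entirely and build the NULL myself with when/otherwise (a sketch of my own; height_d is a column name I made up, and I am not sure this is the idiomatic way):

import org.apache.spark.sql.functions.{col, when}
import org.apache.spark.sql.types.DoubleType

// Turn empty strings into NULLs, then cast the string column to Double.
// cast() maps NULL to NULL, so the row survives and aggregations skip it.
val withHeight = df.withColumn("height_d",
    when(col("height") === "", null).otherwise(col("height")).cast(DoubleType))
withHeight.describe("height_d").show()

But this feels clumsy to repeat for many columns, so I would still like to know the proper way.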
Any help and advice is greatly appreciated!
[1] https://github.com/databricks/spark-csv