Convert JavaRDD to DataFrame in Spark (Java)

I am trying to process a logfile. I first read the log file, split each line according to my requirement, and saved each column in a separate JavaRDD. Now I need to convert these JavaRDDs to DataFrames for further operations. This is the code I've tried so far:

SparkConf conf = new SparkConf().setAppName("AuctionBid").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> diskfile = sc.textFile("/Users/karuturi/Downloads/log.txt");
JavaRDD<String> urlrdd = diskfile.flatMap(line -> Arrays.asList(line.split("\t")[0]));
System.out.println(urlrdd.take(1));

SQLContext sql = new SQLContext(sc);

and that is exactly how I am trying to convert JavaRDD to DataFrame:

 DataFrame fileDF = sqlContext.createDataFrame(urlRDD, Model.class); 

But the above line does not work, and I am confused about what Model.class should be.

Can someone suggest how to do this?

Thanks.

+7
java hadoop apache-spark rdd spark-dataframe
4 answers

Import

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Create a POJO class for the URL. I would recommend writing one for a whole log line, with URL, date, time, method, target, etc. as members:

public static class Url implements Serializable {
    private String value;

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

Create an RDD of Url objects from the text file

JavaRDD<Url> urlsRDD = spark.read()
        .textFile("/Users/karuturi/Downloads/log.txt")
        .javaRDD()
        .map(new Function<String, Url>() {
            @Override
            public Url call(String line) throws Exception {
                String[] parts = line.split("\\t");
                Url url = new Url();
                // use replace() here: replaceAll() expects a regex, and "[" alone is not a valid one
                url.setValue(parts[0].replace("[", ""));
                return url;
            }
        });

Create DataFrame from RDD

 Dataset<Row> urlsDF = spark.createDataFrame(urlsRDD, Url.class); 
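
As a quick sanity check (just a sketch; urlsDF is the DataFrame created above), you can print the schema and a few rows:

// Verify the single "value" column inferred from the Url POJO
urlsDF.printSchema();
urlsDF.show(5);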

RDD to DataFrame - Spark 2.0
RDD to DataFrame - Spark 1.6

+9

Just map your data into a 7-column table and use the code snippet below:

String[] columns = new String[] {"column1", "column2", "column3", "column4", "column5", "column6", "column7"};
List<String> tableColumns = Arrays.asList(columns);
StructType schema = createSchema(tableColumns);

public StructType createSchema(List<String> tableColumns) {
    List<StructField> fields = new ArrayList<StructField>();
    for (String column : tableColumns) {
        fields.add(DataTypes.createStructField(column, DataTypes.StringType, true));
    }
    return DataTypes.createStructType(fields);
}

sqlContext.createDataFrame(urlRDD, schema);
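
The snippet above assumes urlRDD is already a JavaRDD<Row> whose rows line up with the 7-column schema. A minimal sketch of building such an RDD from the raw log lines (the tab separator and the seven fields are assumptions based on the question):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Turn each tab-separated log line into a 7-column Row
JavaRDD<Row> urlRDD = diskfile.map(line -> {
    String[] parts = line.split("\t", -1);   // -1 keeps trailing empty fields
    return RowFactory.create(parts[0], parts[1], parts[2], parts[3],
                             parts[4], parts[5], parts[6]);
});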
+4

You can do something like the following (I'm converting from Scala on the fly, so sorry for any typos):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Row> rowRDD = urlrdd.map(new Function<String, Row>() {
    @Override
    public Row call(String record) throws Exception {
        return RowFactory.create(record);
    }
});

// Now create the target schema. This is basically a list of fields
// (each field becomes a column) which you add to a StructType.
List<StructField> fields = new ArrayList<>();
StructField field = DataTypes.createStructField("url", DataTypes.StringType, true);
fields.add(field);
StructType schema = DataTypes.createStructType(fields);

// Now you can create the DataFrame:
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);

A few additional notes:

  • Why do you use flatMap when you take only the first element? You could just do:

    JavaRDD<String> urlrdd = diskfile.map(line -> line.split("\t")[0]);

  • I guess in real life you would want to remove the "[" from the URL (you can easily do that inside the map).

  • If you move to Spark 2.0 or later, you should use a SparkSession (spark) instead of sqlContext (see the sketch after this list).

  • You can create a single DataFrame with all the columns. You can do this by adding all the fields to the schema (i.e., instead of adding just one field, add all of them). Instead of using urlrdd, use diskfile and do the split inside the call method. It would be something like this:

    JavaRDD<Row> rowRDD = diskfile.map(new Function<String, Row>() {
        @Override
        public Row call(String record) throws Exception {
            String[] recs = record.split("\t");
            return RowFactory.create(recs[0], recs[1], ...);
        }
    });

  • You can also create the DataFrame directly from the file: just use

    sqlContext.read().option("sep", "\t").schema(schema).csv(filename);
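
As referenced above, a minimal sketch of moving to the Spark 2.x entry point (the builder settings and the logDF name are illustrative, not from the original answer):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("AuctionBid")
        .master("local")
        .getOrCreate();

// In 2.x you build the DataFrame from the session instead of a SQLContext
Dataset<Row> logDF = spark.createDataFrame(rowRDD, schema);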

+3

You can read the file directly using the sqlContext read method.

For more information, you can follow this link.

https://spark.apache.org/docs/1.6.0/sql-programming-guide.html#creating-dataframes
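
A minimal sketch of that approach for Spark 1.6 (the path comes from the question; logDF is just an illustrative name):

// Each line of the file becomes a row with a single "value" column, which you can then split
DataFrame logDF = sqlContext.read().text("/Users/karuturi/Downloads/log.txt");
logDF.show(5);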

Or, in Scala, you can import

 import sqlContext.implicits._

Then use the toDF() method on the RDD to convert it to a DataFrame.

+1
