How to collect Spark SQL output into a file?

Below is my Spark SQL script that reads a file from HDFS and runs SQL on top of it. I want to collect the output from the SQL query and write it to a file, but I'm not sure how to do it.

    //import classes for sql
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
    import sqlContext.createSchemaRDD

    //hdfs paths
    val warehouse = "hdfs://quickstart.cloudera/user/hive/warehouse/"
    val customers_path = warehouse + "people/people.txt"

    //create an rdd called file
    val file = sc.textFile(customers_path)

    val schemaString = "name age"
    import org.apache.spark.sql._
    // split on a space: schemaString is space-separated ("name age")
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    val rowRDD = file.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    val peopleSchemRDD = sqlContext.applySchema(rowRDD, schema)

    // Register the SchemaRDD as a table.
    peopleSchemRDD.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    sqlContext.sql("select count(*) from people").collect().foreach(println)
    System.exit(0)
3 answers

If you just want to count the number of lines in a large file on HDFS and write it to another file:

    import java.nio.file.{Files, Paths}

    val path = "hdfs://quickstart.cloudera/user/hive/warehouse/people/people.txt"
    val rdd = sc.textFile(path)
    val linesCount = rdd.count
    // write the count to a file on the driver's local filesystem
    Files.write(Paths.get("line_count.txt"), linesCount.toString.getBytes)
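If the count should end up on HDFS rather than on the driver's local filesystem, a minimal sketch (the output path here is hypothetical) is to wrap the value in an RDD and let Spark write it:

    // Sketch: write the count to HDFS instead of the local filesystem.
    // saveAsTextFile creates a directory of part files at the (hypothetical) path below.
    sc.parallelize(Seq(linesCount.toString))
      .saveAsTextFile("hdfs://quickstart.cloudera/user/cloudera/line_count")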

To save the query output itself, collect the result, parallelize it back into an RDD, and write it out with saveAsTextFile:

    //import classes for sql
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // createSchemaRDD is used to implicitly convert an RDD to a SchemaRDD.
    import sqlContext.createSchemaRDD
    import sqlContext.implicits._

    //hdfs paths
    val warehouse = "hdfs://quickstart.cloudera/user/hive/warehouse/"
    val customers_path = warehouse + "people/people.txt"

    //create an rdd called file
    val file = sc.textFile(customers_path)

    val schemaString = "name age"
    import org.apache.spark.sql._
    val schema = StructType(
      schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    val rowRDD = file.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    val peopleSchemRDD = sqlContext.applySchema(rowRDD, schema)

    // Register the SchemaRDD as a table.
    peopleSchemRDD.registerTempTable("people")

    // SQL statements can be run by using the sql methods provided by sqlContext.
    val op = sqlContext.sql("select count(*) from people")
    // collect the result to the driver, then write it back out as a text file
    val c = op.collect()
    val rdd = sc.parallelize(c)
    rdd.saveAsTextFile("/home/cloudera/op")
    System.exit(0)
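Note that saveAsTextFile creates /home/cloudera/op as a directory containing one part-NNNNN file per partition, not a single file. If a single output file is wanted, one option (a sketch; the op_single path is an assumption) is to coalesce to one partition first:

    // coalesce to one partition so the output directory holds a single part file
    rdd.coalesce(1).saveAsTextFile("/home/cloudera/op_single")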
Alternatively, register the temp table and write the count with a PrintWriter:

    import java.io.{File, PrintWriter}

    peopleSchemRDD.registerTempTable("people")
    val op = sqlContext.sql("select * from people").count().toString
    val pw = new PrintWriter(new File("path"))
    pw.write("count of people: " + op + "\n")
    pw.close()

Create a temporary table called people, then run a query that uses the count function to count the number of rows, and convert the result to a string with toString. The value stored in op is then written to a text file with PrintWriter. If the people table contains duplicate values, use the DISTINCT keyword in the SQL query to count only unique values.
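For example, counting only unique names might look like this (a sketch; the name column comes from the schema defined in the question):

    // count only the distinct values of the name column
    val uniqueCount = sqlContext.sql("select count(distinct name) from people").collect()(0)(0).toString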

