Read from a Hive table and write back to it using Spark SQL

I am reading a Hive table using Spark SQL and assigning it to a Scala val:

val x = sqlContext.sql("select * from some_table") 

Then I do some processing on DataFrame x and finally come up with a DataFrame y whose schema exactly matches that of some_table.

Finally, I try to insert overwrite DataFrame y into the same Hive table some_table:

 y.write.mode(SaveMode.Overwrite).insertInto("some_table") 

Then I get the error

org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from

I tried creating an insert SQL statement and running it using sqlContext.sql(), but it gave me the same error.

Is there a way around this error? I need to insert records back into the same table.


Hi, I tried doing as suggested, but I am still getting the same error:

 val x = sqlContext.sql("select * from incremental.test2")
 val y = x.limit(5)
 y.registerTempTable("temp_table")
 val dy = sqlContext.table("temp_table")

 scala> dy.write.mode("overwrite").insertInto("incremental.test2")
 org.apache.spark.sql.AnalysisException: Cannot insert overwrite into table that is also being read from.;
5 answers

You must first save your DataFrame y to a temporary table:

 y.write.mode("overwrite").saveAsTable("temp_table") 

Then you can overwrite the rows in your target table

 val dy = sqlContext.table("temp_table")
 dy.write.mode("overwrite").insertInto("some_table")
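
If you do not want the staging table to linger, you can drop it after the copy has landed (a minimal sketch; temp_table is just the staging name used above):

 sqlContext.sql("DROP TABLE IF EXISTS temp_table")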

In fact, you can also use checkpointing to achieve this. Since checkpointing breaks the data lineage, Spark cannot detect that you are reading from and overwriting the same table:

 sqlContext.sparkContext.setCheckpointDir(checkpointDir)
 val ds = sqlContext.sql("select * from some_table").checkpoint()
 ds.write.mode("overwrite").saveAsTable("some_table")
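
For reference, the same idea with the Spark 2.x SparkSession entry point (a sketch assuming a SparkSession named spark; the checkpoint directory is a placeholder):

 // checkpoint() materializes the data and truncates the lineage,
 // so Spark no longer sees some_table as both source and sink
 spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
 val ds = spark.sql("select * from some_table").checkpoint()
 ds.write.mode("overwrite").saveAsTable("some_table")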

First, you must save your DataFrame y as a Parquet file (note that "temp_table" here is a filesystem path, not a table name):

 y.write.parquet("temp_table") 

Then load it back:

 val parquetFile = sqlContext.read.parquet("temp_table") 

And finally, insert your data into the target table:

 parquetFile.write.insertInto("some_table") 
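
One caveat: the default save mode errors out if the target path already exists, so a re-runnable version of the first step would write with overwrite mode (a sketch using the same staging path as above):

 y.write.mode("overwrite").parquet("temp_table")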

In the context of Spark 2.2:

  1. This error means that the process is reading from and writing to the same table.
  2. This normally works when the process writes through the .hiveStaging ... directory.
  3. The error occurs with the saveAsTable method, because it overwrites the entire table instead of individual partitions.
  4. The error should not occur with the insertInto method, because it overwrites partitions rather than the whole table.
  5. The reason it happens here is that the Hive table carries the following Spark TBLProperties in its definition. The problem is solved for the insertInto method if you remove these TBLProperties (one way to unset them is sketched after the list) -

 'spark.sql.partitionProvider'
 'spark.sql.sources.provider'
 'spark.sql.sources.schema.numPartCols'
 'spark.sql.sources.schema.numParts'
 'spark.sql.sources.schema.part.0'
 'spark.sql.sources.schema.part.1'
 'spark.sql.sources.schema.part.2'
 'spark.sql.sources.schema.partCol.0'
 'spark.sql.sources.schema.partCol.1'
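
A minimal sketch of removing those properties, assuming your Spark/Hive version accepts ALTER TABLE ... UNSET TBLPROPERTIES (otherwise run the equivalent statement from the Hive CLI; the table name is a placeholder):

 // Remove the Spark-specific table properties so that insertInto
 // can overwrite individual partitions instead of the whole table
 sqlContext.sql("""
   ALTER TABLE some_table UNSET TBLPROPERTIES (
     'spark.sql.partitionProvider',
     'spark.sql.sources.provider',
     'spark.sql.sources.schema.numPartCols',
     'spark.sql.sources.schema.numParts',
     'spark.sql.sources.schema.part.0',
     'spark.sql.sources.schema.part.1',
     'spark.sql.sources.schema.part.2',
     'spark.sql.sources.schema.partCol.0',
     'spark.sql.sources.schema.partCol.1'
   )
 """)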

https://querydb.blogspot.com/2019/07/read-from-hive-table-and-write-back-to.html


Read the data from the Hive table into Spark using HCatalog:

 import org.apache.hadoop.io.WritableComparable
 import org.apache.hadoop.mapreduce.InputFormat
 import org.apache.hive.hcatalog.data.HCatRecord
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat

 // Point HCatInputFormat at the Hive table to read
 val hconfig = new org.apache.hadoop.conf.Configuration()
 HCatInputFormat.setInput(hconfig, "dbname", "tablename")

 val inputFormat = (new HCatInputFormat).asInstanceOf[InputFormat[WritableComparable[_], HCatRecord]].getClass
 val data = sc.newAPIHadoopRDD(hconfig, inputFormat, classOf[WritableComparable[_]], classOf[HCatRecord])
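
The call above yields a pair RDD of (key, HCatRecord). A small usage sketch (hypothetical, just to show the shape of the result) that drops the unused keys and prints a few records:

 val records = data.map { case (_, record) => record }
 records.take(5).foreach(println)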

