Writing to Oracle Database using Apache Spark 1.4.0

I am trying to write some data to our Oracle database using the Spark 1.4.0 DataFrame.write.jdbc() function.

The symmetric read.jdbc() function, which reads data from Oracle database tables into a DataFrame, works well. However, writing the DataFrame back (I also tried writing back exactly the same object I had just read from the database, with the overwrite setting set to true) gives the following exception:

Exception in thread "main" java.sql.SQLSyntaxErrorException: ORA-00902: Ungültiger Datentyp at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450) at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399) at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017) at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655) at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249) at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566) at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215) at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58) at oracle.jdbc.driver.T4CPreparedStatement.executeForRows(T4CPreparedStatement.java:943) at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1075) at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820) at oracle.jdbc.driver.OraclePreparedStatement.executeUpdate(OraclePreparedStatement.java:3897) at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeUpdate(OraclePreparedStatementWrapper.java:1361) at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:252) at main3$.main(main3.scala:72) at main3.main(main3.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) 

There are only two basic string columns in the table. When they are integers instead, Spark can write them without a problem.

In fact, digging deeper, I see that Spark maps StringType to "TEXT", which is not recognized by Oracle (it should be "VARCHAR" instead). The code in question is in jdbc.scala, which can be found on GitHub:

    def schemaString(df: DataFrame, url: String): String = {
      val sb = new StringBuilder()
      val dialect = JdbcDialects.get(url)
      df.schema.fields foreach { field => {
        val name = field.name
        val typ: String = dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
          field.dataType match {
            case IntegerType => "INTEGER"
            case LongType => "BIGINT"
            case DoubleType => "DOUBLE PRECISION"
            case FloatType => "REAL"
            case ShortType => "INTEGER"
            case ByteType => "BYTE"
            case BooleanType => "BIT(1)"
            case StringType => "TEXT"
            case BinaryType => "BLOB"
            case TimestampType => "TIMESTAMP"
            case DateType => "DATE"
            case DecimalType.Unlimited => "DECIMAL(40,20)"
            case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
          })
        val nullable = if (field.nullable) "" else "NOT NULL"
        sb.append(s", $name $typ $nullable")
      }}
      if (sb.length < 2) "" else sb.substring(2)
    }

So the question is: am I doing something wrong here, or does Spark SQL not support Oracle, and do I need a plug-in to use Spark SQL with Oracle?

My simple main method:

    val conf = new SparkConf().setAppName("Parser").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val reader = sqlContext.read
    val frame = reader.jdbc(url, "STUDENTS", connectionprop)
    frame.printSchema()
    frame.show()

    val row = Row("3", "4")
    val struct = StructType(
      StructField("ONE", StringType, true) ::
      StructField("TWO", StringType, true) :: Nil)
    val arr = Array(row)
    val rddRow = sc.parallelize(arr)
    val dframe = sqlContext.createDataFrame(rddRow, struct)
    dframe.printSchema()
    dframe.show()
    dframe.write.jdbc(url, "STUDENTS", connectionprop)
+5
3 answers

The actual answer is that writing back to Oracle is not possible with the stock DataFrame.write.jdbc() implementation in version 1.4.0. But if you do not mind upgrading to Spark 1.5, there is a slightly hacky way to do it. As described here, there are two problems:

a simple one: the way Spark checks whether a table already exists is incompatible with Oracle

 SELECT 1 FROM $table LIMIT 1 

which can easily be avoided by calling the table-save utility directly:

 org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(df, url, table, props) 
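For reference, here is a minimal sketch of an existence probe that Oracle does accept, using ROWNUM instead of LIMIT. It assumes you already hold a plain JDBC Connection to the database, and the helper name is hypothetical, not part of Spark:

    import java.sql.Connection
    import scala.util.Try

    // Hypothetical helper, not part of Spark: probes for a table the Oracle way.
    // LIMIT is not valid Oracle SQL; "WHERE ROWNUM = 1" fetches at most one row instead.
    def oracleTableExists(conn: Connection, table: String): Boolean =
      Try(conn.prepareStatement(s"SELECT 1 FROM $table WHERE ROWNUM = 1").executeQuery()).isSuccess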

and a complex one (you guessed it): there is no Oracle data type dialect available out of the box. Adapted from the solution in the same article:

    import org.apache.spark.sql.jdbc.{JdbcDialects, JdbcType, JdbcDialect}
    import org.apache.spark.sql.types._

    val OracleDialect = new JdbcDialect {
      override def canHandle(url: String): Boolean =
        url.startsWith("jdbc:oracle") || url.contains("oracle")

      override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
        case StringType => Some(JdbcType("VARCHAR2(255)", java.sql.Types.VARCHAR))
        case BooleanType => Some(JdbcType("NUMBER(1)", java.sql.Types.NUMERIC))
        case IntegerType => Some(JdbcType("NUMBER(10)", java.sql.Types.NUMERIC))
        case LongType => Some(JdbcType("NUMBER(19)", java.sql.Types.NUMERIC))
        case DoubleType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
        case FloatType => Some(JdbcType("NUMBER(19,4)", java.sql.Types.NUMERIC))
        case ShortType => Some(JdbcType("NUMBER(5)", java.sql.Types.NUMERIC))
        case ByteType => Some(JdbcType("NUMBER(3)", java.sql.Types.NUMERIC))
        case BinaryType => Some(JdbcType("BLOB", java.sql.Types.BLOB))
        case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
        case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
        // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
        case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
        case _ => None
      }
    }

    JdbcDialects.registerDialect(OracleDialect)
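As a quick sanity check (a minimal sketch; the URL is a placeholder, as in the example below), you can confirm that the registered dialect is picked up for your connection string and that StringType now maps to an Oracle-friendly type instead of the default "TEXT":

    import org.apache.spark.sql.jdbc.JdbcDialects
    import org.apache.spark.sql.types.StringType

    // "your_domain" and "dbname" are placeholders for your own connection details
    val oracleUrl = "jdbc:oracle:thin:@your_domain:1521/dbname"

    // Expected to print Some(VARCHAR2(255)), i.e. the mapping defined in the custom dialect above
    println(JdbcDialects.get(oracleUrl).getJDBCType(StringType).map(_.databaseTypeDefinition))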

so finally a working example should look something like this:

    val url: String = "jdbc:oracle:thin:@your_domain:1521/dbname"
    val driver: String = "oracle.jdbc.OracleDriver"

    val props = new java.util.Properties()
    props.setProperty("user", "username")
    props.setProperty("password", "userpassword")

    org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable(dataFrame, url, "table_name", props)
+7

Update: starting with Spark 2.x

There is a problem where Spark double-quotes each column name when creating the JDBC table, so all column names of the Oracle table become case sensitive when you try to query them through SQL*Plus.

    select colA from myTable;   => doesn't work anymore
    select "colA" from myTable; => works

[Solution] DataFrame write to Oracle creates a case-sensitive table
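A simple workaround, sketched below under the assumption that you reuse the dframe, url and connectionprop values from the question, is to upper-case the DataFrame column names before writing. Oracle folds unquoted identifiers to upper case, so a column created as a quoted "COLA" can still be queried as plain colA afterwards:

    // Rename all columns to upper case before writing. Even if Spark quotes them when
    // creating the table, a quoted upper-case name ("COLA") is what an unquoted
    // identifier (colA) resolves to in Oracle, so ad-hoc queries keep working.
    val upperCased = dframe.toDF(dframe.columns.map(_.toUpperCase): _*)
    upperCased.write.jdbc(url, "STUDENTS", connectionprop)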

0

You can use org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils.saveTable, as Ayrondir says.

-1
