I found the problem and a solution: starting with Spark 2.x, every column name is wrapped in double quotes when the table is created, and therefore the column names of the resulting Oracle table become case sensitive when you query them through SQL*Plus.
dialect.quoteIdentifier
[ https://github.com/apache/spark/blob/branch-2.1/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala#L645]
and the default implementation of dialect.quoteIdentifier wraps the column name in double quotation marks [ " ]:
// Default implementation quoted from Spark's JdbcDialects.scala (branch-2.1): returns the column name wrapped in double quotes.
def quoteIdentifier(colName: String): String = { s""""$colName"""" }
[ https://github.com/apache/spark/blob/branch-2.1/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala#L90]
Solution: unregister the existing OracleDialect and register a replacement that overrides dialect.quoteIdentifier, along with the other overrides needed to work with Oracle:
import java.sql.Types
import org.apache.spark.sql.types._
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.{ JdbcDialects, JdbcType, JdbcDialect }

// Sample Oracle JDBC URL; it is only used to look up the dialect Spark currently resolves for Oracle.
val url = "jdbc:oracle:thin:@HOST:1567/SID"
val dialect = JdbcDialects

// Unregister whatever dialect is currently resolved for the Oracle URL, so that the custom
// dialect registered at the end of this snippet is the one used for Oracle connections.
JdbcDialects.unregisterDialect(dialect.get(url))

/**
 * Replacement Oracle dialect.
 *
 * Differences from the default behaviour visible in this snippet:
 *  - NUMBER columns reported without precision/scale are read as a fixed-scale decimal;
 *  - Catalyst types are written using explicit Oracle column types;
 *  - identifiers are NOT quoted, so the created columns are not case sensitive.
 */
val OracleDialect = new JdbcDialect {

  /**
   * Claims only Oracle JDBC URLs.
   *
   * The check is limited to the "jdbc:oracle" prefix: matching any URL that merely contains
   * "oracle" would also claim non-Oracle connections whose host or database name happens to
   * include that word.
   */
  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:oracle")

  /**
   * Maps a JDBC column type to a Catalyst type.
   *
   * @param sqlType  JDBC type code (see java.sql.Types)
   * @param typeName database-specific type name
   * @param size     column size reported by the driver
   * @param md       metadata builder for the column
   * @return Some(type) for NUMBER columns without precision/scale, None to fall back to the default mapping
   */
  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder): Option[DataType] = {
    // Handle NUMBER fields that have no precision/scale in special way because JDBC
    // ResultSetMetaData converts this to 0 precision and -127 scale
    if (sqlType == Types.NUMERIC && size == 0) {
      // This is sub-optimal as we have to pick a precision/scale in advance whereas the data in
      // Oracle is allowed to have different precision/scale for each value. This conversion works
      // in our domain for now though we need a more durable solution.
      // Look into changing JDBCRDD (line 406):
      //   FROM: mutableRow.update(i, Decimal(decimalVal, p, s))
      //   TO:   mutableRow.update(i, Decimal(decimalVal))
      Some(DecimalType(DecimalType.MAX_PRECISION, 10))
    }
    // Handle Timestamp with timezone (for now we are just converting this to a string with default format)
    // else if (sqlType == -101) {
    //   Some(StringType)
    // }
    else {
      None
    }
  }

  /**
   * Maps a Catalyst type to the Oracle column type used when creating a table.
   *
   * @param dt Catalyst data type of the column being written
   * @return Some(JdbcType) for the types listed below, None to fall back to the default mapping
   */
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType    => Some(JdbcType("VARCHAR2(2000)", Types.VARCHAR))
    case BooleanType   => Some(JdbcType("NUMBER(1)", Types.NUMERIC))
    case IntegerType   => Some(JdbcType("NUMBER(10)", Types.NUMERIC))
    case LongType      => Some(JdbcType("NUMBER(19)", Types.NUMERIC))
    case DoubleType    => Some(JdbcType("NUMBER(19,4)", Types.NUMERIC))
    case FloatType     => Some(JdbcType("NUMBER(19,4)", Types.NUMERIC))
    case ShortType     => Some(JdbcType("NUMBER(5)", Types.NUMERIC))
    case ByteType      => Some(JdbcType("NUMBER(3)", Types.NUMERIC))
    case BinaryType    => Some(JdbcType("BLOB", Types.BLOB))
    // NOTE(review): an Oracle DATE column presumably drops fractional seconds; confirm that is acceptable.
    case TimestampType => Some(JdbcType("DATE", Types.TIMESTAMP))
    case DateType      => Some(JdbcType("DATE", Types.DATE))
    // case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    // case DecimalType.Unlimited => Some(JdbcType("NUMBER(38,4)", java.sql.Types.NUMERIC))
    case _             => None
  }

  /**
   * Returns the column name unchanged (no surrounding double quotes).
   *
   * Imp from Spark2.0 since otherwise oracle table columns would be case-sensitive.
   * NOTE(review): without quoting, column names that are reserved words or contain special
   * characters will presumably be rejected by Oracle; confirm the schema avoids them.
   */
  override def quoteIdentifier(colName: String): String = colName
}

JdbcDialects.registerDialect(OracleDialect)