Connect to MS SQL Server using Spark

I am trying to use Spark JdbcRDD to load data from a SQL Server database. I am using version 4.0 of the Microsoft JDBC driver. Here is the code snippet:

 public JdbcRDD<Object[]> load(){
    SparkConf conf = new SparkConf().setMaster("local").setAppName("myapp");
    JavaSparkContext context = new JavaSparkContext(conf);
    DbConnection connection = new DbConnection("com.microsoft.sqlserver.jdbc.SQLServerDriver","my-connection-string","test","test");
    JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<Object[]>(context.sc(),connection,"select * from <table>",1,1000,1,new JobMapper(),ClassManifestFactory$.MODULE$.fromClass(Object[].class));
    return jdbcRDD;
}

public static void main(String[] args) {
    JdbcRDD<Object[]> jdbcRDD = load();
    JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, ClassManifestFactory$.MODULE$.fromClass(Object[].class));
    List<String> ids = javaRDD.map(new Function<Object[],String>(){
       public String call(final Object[] record){
           return (String)record[0];
       }
    }).collect();
    System.out.println(ids);
}

I get the following exception:

java.lang.AbstractMethodError: com.microsoft.sqlserver.jdbc.SQLServerPreparedStatement.isClosed()Z
at org.apache.spark.rdd.JdbcRDD$$anon$1.close(JdbcRDD.scala:109)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:74)
at org.apache.spark.rdd.JdbcRDD$$anon$1$$anonfun$1.apply(JdbcRDD.scala:74)
at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:695)

Here is the definition of JobMapper:

public class JobMapper extends AbstractFunction1<ResultSet, Object[]> implements Serializable {

private static final Logger logger = Logger.getLogger(JobMapper.class);
public Object[] apply(ResultSet row){
    return JdbcRDD.resultSetToObjectArray(row);
}

}

+4
source share
2 answers

I found a problem with what I was doing. There were a few things:

  • It does not work with driver version 4.0. So I changed it to use version 3.0
  • The documentation for JdbcRDD states that the SQL string should contain two parameters that indicate the range for the query. So I had to change the request.

JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<Object[]>(context.sc(),connection,"SELECT * FROM <table> where Id >= ? and Id <= ?",1,20,1,new JobMapper(),ClassManifestFactory$.MODULE$.fromClass(Object[].class));

Parameters 1 and 20 indicate the range for the request.

+1
source

. , Spark (1.3.0). .

, . , jar (sqljdbc40.jar) SQL Server :

YOUR_SPARK_HOME/ ​​//

, Spark .

:

JavaSparkContext sc = new JavaSparkContext("local", appName); //master is set to local
SQLContext sqlContext = new SQLContext(sc);

//This url connection string is not complete (include your credentials or integrated security options)
String url = "jdbc:sqlserver://" + host + ":1433;DatabaseName=" + database;
String driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver";

//Settings for SQL Server jdbc connection
Map<String, String> options = new HashMap<>();
options.put("driver", driver);
options.put("url", url);
options.put("dbtable", tablename);

//Get table from SQL Server and save data in a DataFrame using JDBC
DataFrame jdbcDF = sqlContext.load("jdbc", options);
jdbcDF.printSchema();
long numRecords = jdbcDF.count();
System.out.println("Number of records in jdbcDF: " + numRecords);
jdbcDF.show();
+1

All Articles