Converting a MySQL table to a Spark Dataset is very slow compared to a CSV file

I have a CSV file on Amazon S3 with a size of 62 MB (114,000 lines). I convert it to a Spark Dataset and take the first 500 lines. The code is as follows:

DataFrameReader df = new DataFrameReader(spark).format("csv").option("header", true);
Dataset<Row> set=df.load("s3n://"+this.accessId.replace("\"", "")+":"+this.accessToken.replace("\"", "")+"@"+this.bucketName.replace("\"", "")+"/"+this.filePath.replace("\"", "")+"");

set.take(500);

The whole operation takes from 20 to 30 seconds.

Now I am trying to do the same, but instead of the CSV file I am using a MySQL table with 119,000 rows. The MySQL server is located on Amazon EC2. The code is as follows:

String url ="jdbc:mysql://"+this.hostName+":3306/"+this.dataBaseName+"?user="+this.userName+"&password="+this.password;

SparkSession spark=StartSpark.getSparkSession();

SQLContext sc = spark.sqlContext();

Dataset<Row> set = sc
            .read()
            .option("url", url)
            .option("dbtable", this.tableName)
            .option("driver","com.mysql.jdbc.Driver")
            .format("jdbc")
            .load();
set.take(500);

It takes 5 to 10 minutes. I am running Spark inside the JVM and using the same configuration in both cases.
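For reference, the partitioning of the JDBC-backed Dataset can be checked directly (a minimal sketch against the `set` built above; without partitioning options it is typically a single partition):

// Minimal check on the JDBC-backed Dataset `set` from the snippet above;
// a single partition means the whole table is read by one task.
System.out.println("JDBC read partitions: " + set.toJavaRDD().getNumPartitions());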

I could use partitionColumn, numPartitions, etc., but I don't have a numeric column, and a further problem is that I don't know the table schema.

Why is reading from MySQL over JDBC so much slower than reading the CSV file from S3, and how can I speed it up?

2 answers

The reason is that DataFrameReader.jdbc, when used without any partitioning options, fetches the whole table through a single executor.

To distribute the load, use one of the following:

  • lowerBound / upperBound together with partitionColumn and numPartitions (a sketch for deriving the bounds follows this list):

    Dataset<Row> set = sc
        .read()
        .option("partitionColumn", "foo")
        .option("numPartitions", "3")
        .option("lowerBound", 0)
        .option("upperBound", 30)
        .option("url", url)
        .option("dbtable", this.tableName)
        .option("driver","com.mysql.jdbc.Driver")
        .format("jdbc")
        .load();
    
  • predicates (also usable when there is no numeric column; a second sketch follows this list):

    Properties properties = new Properties();
    // user and password are already embedded in the url built in the question
    Dataset<Row> set = sc
        .read()
        .jdbc(
            url, this.tableName,
            new String[]{"foo < 10", "foo BETWEEN 10 AND 20", "foo > 20"},
            properties
        );
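
If the bounds for the first option are not known up front, one way to get them (a sketch, not part of the original answer; foo is the same placeholder partition column as above) is to read MIN/MAX through a JDBC subquery and feed the result into lowerBound/upperBound:

    // Sketch: derive lowerBound/upperBound from the table itself.
    // `foo`, `url` and `this.tableName` are the placeholders used above.
    Dataset<Row> bounds = sc
        .read()
        .format("jdbc")
        .option("url", url)
        .option("driver", "com.mysql.jdbc.Driver")
        .option("dbtable", "(SELECT MIN(foo) AS lo, MAX(foo) AS hi FROM " + this.tableName + ") AS b")
        .load();

    Row row = bounds.first();
    long lower = ((Number) row.get(0)).longValue();
    long upper = ((Number) row.get(1)).longValue();
    // use `lower` and `upper` as the lowerBound/upperBound option values above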
    
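The predicates route also covers the case where there is no numeric column at all: any set of conditions that together cover the whole table works, with one partition per condition. A rough sketch, assuming a hypothetical created_at date column:

    // Sketch with a hypothetical non-numeric column `created_at`;
    // each predicate becomes one partition of the resulting Dataset.
    // The conditions should not overlap, otherwise rows are read twice.
    Properties connectionProperties = new Properties();
    connectionProperties.put("driver", "com.mysql.jdbc.Driver");
    // user and password stay embedded in the url, as in the question

    String[] predicates = new String[] {
        "created_at <  '2017-01-01'",
        "created_at >= '2017-01-01' AND created_at < '2017-07-01'",
        "created_at >= '2017-07-01'"
    };

    Dataset<Row> partitioned = sc
        .read()
        .jdbc(url, this.tableName, predicates, connectionProperties);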

A second answer suggests pulling the table through spark-shell and saving it as a partitioned ORC table:

1. Download the MySQL JDBC connector jar (you may already have it):

wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.38/mysql-connector-java-5.1.38.jar

2. Create a db-properties.flat file in the following format:

jdbcUrl=jdbc:mysql://${jdbcHostname}:${jdbcPort}/${jdbcDatabase}
user=<username>
password=<password>

3. Start spark-shell with the MySQL connector jar on the driver classpath and import the required packages:

spark-shell --driver-class-path  <your path to mysql jar>

import java.io.{File, FileInputStream}
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

Create a HiveContext and enable dynamic partitioning for the Hive write:

val sQLContext = new HiveContext(sc)
import sQLContext.implicits._
import sQLContext.sql

sQLContext.setConf("hive.exec.dynamic.partition", "true")
sQLContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")

Load the MySQL connection properties from the file created in step 2:

val dbProperties = new Properties()
dbProperties.load(new FileInputStream(new File("your_path_to/db-properties.flat")))
val jdbcurl = dbProperties.getProperty("jdbcUrl")

Build the query to read from your table; this is where a WHERE clause can be added to limit what gets loaded:

val df1 = "(SELECT * FROM your_table_name) as s1"

Pass the JDBC URL, the query, and the DB properties to the read method:

val df2 = sQLContext.read.jdbc(jdbcurl, df1, dbProperties)

Finally, write the result to the target table, partitioned and stored as ORC:

df2.write.format("orc").partitionBy("your_partition_column_name").mode(SaveMode.Append).saveAsTable("your_target_table_name")
