Unit testing modules that use Spark DataFrames

I am trying to test the part of my program that performs transformations on DataFrames. I want to test several different variations of these DataFrames, which rules out reading a specific DF from a file.

And here are my questions:

  1. Is there a good guide on how to write unit tests with Spark and DataFrames, especially around creating DataFrames?
  2. How can I create these multi-row test DataFrames without a lot of boilerplate and without reading them from a file?
  3. Are there any utility classes for checking specific values inside a DataFrame?

I obviously googled this before, but could not find anything very useful. Among the more useful links I found were:

It would be great if the examples/tutorials were in Scala, but I will take whatever language you have.

Thanks in advance.

3 answers

This link shows how to programmatically create a DataFrame with a schema. You can keep the test data in a separate trait and mix it into your tests. For example:

// This example assumes CSV data, but the same approach works for other formats as well.

trait TestData {
  val data1 = List(
    "this,is,valid,data",
    "this,is,in-valid,data"
  )
  val data2 = ...
}

Then, with ScalaTest, we can do something like this:

import org.scalatest.{FlatSpec, Matchers}

class MyDFTest extends FlatSpec with Matchers {

  "method" should "perform this" in new TestData {
    // The test data from the TestData trait is available here.
    // Use it to create the DataFrame, then run your assertions.
  }
}

You can have a few utility methods for creating the DataFrame, as shown below.

  // CSVParser comes from the opencsv library (older versions use au.com.bytecode.opencsv).
  // Assumes `sc` (SparkContext) and `sqlContext` are already in scope.
  import com.opencsv.CSVParser
  import org.apache.spark.sql.Row
  import org.apache.spark.sql.types._

  def schema(types: Array[String], cols: Array[String]) = {
    val datatypes = types.map {
      case "String" => StringType
      case "Long" => LongType
      case "Double" => DoubleType
      // Add more types here based on your data.
      case _ => StringType
    }
    StructType(cols.indices.map(x => StructField(cols(x), datatypes(x))).toArray)
  }

  def df(data: List[String], types: Array[String], cols: Array[String]) = {
    val rdd = sc.parallelize(data)
    val parser = new CSVParser(',')
    val split = rdd.map(line => parser.parseLine(line))
    // Note: parseLine returns strings, so cast the values if the schema declares non-string types.
    val rows = split.map(arr => Row(arr(0), arr(1), arr(2), arr(3)))
    sqlContext.createDataFrame(rows, schema(types, cols))
  }
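
Putting it together inside the FlatSpec above, a test body could look roughly like this, assuming the df helper and the TestData trait are in scope; the column names, types and the filter are made-up placeholders for whatever your real transformation does:

  "my transformation" should "keep only the valid rows" in new TestData {
    // Hypothetical columns/types for the CSV lines in data1.
    val cols  = Array("c1", "c2", "flag", "c3")
    val types = Array("String", "String", "String", "String")

    val input  = df(data1, types, cols)
    val result = input.filter(input("flag") === "valid") // stand-in for the code under test

    result.count() shouldBe 1L
  }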

For checking specific values inside a DataFrame, you can use the DataFrame API itself.
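
For example, after building a frame with the helper above, plain ScalaTest matchers are enough to assert on individual values (again using the hypothetical column names from the previous sketch):

  val parsed = df(data1, types, cols)

  // Check a single cell: the "flag" column of the first row.
  parsed.select("flag").first().getString(0) shouldBe "valid"

  // Or count the rows that match a predicate.
  parsed.filter(parsed("flag") === "in-valid").count() shouldBe 1L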


If you work in Java and need a quick way to start a SparkContext for your tests, have a look at spark-testing-base: https://github.com/holdenk/spark-testing-base
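
spark-testing-base also has a Scala API; since the question asks for Scala, here is a minimal sketch using its DataFrameSuiteBase trait. It assumes a Spark 2.x build of the library, and the data and transformation are made up for illustration:

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.scalatest.FunSuite

class FilterValidSpec extends FunSuite with DataFrameSuiteBase {

  test("only rows marked 'valid' survive") {
    // `spark` (a SparkSession) is provided by DataFrameSuiteBase.
    val input = spark.createDataFrame(Seq(
      ("this", "valid"),
      ("that", "in-valid")
    )).toDF("word", "status")

    val expected = spark.createDataFrame(Seq(
      ("this", "valid")
    )).toDF("word", "status")

    // Stand-in for the real transformation under test.
    val result = input.filter(input("status") === "valid")

    // Compares both schema and contents.
    assertDataFrameEquals(expected, result)
  }
}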

In our case the data is stored in AVRO. Using avro-tools (https://avro.apache.org/docs/1.8.2/gettingstartedjava.html#download_install) you can dump a few records as JSON from the command line, for example:

java -jar $AVRO_HOME/avro tojson largeAvroFile.avro | head -3

These JSON strings can then be used to build the DataFrame in a test, as shown below.

private DataFrame getDataFrameFromList() {
    // jsc() is assumed to return the JavaSparkContext set up by the test harness.
    SQLContext sqlContext = new SQLContext(jsc());
    ImmutableList<String> elements = ImmutableList.of(
        "{\"header\":{\"appId\":\"myAppId1\",\"clientIp\":\"10.22.63.3\",\"createdDate\":\"2017-05-10T02:09:59.984Z\"}}",
        "{\"header\":{\"appId\":\"myAppId1\",\"clientIp\":\"11.22.63.3\",\"createdDate\":\"2017-05-11T02:09:59.984Z\"}}",
        "{\"header\":{\"appId\":\"myAppId1\",\"clientIp\":\"12.22.63.3\",\"createdDate\":\"2017-05-11T02:09:59.984Z\"}}"
    );
    JavaRDD<String> parallelize = jsc().parallelize(elements);
    return sqlContext.read().json(parallelize);
}
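
The same JSON-lines trick works from Scala as well; a minimal sketch, assuming `sc` (SparkContext) and `sqlContext` (SQLContext) are already set up by the test harness:

import org.apache.spark.sql.DataFrame

def dataFrameFromJsonStrings(): DataFrame = {
  val elements = Seq(
    """{"header":{"appId":"myAppId1","clientIp":"10.22.63.3","createdDate":"2017-05-10T02:09:59.984Z"}}""",
    """{"header":{"appId":"myAppId1","clientIp":"11.22.63.3","createdDate":"2017-05-11T02:09:59.984Z"}}"""
  )
  // The JSON reader infers the nested schema from the strings themselves.
  sqlContext.read.json(sc.parallelize(elements))
}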

You can use SharedSQLContext and SharedSparkSession, which Spark uses for its own unit tests. Check my answer for examples.
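
Both traits live in Spark's own test sources, so they are only available if you add the spark-sql test-jar (the artifact with the `tests` classifier) as a test dependency. Below is a rough sketch of a suite built on SharedSQLContext; the helper names (`testImplicits`, `checkAnswer`) come from Spark's internal test utilities and may differ between Spark versions:

import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSQLContext

class MyTransformSuite extends QueryTest with SharedSQLContext {
  import testImplicits._

  test("transformation keeps only the valid rows") {
    val input = Seq(("this", "valid"), ("that", "in-valid")).toDF("word", "status")

    // Stand-in for the real transformation under test.
    val result = input.filter($"status" === "valid")

    // checkAnswer (from QueryTest) compares the result against the expected rows.
    checkAnswer(result, Row("this", "valid") :: Nil)
  }
}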
