How to combine many files in S3 using Spark

I have 20 million files in S3 spanning approximately 8000 days.

The files are organized by UTC date, for example: s3://mybucket/path/txt/YYYY/MM/DD/filename.txt.gz. Each file is UTF-8 text containing between 0 bytes (empty) and 100 KB of text at the 95th percentile, although a few files are up to several MB in size.

Using Spark and Scala (I am new to both and want to learn), I would like to write out "daily packages" (8000 of them), each containing however many files exist for that day. Ideally I would like to preserve the original file names as well as their contents. The output should also live in S3 and be compressed in a format that is suitable as input for subsequent Spark steps and experiments.

One idea is to store each package as a set of JSON objects (one per line, '\n'-separated), for example:

 {id:"doc0001", meta:{x:"blah", y:"foo", ...}, content:"some long string here"} {id:"doc0002", meta:{x:"foo", y:"bar", ...}, content: "another long string"} 

Alternatively, I could try a Hadoop SequenceFile, but again I'm not sure how to set that up elegantly.

Using the Spark shell, I saw that reading files is very easy, for example:

 val textFile = sc.textFile("s3n://mybucket/path/txt/1996/04/09/*.txt.gz") // or even val textFile = sc.textFile("s3n://mybucket/path/txt/*/*/*/*.txt.gz") // which will take for ever 

But how can I "intercept" the reader so that it also provides each file's name?

Or maybe I should get an RDD of all the files, split it by day, and in a reduce step write out K=filename, V=fileContent?

+7
scala amazon-s3 hadoop apache-spark
3 answers

Have you tried something along the lines of sc.wholeTextFiles?

It creates an RDD where the key is the path of the file and the value is the entire contents of the file as a string. You could then map this so that the key becomes the file's date, and then groupByKey?

http://spark.apache.org/docs/latest/programming-guide.html
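A minimal sketch of that approach, run from the spark-shell; the s3n path and the date-extraction regex are assumptions based on the layout in the question, and it assumes wholeTextFiles can read the gzipped files directly:

// Read each file as a (fullPath, content) pair
val perFile = sc.wholeTextFiles("s3n://mybucket/path/txt/1996/04/*/*.txt.gz")

// Re-key by the YYYY/MM/DD portion of the path, keeping (fileName, content) as the value
val dayPattern = """.*/txt/(\d{4}/\d{2}/\d{2})/([^/]+)""".r
val byDay = perFile.map { case (path, content) =>
  val dayPattern(day, name) = path
  (day, (name, content))
}

// Group everything belonging to the same day into one "daily package"
val dailyPackages = byDay.groupByKey()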

+2

You can do it like this.

First, you can get the list of S3 bucket/key paths:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket: String, base_prefix: String) = {
  var files = new ArrayList[String]

  // S3 client and list-objects request
  var s3Client = new AmazonS3Client()
  var objectListing: ObjectListing = null
  var listObjectsRequest = new ListObjectsRequest()

  // Your S3 bucket
  listObjectsRequest.setBucketName(s3_bucket)
  // Your folder path or prefix
  listObjectsRequest.setPrefix(base_prefix)

  // Page through the listing, adding s3:// paths to the list
  do {
    objectListing = s3Client.listObjects(listObjectsRequest)
    for (objectSummary <- objectListing.getObjectSummaries().asScala) {
      files.add("s3://" + s3_bucket + "/" + objectSummary.getKey())
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker())
  } while (objectListing.isTruncated())

  // Removing the base directory name
  files.remove(0)

  // Returning a Scala collection of the same paths
  files.asScala
}
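For example, to list a single day's files (the bucket name and prefix here are just the ones from the question):

// Hypothetical call: collect every key under one day's prefix
val files = listFiles("mybucket", "path/txt/1996/04/09/")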

Now pass this list to the following code snippet (note: sc is the SparkContext):

import org.apache.spark.rdd.RDD

var combined: RDD[String] = null
for (file <- files) {
  val fileRdd = sc.textFile(file)
  if (combined != null) {
    combined = combined.union(fileRdd)
  } else {
    combined = fileRdd
  }
}

You now have a single unified RDD, i.e. combined above.

Optionally, you can also repartition it into a single partition:

val single = combined.repartition(1)

Repartitioning always works :D

+4

At your scale, an elegant solution is going to be a stretch.

I would recommend against using sc.textFile("s3n://mybucket/path/txt/*/*/*/*.txt.gz"), as it will take forever. What you can do is use AWS DistCp or something similar to move the files into HDFS. Once they are in HDFS, Spark can ingest them quickly in whatever way suits you.

Please note: most of these processes require a list of the files, so you need to generate it somehow. With 20 million files, building that list will itself be the bottleneck. I would recommend maintaining a manifest file to which each file's path is appended whenever a file is uploaded to S3.
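As a rough sketch of the manifest idea (the manifest location, its one-full-path-per-line format, and the date-extraction regex are all assumptions):

// Each manifest line is assumed to be a full path such as
// s3://mybucket/path/txt/1996/04/09/filename.txt.gz, appended at upload time
val manifest = sc.textFile("hdfs:///manifests/s3-uploads.txt")

// Group the paths by their YYYY/MM/DD segment so each day can be processed as one unit
val dayPattern = """.*/txt/(\d{4}/\d{2}/\d{2})/.*""".r
val pathsByDay = manifest.map { path =>
  val dayPattern(day) = path
  (day, path)
}.groupByKey()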

The same goes for the output: put it in HDFS first and then move it to S3 (although a direct copy can be just as effective).

+1