BZip2 Compressed Input for Apache Flink

I have a wikipedia dump compressed with bzip2 (downloaded from http://dumps.wikimedia.org/enwiki/ ), but I donโ€™t want to unpack it: I want to process it when unpacking on the fly.

I know that this can be done in simple Java (see, for example, Java - read the BZ2 file and unzip / unravel on the fly ), but I was wondering how to do it in Apache Flink? I probably need something like https://github.com/whym/wikihadoop , but for Flink, not Hadoop.

+7
apache-flink bzip2
source share
1 answer

Apache Flink can read compressed files in the following formats:

org.apache.hadoop.io.compress.BZip2Codec org.apache.hadoop.io.compress.DefaultCodec org.apache.hadoop.io.compress.DeflateCodec org.apache.hadoop.io.compress.GzipCodec org.apache.hadoop.io.compress.Lz4Codec org.apache.hadoop.io.compress.SnappyCodec 

As you can see from the package names, Flink does this with the Hadoop InputFormats. This is an example for reading gz files using the Flink Scala API: (You need at least Flink 0.8.1)

 def main(args: Array[String]) { val env = ExecutionEnvironment.getExecutionEnvironment val job = new JobConf() val hadoopInput = new TextInputFormat() FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz")) val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job) lines.print env.execute("Read gz files") } 

Apache Flink has built-in support for .deflate files. Adding support for more compression codecs is easy to do, but not done yet.

Using HadoopInputFormats with Flink does not result in performance loss. Flink has built-in support for Hadoop Writable type serialization.

+5
source share

All Articles