Apache Flink can read files compressed with any of the following Hadoop codecs:

- org.apache.hadoop.io.compress.BZip2Codec
- org.apache.hadoop.io.compress.DefaultCodec
- org.apache.hadoop.io.compress.DeflateCodec
- org.apache.hadoop.io.compress.GzipCodec
- org.apache.hadoop.io.compress.Lz4Codec
- org.apache.hadoop.io.compress.SnappyCodec
As you can see from the package names, Flink does this via the Hadoop InputFormats. Here is an example of reading .gz files with the Flink Scala API (you need at least Flink 0.8.1):
```scala
import org.apache.flink.api.scala._
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

def main(args: Array[String]) {
  val env = ExecutionEnvironment.getExecutionEnvironment

  // Set up a Hadoop TextInputFormat; it decompresses the input
  // transparently based on the file extension.
  val job = new JobConf()
  val hadoopInput = new TextInputFormat()
  FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))

  // Wrap the Hadoop input format as a Flink DataSet of (LongWritable, Text) pairs.
  val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
  lines.print
  env.execute("Read gz files")
}
```
Apache Flink also has built-in support for .deflate files, without going through Hadoop. Adding support for more compression codecs would be easy, but it has not been done yet.
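For the built-in .deflate support, no Hadoop classes are needed, since Flink's own file input format picks the decompressor based on the file extension. A minimal sketch (the file path is a hypothetical example):

```scala
import org.apache.flink.api.scala._

object ReadDeflate {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Flink selects the decompressor from the file extension,
    // so a plain readTextFile call handles .deflate files.
    val lines = env.readTextFile("/path/to/data.deflate") // hypothetical path
    lines.print
    env.execute("Read deflate file")
  }
}
```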
Using Hadoop InputFormats with Flink does not cause any performance loss, because Flink has built-in serialization support for Hadoop's Writable types.
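Because Writables are serialized natively, the (LongWritable, Text) pairs from the example above can be used directly in downstream operators. A hypothetical continuation, extracting the line contents as plain Strings:

```scala
// Continuing from the `lines` DataSet above: the Text values can be
// consumed as-is; converting to String is only needed when further
// string processing is required.
val titles = lines.map { pair => pair._2.toString }
```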
Robert Metzger