Reading binary files using Spark Streaming

Does anyone know how to configure

```scala
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
```

to actually consume binary files?

  • Where can I find all the available InputFormat classes? The documentation does not provide a link for them. I suppose the ValueClass is connected to the InputFormatClass somehow.

  • In the non-streaming version, using the binaryFiles method, I can get a byte array for each file (see the sketch just after this list). Is there a way I can get the same with Spark Streaming? If not, where can I find these details, i.e., which InputFormats are supported and which value class each one produces? And since one can apparently select any KeyClass, aren't all of these elements connected?
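For reference, this is the non-streaming version I mean; a minimal sketch, assuming a plain SparkContext and a placeholder input path:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("binary-read"))

// binaryFiles yields (path, PortableDataStream) pairs;
// toArray() materializes each whole file as an Array[Byte].
val bytesPerFile = sc.binaryFiles("/some/input/dir")
  .mapValues(stream => stream.toArray())
```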

I would appreciate it if someone could clarify the use of this method.

EDIT 1

I tried the following:

```scala
val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/Casalini_streamed")
```

However, the compiler complains as follows:

```
[error] /xxxxxxxxx/src/main/scala/EstimatorStreamingApp.scala:14: type arguments [org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat] conform to the bounds of none of the overloaded alternatives of
[error] value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$10: scala.reflect.ClassTag[K], implicit evidence$11: scala.reflect.ClassTag[V], implicit evidence$12: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean)(implicit evidence$7: scala.reflect.ClassTag[K], implicit evidence$8: scala.reflect.ClassTag[V], implicit evidence$9: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and> [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$4: scala.reflect.ClassTag[K], implicit evidence$5: scala.reflect.ClassTag[V], implicit evidence$6: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
[error] val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/Casalini_streamed")
```

What am I doing wrong?

apache-spark spark-streaming

2 answers

Follow the link to read about all of Hadoop's input formats.

I found a well-documented answer here about the SequenceFile format.
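If you need such SequenceFiles to feed the watched directory, here is a hypothetical way to produce them from byte arrays with Spark itself (assuming an existing SparkContext sc; path and payloads are placeholders):

```scala
import org.apache.hadoop.io.BytesWritable

// Wrap each payload in a BytesWritable value (with an empty key) and
// write a SequenceFile; fileStream can then pick these files up.
val payloads: Seq[Array[Byte]] = Seq(Array[Byte](1, 2, 3), Array[Byte](4, 5))
sc.parallelize(payloads)
  .map(bytes => (new BytesWritable(), new BytesWritable(bytes)))
  .saveAsSequenceFile("/some/output/dir")
```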

You are facing a compilation issue due to mismatched imports: Hadoop's old mapred API vs. the new mapreduce API. As the error message shows, fileStream requires F <: org.apache.hadoop.mapreduce.InputFormat[K,V], i.e., an input format from the new API.

For example, in Java (jssc being the JavaStreamingContext; note that SequenceFileAsBinaryInputFormat reads both the key and the value as BytesWritable, so the type bound rules out Text here):

```java
JavaPairInputDStream<BytesWritable, BytesWritable> dstream =
    jssc.fileStream(
        "/somepath",
        org.apache.hadoop.io.BytesWritable.class,
        org.apache.hadoop.io.BytesWritable.class,
        org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat.class);
```

I did not try it in Scala, but it should be something similar:

```scala
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat

// In the Scala API the classes go in as type parameters, not classOf arguments:
val dstream = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/somepath")
```
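If that compiles, one way to get plain byte arrays back out of the stream is sketched below (my assumption, not tested; copyBytes() copies exactly the record's length):

```scala
// Each record is a (BytesWritable, BytesWritable) pair;
// copyBytes() returns a correctly sized Array[Byte] copy of the payload.
val byteArrays = dstream.map { case (_, value) => value.copyBytes() }
byteArrays.foreachRDD(rdd => println(s"records in this batch: ${rdd.count()}"))
```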

I finally got it to compile.

The compilation problem was in the imports. I used

  • import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat

I replaced it with

  • import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat

Then it works. However, I have no idea why: I do not understand the difference between the two hierarchies. Both classes seem to have the same content, so it is hard to say. If someone can clarify this here, I think it would help a lot.
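For what it is worth, the overload list in the error message above bounds F by org.apache.hadoop.mapreduce.InputFormat[K,V], so fileStream only accepts input formats from the new MapReduce API (org.apache.hadoop.mapreduce), not the old one (org.apache.hadoop.mapred). A minimal end-to-end sketch of the version that compiles; the app name, batch interval, and directory are placeholders:

```scala
import org.apache.hadoop.io.BytesWritable
// New-API input format: satisfies F <: org.apache.hadoop.mapreduce.InputFormat[K, V]
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object EstimatorStreamingApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("EstimatorStreamingApp")
    val ssc  = new StreamingContext(conf, Seconds(10))

    // Each new SequenceFile dropped into the watched directory becomes
    // a batch of (BytesWritable, BytesWritable) records.
    val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/Casalini_streamed")
    bfiles.count().print()   // just to show the stream is wired up

    ssc.start()
    ssc.awaitTermination()
  }
}
```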

