Does anyone know how to configure `streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)` so that it actually reads binary files?
Where can I find a list of all the available InputFormat classes? The documentation does not link to them. I suppose ValueClass is somehow tied to InputFormatClass.
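The type parameters are not independent. Every `fileStream` overload is declared with the bound `F <: org.apache.hadoop.mapreduce.InputFormat[K, V]` (it appears in the compiler error in the edit below), so choosing the InputFormat fixes K and V. The usable classes are subclasses of the new-API `org.apache.hadoop.mapreduce.InputFormat`; the known-subclasses list in its Hadoop Javadoc is a reasonable starting point. A minimal sketch, assuming Spark Streaming and Hadoop 2.x jars on the classpath; `StreamingInputs` and its methods are hypothetical names, and only the `fileStream` and `binaryRecordsStream` calls are Spark API:

```scala
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; only the Spark/Hadoop calls are real APIs.
object StreamingInputs {
  // TextInputFormat is an InputFormat[LongWritable, Text], so K must be
  // LongWritable (byte offset of the line) and V must be Text (the line).
  def textLines(ssc: StreamingContext, dir: String): DStream[String] =
    ssc.fileStream[LongWritable, Text, TextInputFormat](dir).map(_._2.toString)

  // For binary files made of fixed-length records, Spark Streaming has a
  // dedicated method that yields one Array[Byte] per record.
  def fixedLengthRecords(ssc: StreamingContext, dir: String, recordLength: Int): DStream[Array[Byte]] =
    ssc.binaryRecordsStream(dir, recordLength)
}
```

`textLines` mirrors what `textFileStream` does. For arbitrary, variable-length binary files there is no equivalent shortcut, so a custom InputFormat is one option.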
In the non-streaming API, the `binaryFiles` method gives me a byte array for each file. Is there a way to get the same with Spark Streaming? If not, where can I find these details, i.e. which InputFormats are supported and which value class each one produces? Finally, it seems one can pick any KeyClass; aren't all of these type parameters connected?
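As far as I know, `StreamingContext` has no direct streaming counterpart of `binaryFiles`. One common workaround is a custom new-API InputFormat that refuses to split files and emits each file as a single `BytesWritable`. A minimal sketch; `WholeFileInputFormat` and `WholeFileRecordReader` are hypothetical names, not Hadoop or Spark classes, and the code assumes every file fits in one byte array (under 2 GB):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, IOUtils, NullWritable}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}

// Hypothetical InputFormat: one record per file, key = NullWritable, value = file bytes.
class WholeFileInputFormat extends FileInputFormat[NullWritable, BytesWritable] {
  // Never split a file, so a single reader sees the whole file.
  override protected def isSplitable(context: JobContext, file: Path): Boolean = false

  override def createRecordReader(split: InputSplit,
                                  context: TaskAttemptContext): RecordReader[NullWritable, BytesWritable] =
    new WholeFileRecordReader
}

// Hypothetical reader that returns the entire file as a single value.
class WholeFileRecordReader extends RecordReader[NullWritable, BytesWritable] {
  private var fileSplit: FileSplit = _
  private var conf: Configuration = _
  private val value = new BytesWritable()
  private var processed = false

  override def initialize(split: InputSplit, context: TaskAttemptContext): Unit = {
    fileSplit = split.asInstanceOf[FileSplit]
    conf = context.getConfiguration
  }

  // Emits exactly one key/value pair: the full contents of the file.
  override def nextKeyValue(): Boolean = {
    if (processed) return false
    val length = fileSplit.getLength.toInt // assumes files smaller than 2 GB
    val contents = new Array[Byte](length)
    val path = fileSplit.getPath
    val in = path.getFileSystem(conf).open(path)
    try IOUtils.readFully(in, contents, 0, length)
    finally IOUtils.closeStream(in)
    value.set(contents, 0, length)
    processed = true
    true
  }

  override def getCurrentKey(): NullWritable = NullWritable.get()
  override def getCurrentValue(): BytesWritable = value
  override def getProgress(): Float = if (processed) 1.0f else 0.0f
  override def close(): Unit = ()
}
```

It would then be used as `ssc.fileStream[NullWritable, BytesWritable, WholeFileInputFormat](dataDirectory).map { case (_, bw) => java.util.Arrays.copyOf(bw.getBytes, bw.getLength) }`, copying the bytes because Hadoop record readers may reuse Writable instances and `getBytes` can return a buffer longer than the data.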
I would appreciate it if someone could clarify how this method is meant to be used.
EDIT1
I tried the following:
    val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/Casalini_streamed")
However, the compiler complains with:
    [error] /xxxxxxxxx/src/main/scala/EstimatorStreamingApp.scala:14: type arguments [org.apache.hadoop.io.BytesWritable,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat] conform to the bounds of none of the overloaded alternatives of
    [error] value fileStream: [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean, conf: org.apache.hadoop.conf.Configuration)(implicit evidence$10: scala.reflect.ClassTag[K], implicit evidence$11: scala.reflect.ClassTag[V], implicit evidence$12: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
    [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String, filter: org.apache.hadoop.fs.Path => Boolean, newFilesOnly: Boolean)(implicit evidence$7: scala.reflect.ClassTag[K], implicit evidence$8: scala.reflect.ClassTag[V], implicit evidence$9: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)] <and>
    [K, V, F <: org.apache.hadoop.mapreduce.InputFormat[K,V]](directory: String)(implicit evidence$4: scala.reflect.ClassTag[K], implicit evidence$5: scala.reflect.ClassTag[V], implicit evidence$6: scala.reflect.ClassTag[F])org.apache.spark.streaming.dstream.InputDStream[(K, V)]
    [error] val bfiles = ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat]("/xxxxxxxxx/Casalini_streamed")
What am I doing wrong?
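The error message points at the likely cause: every `fileStream` overload requires `F <: org.apache.hadoop.mapreduce.InputFormat[K, V]` (the new Hadoop API), while the class passed in is `org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat` from the old API. Hadoop 2.x also ships a new-API class with the same simple name in `org.apache.hadoop.mapreduce.lib.input`, so switching the import should type-check. A minimal sketch, assuming Hadoop 2.x; note that this format reads Hadoop SequenceFiles only, not arbitrary binary files, and `BinaryStreams` and its methods are hypothetical names:

```scala
import org.apache.hadoop.io.BytesWritable
// New-API (mapreduce) class, not org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; only the Spark/Hadoop calls are real APIs.
object BinaryStreams {
  // BytesWritable.getBytes returns the backing buffer, which can be longer than
  // the valid data, so copy only the first getLength bytes.
  def toBytes(bw: BytesWritable): Array[Byte] =
    java.util.Arrays.copyOf(bw.getBytes, bw.getLength)

  // The value of each SequenceFile record, as raw bytes.
  def sequenceFileValues(ssc: StreamingContext, dir: String): DStream[Array[Byte]] =
    ssc.fileStream[BytesWritable, BytesWritable, SequenceFileAsBinaryInputFormat](dir)
      .map { case (_, value) => toBytes(value) }
}
```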
apache-spark spark-streaming
Maatdeamon