How to use the Flink fold function in Scala

This is a failed attempt to use Flink's fold with an anonymous Scala function:

    val myFoldFunction = (x: Double, t: (Double, String, String)) => x + t._1

    env.readFileStream(...)
      . ...
      .groupBy(1)
      .fold(0.0, myFoldFunction: Function2[Double, (Double, String, String), Double])

It compiles fine, but at execution time I get a "type erasure problem" (see below). Doing it in Java works, but is, of course, more verbose. I like the concise and clear lambdas. How can I do this in Scala?
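For reference, the accumulation the snippet is aiming for behaves like Scala's own foldLeft over the tuples: starting from 0.0, add the first (Double) field of each (Double, String, String) element. A minimal pure-Scala sketch of that semantics (no Flink involved; the sample data is invented for illustration):

```scala
object FoldSketch {
  def main(args: Array[String]): Unit = {
    // Same shape as the fold function in the question:
    // accumulator on the left, tuple on the right, sum the Double field.
    val myFoldFunction = (acc: Double, t: (Double, String, String)) => acc + t._1

    // Hypothetical sample elements standing in for the file stream.
    val events = Seq((1.5, "a", "x"), (2.5, "b", "y"), (4.0, "c", "z"))

    // Plain Scala foldLeft mirrors what Flink's fold does per key group.
    val total = events.foldLeft(0.0)(myFoldFunction)
    println(total) // prints 8.0
  }
}
```

In Flink the same function runs incrementally per key as elements arrive, but the per-element accumulation step is identical to the foldLeft above.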

 Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. This is most likely a type erasure problem. The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s). 
1 answer

The problem you are facing is a bug in Flink [1]. It stems from Flink's TypeExtractor and the way the Scala DataStream API is implemented on top of the Java implementation. The TypeExtractor cannot generate a TypeInformation for a Scala type and thus returns a MissingTypeInformation. This missing type information is set manually after the StreamFold operator has been created. However, the StreamFold operator is implemented in such a way that it does not accept a MissingTypeInformation, and therefore it fails before the correct type information can be set.

I opened a pull request [2] to fix this problem. It should be merged within the next couple of days. With the latest 0.10-SNAPSHOT version, your problem should be fixed.

