Flink: sharing state in CoFlatMapFunction

I'm trying out a small CoFlatMapFunction. It seems to work fine if I connect it to a DataStream before the window, but it doesn't work if it is placed after the window function.

I was testing with two streams: the main "Features" stream on flatMap1 constantly ingests data, while the control "Model" stream on flatMap2 changes the model on request.

I can set b0/b1 in flatMap2, but flatMap1 always sees b0 and b1 with their initial values of 0.

Did I miss something obvious here?

    // Imports belong at the top of the enclosing source file
    import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
    import org.apache.flink.util.Collector;

    public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
        private static final long serialVersionUID = 1L;

        // Current model parameters, updated by flatMap2
        Double b0;
        Double b1;

        public applyModel() {
            b0 = 0.0;
            b1 = 0.0;
        }

        // Main input: called for every Features record
        @Override
        public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
            System.out.println("Main: " + this);
        }

        // Control input: called for every Model update
        @Override
        public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
            System.out.println("Old Model: " + this);
            b0 = value.getB0();
            b1 = value.getB1();
            System.out.println("New Model: " + this);
        }

        @Override
        public String toString() {
            return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
        }
    }

Here is the answer from the mailing list ...

Is the CoFlatMapFunction running in parallel?

If so, you need some deterministic way of assigning which record goes to which parallel instance. The parallel instances (partitions) of the CoFlatMapFunction must somehow be wired up between the model stream and the output of the session window, so you need some form of key that selects which partition each element goes to. Does that make sense?

If not, try explicitly setting its parallelism to 1.
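To make the partitioning point concrete, here is a minimal plain-Java simulation (no Flink dependency) of parallel instances of the function. Each simulated instance keeps its own b0/b1 fields, just as separate parallel copies of applyModel would. The class names and the routing rule (`hashCode()` modulo parallelism) are hypothetical simplifications; Flink's actual key-group assignment differs, but the effect is the same: only records routed to the same instance share its fields.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ONE parallel instance of applyModel:
// every instance has its own private copy of b0/b1.
class ApplyModelInstance {
    double b0 = 0.0;
    double b1 = 0.0;

    // Corresponds to flatMap2: a model update changes this instance only.
    void onModel(double newB0, double newB1) {
        b0 = newB0;
        b1 = newB1;
    }

    // Corresponds to flatMap1: reports which model this instance currently sees.
    String onFeatures(String key) {
        return key + " -> {b0: " + b0 + ", b1: " + b1 + "}";
    }
}

class RoutingDemo {
    // Simplified routing rule: hash of the key modulo the parallelism.
    // (Flink's real key-group assignment is different; only the idea matters.)
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    static List<ApplyModelInstance> instances(int parallelism) {
        List<ApplyModelInstance> ops = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            ops.add(new ApplyModelInstance());
        }
        return ops;
    }

    // Unkeyed, parallelism 2: the model update happens to reach instance 0,
    // while the features record is forwarded to instance 1.
    static String unkeyed() {
        List<ApplyModelInstance> ops = instances(2);
        ops.get(0).onModel(1.5, 2.5);
        return ops.get(1).onFeatures("sensor-1");
    }

    // Keyed, parallelism 2: model update and features share the key,
    // so both are routed to the same instance.
    static String keyed() {
        int parallelism = 2;
        List<ApplyModelInstance> ops = instances(parallelism);
        String key = "sensor-1";
        ops.get(partitionFor(key, parallelism)).onModel(1.5, 2.5);
        return ops.get(partitionFor(key, parallelism)).onFeatures(key);
    }

    // Parallelism 1: there is only one instance, so it sees every update.
    static String singleInstance() {
        ApplyModelInstance op = new ApplyModelInstance();
        op.onModel(1.5, 2.5);
        return op.onFeatures("sensor-1");
    }

    public static void main(String[] args) {
        System.out.println("unkeyed: " + unkeyed());        // sensor-1 -> {b0: 0.0, b1: 0.0}
        System.out.println("keyed:   " + keyed());          // sensor-1 -> {b0: 1.5, b1: 2.5}
        System.out.println("single:  " + singleInstance()); // sensor-1 -> {b0: 1.5, b1: 2.5}
    }
}
```

The unkeyed case reproduces the symptom from the question: the instance that handles flatMap1 still holds the initial zeros. In the DataStream API, the keyed variant roughly corresponds to calling `keyBy` on the `ConnectedStreams` with one key selector per input, and the single-instance variant to calling `setParallelism(1)` on the operator. Which key to use depends on the data and is not specified in the question.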

Cheers, Stefan


Read-only global state can be implemented via broadcast().
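A similarly hedged plain-Java sketch of what broadcasting achieves: every model update is delivered to every parallel instance, so whichever instance receives a Features record already holds the latest b0/b1. The names are hypothetical, and the round-robin routing of feature records is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class BroadcastDemo {
    // One parallel instance with its own copy of the model parameters.
    static final class OperatorInstance {
        double b0 = 0.0;
        double b1 = 0.0;
    }

    // Broadcast: every model update is delivered to every instance.
    static void broadcastModel(List<OperatorInstance> instances, double b0, double b1) {
        for (OperatorInstance inst : instances) {
            inst.b0 = b0;
            inst.b1 = b1;
        }
    }

    // Feature records can still be spread over instances (round-robin here).
    static double[] modelSeenBy(List<OperatorInstance> instances, int recordIndex) {
        OperatorInstance inst = instances.get(recordIndex % instances.size());
        return new double[] {inst.b0, inst.b1};
    }

    public static void main(String[] args) {
        List<OperatorInstance> instances = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            instances.add(new OperatorInstance());
        }
        broadcastModel(instances, 1.5, 2.5);
        for (int r = 0; r < 4; r++) {
            double[] m = modelSeenBy(instances, r);
            // Every record sees b0=1.5, b1=2.5, regardless of its instance.
            System.out.println("record " + r + " sees b0=" + m[0] + ", b1=" + m[1]);
        }
    }
}
```

In the DataStream API this would look roughly like connecting the features stream with `models.broadcast()`; newer Flink versions also offer a dedicated broadcast state pattern via `broadcast(MapStateDescriptor...)`. The sketch only models delivery to all instances, not the read-only restriction.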

Global state that everyone can both read and update is currently not available. Consistent operations on such state would be quite expensive, since they require some form of distributed communication/consensus.

Instead, I would advise you to go with the following:

1) If you can partition the state, use keyBy().mapWithState(). That localizes state operations and makes them very fast.

2) If your state is not organized by key, it is probably very small, and you can use a non-parallel operation.

3) If one operation updates the state and another one reads it, you can often implement this using iterations and a CoFlatMapFunction (one side is the original input, the other is the feedback input).
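Option 1 can be illustrated with another plain-Java sketch that mimics the shape of `mapWithState`: state is stored per key, and the user function receives the incoming value together with the current state for that key (if any) and returns an output plus the new state. This is a simplified, hypothetical stand-in, not Flink's implementation: in Flink's Scala API the function returns the new state as an `Option`, and Flink itself manages and checkpoints the state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;

class KeyedStateDemo {
    // Result of one call: the emitted value plus the new state for that key.
    static final class Result<O, S> {
        final O output;
        final S newState;

        Result(O output, S newState) {
            this.output = output;
            this.newState = newState;
        }
    }

    // Hypothetical keyed operator: state lives in a map indexed by key,
    // so each key's state is touched only by records carrying that key.
    static final class KeyedMapper<K, I, O, S> {
        private final Map<K, S> state = new HashMap<>();
        private final BiFunction<I, Optional<S>, Result<O, S>> fn;

        KeyedMapper(BiFunction<I, Optional<S>, Result<O, S>> fn) {
            this.fn = fn;
        }

        O process(K key, I value) {
            Result<O, S> r = fn.apply(value, Optional.ofNullable(state.get(key)));
            state.put(key, r.newState);
            return r.output;
        }
    }

    // Example function: running sum of values per key.
    static KeyedMapper<String, Double, Double, Double> runningSum() {
        return new KeyedMapper<>((value, st) -> {
            double next = st.orElse(0.0) + value;
            return new Result<>(next, next);
        });
    }

    public static void main(String[] args) {
        KeyedMapper<String, Double, Double, Double> m = runningSum();
        System.out.println(m.process("a", 1.0)); // 1.0
        System.out.println(m.process("b", 5.0)); // 5.0
        System.out.println(m.process("a", 2.0)); // 3.0
    }
}
```

Because each key's state is only ever read and written by records with that key, the keyed operator can be spread across many instances without any cross-instance coordination.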

All of these approaches ultimately localize access to and modification of the state, which is a good pattern to follow whenever possible.

Cheers, Stefan
