, . , -, . , , .
, , , flatMap
keyBy. stateful mapper Flink, , . , Flink, .
Flink, ,
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple3<String, Date, String>> input = env.fromElements(Tuple3.of("foo", new Date(1000), "bar"), Tuple3.of("foo", new Date(1000), "foobar"));
input.keyBy(0, 1).flatMap(new DuplicateFilter()).print();
env.execute("Test");
}
DuplicateFilter Flink.
>= 1.0
public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {
static final ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>("seen", Boolean.class, false);
private ValueState<Boolean> operatorState;
@Override
public void open(Configuration configuration) {
operatorState = this.getRuntimeContext().getState(descriptor);
}
@Override
public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
if (!operatorState.value()) {
out.collect(value);
operatorState.update(true);
}
}
}
0.10
public static class DuplicateFilter extends RichFlatMapFunction<Tuple3<String, Date, String>, Tuple3<String, Date, String>> {
private OperatorState<Boolean> operatorState;
@Override
public void open(Configuration configuration) {
operatorState = this.getRuntimeContext().getKeyValueState("seen", Boolean.class, false);
}
@Override
public void flatMap(Tuple3<String, Date, String> value, Collector<Tuple3<String, Date, String>> out) throws Exception {
if (!operatorState.value()) {
out.collect(value);
operatorState.update(true);
}
}
}
:
input.keyBy(0, 1).timeWindow(Time.seconds(1)).apply(new WindowFunction<Iterable<Tuple3<String,Date,String>>, Tuple3<String, Date, String>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple3<String, Date, String>> input, Collector<Tuple3<String, Date, String>> out) throws Exception {
out.collect(input.iterator().next());
}
})