How to aggregate tuples persistently in Trident?

I am learning Trident. Trident's Stream has several methods for aggregating the tuples in a batch, including aggregate(Fields, Aggregator, Fields), which lets you perform a stateful mapping of the tuples using the Aggregator interface. Unfortunately, there is no built-in counterpart that additionally persists the map state: persistentAggregate() comes in 9 overloads, but none of them accepts an Aggregator as an argument.
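For comparison: as far as I can tell from reading the source, the persistentAggregate() overloads take a CombinerAggregator or a ReducerAggregator. A general Aggregator (init/aggregate/complete with a TridentCollector) emits tuples and has no operation for merging its result with a value already held in the state. The toy, stdlib-only Java model below is not Storm code. The Combiner interface and the updateStored method are hypothetical names that only mirror the shape of CombinerAggregator's init/combine/zero. It shows why a combinable aggregator fits a persistent update: the batch's partial result and the stored value are merged with the same combine() call.

```java
import java.util.List;

// Toy model, NOT the Storm API: a hypothetical interface mirroring the
// shape of Trident's CombinerAggregator (init / combine / zero).
class CombinerSketch {

    interface Combiner<T> {
        T init(String tuple);   // value contributed by a single tuple
        T combine(T a, T b);    // merge any two partial values
        T zero();               // identity value for an empty batch
    }

    // Counting, analogous to Trident's built-in Count: each tuple contributes 1.
    static final Combiner<Long> COUNT = new Combiner<Long>() {
        @Override public Long init(String tuple) { return 1L; }
        @Override public Long combine(Long a, Long b) { return a + b; }
        @Override public Long zero() { return 0L; }
    };

    // Fold one batch into the value previously stored for a key.
    // Because combine() merges ANY two partial results, the batch's partial
    // result can be merged directly with whatever is already persisted.
    static <T> T updateStored(T stored, List<String> batch, Combiner<T> c) {
        T partial = c.zero();
        for (String tuple : batch) {
            partial = c.combine(partial, c.init(tuple));
        }
        // No stored value yet: the batch result becomes the new stored value.
        return stored == null ? partial : c.combine(stored, partial);
    }

    public static void main(String[] args) {
        Long stored = null;                                        // nothing persisted yet
        stored = updateStored(stored, List.of("the", "the"), COUNT);
        stored = updateStored(stored, List.of("the"), COUNT);
        System.out.println(stored);                                // prints 3
    }
}
```

A general Aggregator has no counterpart of combine(), so there is nothing obvious to call with the stored value. That is my interpretation of the missing overload, not something the documentation states.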

So how can I implement the desired functionality by combining lower-level Trident and Storm abstractions and tools? Learning the API is quite difficult because there is almost no Javadoc documentation.

In other words, the persistentAggregate() methods let you end the processing of a stream by updating some persistent state:

 stream of tuples ---> persistent state 

I want to update a persistent state and emit different tuples along the way:

 stream of tuples ------> stream of different tuples with persistent state 

Stream.aggregate(Fields, Aggregator, Fields) does not provide fault tolerance:

 stream of tuples ------> stream of different tuples with simple in-memory state 
1 answer

You can create a new stream from a state using the TridentState#newValuesStream() method. It gives you a stream of the aggregated values as they are updated, which you can then keep processing downstream.

To illustrate, we can extend the word-count example from the Trident documentation by adding newValuesStream() and a Debug filter:

    // Spout that cycles through four sentences, in batches of at most 3 tuples
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
            new Values("the cow jumped over the moon"),
            new Values("the man went to the store and bought some candy"),
            new Values("four score and seven years ago"),
            new Values("how many apples can you eat"));
    spout.setCycle(true);

    TridentTopology topology = new TridentTopology();
    topology.newStream("spout1", spout)
            .each(new Fields("sentence"), new Split(), new Fields("word"))
            .groupBy(new Fields("word"))
            // update the persistent word counts...
            .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
            // ...then turn the updated counts back into a stream and print them
            .newValuesStream()
            .each(new Fields("count"), new Debug());

Running this topology prints the updated counts to the console.
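To make the data flow explicit, here is a toy, stdlib-only Java model of what I understand groupBy(...).persistentAggregate(..., new Count(), ...).newValuesStream() does per batch. It is not Storm code: the state map merely stands in for MemoryMapState, the class and method names are hypothetical, and for brevity each batch holds one sentence rather than up to 3 as in the topology above. Each batch updates the stored count of every word it contains. The returned list plays the role of the new values stream: one (word, updated count) pair per key touched in that batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model, NOT the Storm API, of persistentAggregate(...) followed by
// newValuesStream() on a stream grouped by word.
class NewValuesStreamSketch {

    // Stand-in for the persistent map state (word -> stored count).
    private final Map<String, Long> state = new LinkedHashMap<>();

    // Processes one batch and returns the "new values" for the keys it touched.
    List<Map.Entry<String, Long>> processBatch(List<String> sentences) {
        // 1. Split + groupBy: partial counts for this batch only.
        Map<String, Long> batchCounts = new LinkedHashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                batchCounts.merge(word, 1L, Long::sum);
            }
        }
        // 2. Persist: add each partial count to the stored count.
        // 3. Emit the updated count for each key touched by this batch.
        List<Map.Entry<String, Long>> newValues = new ArrayList<>();
        for (Map.Entry<String, Long> e : batchCounts.entrySet()) {
            Long updated = state.merge(e.getKey(), e.getValue(), Long::sum);
            newValues.add(Map.entry(e.getKey(), updated));
        }
        return newValues;
    }

    // Reads the currently stored count (null if the word was never seen).
    Long stored(String word) {
        return state.get(word);
    }

    public static void main(String[] args) {
        NewValuesStreamSketch topo = new NewValuesStreamSketch();
        // prints [the=2, cow=1, jumped=1, over=1, moon=1]
        System.out.println(topo.processBatch(List.of("the cow jumped over the moon")));
        // prints [the=4, man=1, went=1, to=1, store=1]
        System.out.println(topo.processBatch(List.of("the man went to the store")));
    }
}
```

In the real topology, newValuesStream() returns an ordinary Stream, so you can chain further each(...) calls on it, as the Debug filter does, to emit whatever different tuples you need. That yields the "stream of different tuples with persistent state" shape asked about in the question.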

Hope this helps.
