What is Trident State in Storm?

I am new to Trident in Storm and I am puzzled by TridentState. As far as I understand, Trident maintains state (i.e. metadata) for each batch (whether all tuples in the batch have been fully processed, keeping the transaction id in the database), but I'm not quite sure what the following statement does:

    TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());

Can someone explain what actually happens when we define the above code?
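The per-batch transaction-id bookkeeping mentioned in the question can be illustrated with a small plain-Java sketch. It uses no Storm classes; TxidGuardedCounts and its methods are hypothetical names. It assumes batches are applied in txid order and that a replayed batch carries exactly the same content, so storing the last txid next to each value is enough to recognise a replay and skip it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory store (not a Storm class) that records, next to each
// value, the txid of the batch that last updated it.
class TxidGuardedCounts {
    static final class Entry {
        final long count;
        final long txid;

        Entry(long count, long txid) {
            this.count = count;
            this.txid = txid;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Apply one batch's increment for a key. If the stored txid equals the
    // batch's txid, this batch was already applied (it is a replay), so skip it.
    void apply(String key, long delta, long txid) {
        Entry current = store.get(key);
        if (current != null && current.txid == txid) {
            return; // replayed batch: update already applied
        }
        long base = (current == null) ? 0 : current.count;
        store.put(key, new Entry(base + delta, txid));
    }

    long get(String key) {
        Entry e = store.get(key);
        return e == null ? 0 : e.count;
    }
}
```

Applying txid 1 twice and then txid 2 counts the first batch only once, which is the property the txid is stored for.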


I hope it's never too late to answer; at least someone might find my answer useful :)

So topology.newStaticState() is Trident's abstraction for a queryable data store. By the method's contract, the parameter for newStaticState() must be an implementation of storm.trident.state.StateFactory. The factory, in turn, must implement the makeState() method, which returns an instance of storm.trident.state.State. However, if you plan on querying your state, you should return a ReadOnlyMapState instead, since a plain storm.trident.state.State has no methods for querying the actual data source (you will actually get a ClassCastException if you try to use anything but ReadOnlyMapState).
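The factory/state contract and the reason for the ClassCastException can be modelled in plain Java. SimpleState, SimpleReadOnlyMapState, SimpleStateFactory and StateContractDemo are hypothetical stand-ins that only mirror the shape of Trident's State, ReadOnlyMapState and StateFactory; they are not the Storm API, and makeState here drops the conf/metrics parameters.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for Trident's types (NOT the Storm API).
interface SimpleState {
    void beginCommit(Long txid);
    void commit(Long txid);
}

interface SimpleReadOnlyMapState<T> extends SimpleState {
    List<T> multiGet(List<List<Object>> keys);
}

interface SimpleStateFactory {
    SimpleState makeState(int partitionIndex, int numPartitions);
}

class StateContractDemo {
    // A plain state: it can be committed to, but offers no way to read data back.
    static class PlainState implements SimpleState {
        public void beginCommit(Long txid) { }
        public void commit(Long txid) { }
    }

    // A queryable state backed by an in-memory map.
    static class MapState implements SimpleReadOnlyMapState<String> {
        private final Map<String, String> data = new HashMap<>();

        MapState() {
            data.put("tuple-00", "Trident");
        }

        public void beginCommit(Long txid) { }
        public void commit(Long txid) { }

        public List<String> multiGet(List<List<Object>> keys) {
            List<String> out = new ArrayList<>();
            for (List<Object> key : keys) {
                out.add(data.get(key.get(0))); // null when the key is absent
            }
            return out;
        }
    }

    // What a MapGet-style query does: treat whatever the factory produced as a
    // map state. A factory returning PlainState fails here with ClassCastException.
    @SuppressWarnings("unchecked")
    static List<Object> query(SimpleStateFactory factory, List<List<Object>> keys) {
        SimpleState state = factory.makeState(0, 1);
        SimpleReadOnlyMapState<Object> map = (SimpleReadOnlyMapState<Object>) state;
        return map.multiGet(keys);
    }
}
```

A factory such as (i, n) -> new StateContractDemo.MapState() can be queried, while (i, n) -> new StateContractDemo.PlainState() throws at the cast.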

So let's try it!

A mock state implementation:

    public static class ExampleStaticState implements ReadOnlyMapState<String> {

        private final Map<String, String> dataSourceStub;

        public ExampleStaticState() {
            dataSourceStub = new HashMap<>();
            dataSourceStub.put("tuple-00", "Trident");
            dataSourceStub.put("tuple-01", "definitely");
            dataSourceStub.put("tuple-02", "lacks");
            dataSourceStub.put("tuple-03", "documentation");
        }

        @Override
        public List<String> multiGet(List<List<Object>> keys) {
            System.out.println("DEBUG: MultiGet, keys is " + keys);

            List<String> result = new ArrayList<>();
            for (List<Object> inputTuple : keys) {
                result.add(dataSourceStub.get(inputTuple.get(0)));
            }
            return result;
        }

        @Override
        public void beginCommit(Long txid) {
            // never gets executed...
            System.out.println("DEBUG: Begin commit, txid=" + txid);
        }

        @Override
        public void commit(Long txid) {
            // never gets executed...
            System.out.println("DEBUG: Commit, txid=" + txid);
        }
    }

A factory:

    public static class ExampleStaticStateFactory implements StateFactory {
        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            return new ExampleStaticState();
        }
    }

A simple psvm (aka public static void main):

    public static void main(String... args) {
        TridentTopology tridentTopology = new TridentTopology();
        FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
                "foo"
        }));
        TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
        tridentTopology
                .newStream("spout", spout)
                .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
                .each(new Fields("foo", "bar"), new Debug());

        Config conf = new Config();
        conf.setNumWorkers(6);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());

        spout.feed(Arrays.asList(new Values[]{
                new Values("tuple-00"),
                new Values("tuple-01"),
                new Values("tuple-02"),
                new Values("tuple-03")
        }));

        localCluster.shutdown();
    }

And finally, the output:

    DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
    DEBUG: [tuple-00, Trident]
    DEBUG: [tuple-01, definitely]
    DEBUG: [tuple-02, lacks]
    DEBUG: [tuple-03, documentation]

You can see that stateQuery() takes the values from the input batch and maps them to the values found in the "data store".
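The same behaviour can be modelled without Storm to make the data flow explicit. StateQuerySim is a hypothetical name, and each tuple is represented as a List<String> whose first field plays the role of "foo". The sketch collects the key of every tuple in the batch, performs one batched lookup, and appends each result to its own input tuple as the new "bar" field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model (no Storm classes) of stateQuery(state, new Fields("foo"),
// new MapGet(), new Fields("bar")) over a single batch.
class StateQuerySim {
    private final Map<String, String> store;
    int batchedLookups = 0; // number of multiGet-style calls made

    StateQuerySim(Map<String, String> store) {
        this.store = store;
    }

    // Stand-in for ReadOnlyMapState.multiGet: one result per key, null if absent.
    private List<String> multiGet(List<String> keys) {
        batchedLookups++;
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            out.add(store.get(key));
        }
        return out;
    }

    List<List<String>> stateQuery(List<List<String>> batch) {
        List<String> keys = new ArrayList<>();
        for (List<String> tuple : batch) {
            keys.add(tuple.get(0)); // the "foo" field
        }
        List<String> results = multiGet(keys); // one lookup for the whole batch

        List<List<String>> output = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            List<String> extended = new ArrayList<>(batch.get(i));
            extended.add(results.get(i)); // input fields followed by "bar"
            output.add(extended);
        }
        return output;
    }
}
```

Fed the four keys and values from the example, this yields [tuple-00, Trident], [tuple-01, definitely] and so on, with batchedLookups equal to 1, matching the single MultiGet line in the output.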

Diving a little deeper, you can look at the source of the MapGet class (the one whose instance is used for querying inside the topology) and find the following there:

    public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
        @Override
        public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
            return map.multiGet((List) keys);
        }

        @Override
        public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
            collector.emit(new Values(result));
        }
    }

So under the hood it simply calls the multiGet() method of your ReadOnlyMapState implementation and then emits the values found in the data store, appending them to the existing tuple. You can (though it is perhaps not the best idea) create your own implementation of BaseQueryFunction<ReadOnlyMapState, Object> that does something more complex.
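A custom query function keeps the same two steps as MapGet: one batched retrieval, then a per-tuple decision about what to emit. The sketch below models that in plain Java; QueryFunction and MapGetSkipMissing are hypothetical stand-ins, not Storm types, and the lookup key stands in for the TridentTuple that Storm would pass to execute. As an example of "something more complex", this variant emits nothing for keys missing from the store instead of emitting null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for the batchRetrieve/execute contract (NOT the Storm API).
interface QueryFunction<S, T> {
    List<T> batchRetrieve(S state, List<String> keys);
    void execute(String key, T result, Consumer<List<Object>> collector);
}

// A MapGet variant that drops tuples whose key is missing instead of emitting null.
class MapGetSkipMissing implements QueryFunction<Map<String, String>, String> {
    @Override
    public List<String> batchRetrieve(Map<String, String> state, List<String> keys) {
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            out.add(state.get(key));
        }
        return out;
    }

    @Override
    public void execute(String key, String result, Consumer<List<Object>> collector) {
        if (result != null) {
            List<Object> values = new ArrayList<>();
            values.add(result);
            collector.accept(values); // emitted only for keys that were found
        }
    }

    // Drives the function over one batch, roughly as a stateQuery would.
    static List<List<Object>> run(Map<String, String> state, List<String> keys) {
        MapGetSkipMissing fn = new MapGetSkipMissing();
        List<String> results = fn.batchRetrieve(state, keys);
        List<List<Object>> emitted = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            fn.execute(keys.get(i), results.get(i), emitted::add);
        }
        return emitted;
    }
}
```

Keeping the lookup in batchRetrieve preserves the single batched call; only execute changes, which is usually the cheaper place to customise behaviour.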


There is good documentation on Trident state in the Storm wiki. The short answer to your question is that urlToTweeters is a state object that you can query. I assume the statement above comes from the Trident tutorial, reproduced below:

    TridentState urlToTweeters =
        topology.newStaticState(getUrlToTweetersState());
    TridentState tweetersToFollowers =
        topology.newStaticState(getTweeterToFollowersState());

    topology.newDRPCStream("reach")
        .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
        .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
        /* At this point we have the tweeters for each url passed in args */
        .shuffle()
        .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
        .parallelismHint(200)
        .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
        .groupBy(new Fields("follower"))
        .aggregate(new One(), new Fields("one"))
        .parallelismHint(20)
        .aggregate(new Count(), new Fields("reach"));

In this example, urlToTweeters stores the mapping of URLs to tweeters, and the DRPC "reach" query defined on the next line (which takes URLs as its arguments) ultimately computes the reach. Along the way (at the point marked by the inline comment) you get a stream of the tweeters for each URL, i.e. the result of querying urlToTweeters.
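Stripped of distribution, the reach query boils down to two lookups and a distinct count. The plain-Java sketch below mirrors that pipeline: the first stateQuery becomes a lookup in urlToTweeters, the second a lookup in tweetersToFollowers, and groupBy("follower") plus One and Count becomes counting distinct followers. ReachSketch and the sample data are hypothetical, for illustration only; in the topology the two maps live behind the static states.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class ReachSketch {
    static int reach(String url,
                     Map<String, List<String>> urlToTweeters,
                     Map<String, List<String>> tweetersToFollowers) {
        Set<String> distinctFollowers = new HashSet<>();
        // First lookup: URL -> tweeters (then expanded one tweeter at a time).
        for (String tweeter : urlToTweeters.getOrDefault(url, Collections.emptyList())) {
            // Second lookup: tweeter -> followers; the Set removes duplicates,
            // which is what groupBy("follower") + One achieves in the topology.
            distinctFollowers.addAll(
                    tweetersToFollowers.getOrDefault(tweeter, Collections.emptyList()));
        }
        return distinctFollowers.size(); // the final Count
    }

    public static void main(String[] args) {
        // Hypothetical sample data.
        Map<String, List<String>> urlToTweeters = new HashMap<>();
        urlToTweeters.put("example.com/post", Arrays.asList("sally", "bob"));

        Map<String, List<String>> tweetersToFollowers = new HashMap<>();
        tweetersToFollowers.put("sally", Arrays.asList("bob", "tim", "alice"));
        tweetersToFollowers.put("bob", Arrays.asList("alice", "adam"));

        // prints 4 (bob, tim, alice, adam; alice is counted once)
        System.out.println(reach("example.com/post", urlToTweeters, tweetersToFollowers));
    }
}
```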
