I hope it's never too late to answer; at least someone might find this useful :)
So topology.newStaticState() is Trident's abstraction of a queryable data store. The parameter to newStaticState() should be, based on the method's contract, an implementation of storm.trident.state.StateFactory. The factory, in turn, must implement the makeState() method, which returns an instance of storm.trident.state.State. However, if you plan to query your state, you should return an instance of storm.trident.state.map.ReadOnlyMapState instead, since a plain storm.trident.state.State has no methods for querying the actual data source (you will actually get a class cast exception if you try to use anything but ReadOnlyMapState).
So let's try it!
A dummy state implementation:

    // Needs java.util.* and storm.trident.state.map.ReadOnlyMapState.
    public static class ExampleStaticState implements ReadOnlyMapState<String> {

        // In-memory stand-in for a real data store.
        private final Map<String, String> dataSourceStub;

        public ExampleStaticState() {
            dataSourceStub = new HashMap<>();
            dataSourceStub.put("tuple-00", "Trident");
            dataSourceStub.put("tuple-01", "definitely");
            dataSourceStub.put("tuple-02", "lacks");
            dataSourceStub.put("tuple-03", "documentation");
        }

        @Override
        public List<String> multiGet(List<List<Object>> keys) {
            System.out.println("DEBUG: MultiGet, keys is " + keys);

            // One result per key, in the same order as the keys.
            List<String> result = new ArrayList<>();
            for (List<Object> inputTuple : keys) {
                result.add(dataSourceStub.get(inputTuple.get(0)));
            }
            return result;
        }

        @Override
        public void beginCommit(Long txid) {
            // No-op: this read-only example has nothing to commit.
        }

        @Override
        public void commit(Long txid) {
            // No-op: this read-only example has nothing to commit.
        }
    }
A factory:
    public static class ExampleStaticStateFactory implements StateFactory {
        @Override
        public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
            return new ExampleStaticState();
        }
    }
A simple psvm (aka public static void main):

    public static void main(String... args) {
        TridentTopology tridentTopology = new TridentTopology();
        // The spout emits tuples with a single field named "foo".
        FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
                "foo"
        }));
        TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());

        tridentTopology
                .newStream("spout", spout)
                // Look up "foo" in the state and append the result as field "bar".
                .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
                .each(new Fields("foo", "bar"), new Debug())
        ;

        Config conf = new Config();
        conf.setNumWorkers(6);

        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());

        // Feed one batch of four single-field tuples.
        spout.feed(Arrays.asList(new Values[]{
                new Values("tuple-00"),
                new Values("tuple-01"),
                new Values("tuple-02"),
                new Values("tuple-03")
        }));

        localCluster.shutdown();
    }
And finally, the output:

    DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
    DEBUG: [tuple-00, Trident]
    DEBUG: [tuple-01, definitely]
    DEBUG: [tuple-02, lacks]
    DEBUG: [tuple-03, documentation]
As you can see, stateQuery() takes the values from the input batch and maps them to the values found in the "data store".
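To make that mapping concrete, here is a minimal plain-Java sketch of the lookup that multiGet() performs. It uses no Storm classes at all; the class name MultiGetSketch and the key "tuple-99" are made up for illustration. It mirrors the loop in ExampleStaticState.multiGet(): the result list has one entry per key, in the same order, and a key missing from the store yields null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the lookup; no Storm classes are involved.
class MultiGetSketch {

    // Stand-in for the "data store" (a subset of the stub used above).
    static Map<String, String> dataSourceStub() {
        Map<String, String> stub = new HashMap<>();
        stub.put("tuple-00", "Trident");
        stub.put("tuple-01", "definitely");
        return stub;
    }

    // Mirrors ExampleStaticState.multiGet(): one result per key, same order.
    static List<String> multiGet(Map<String, String> store, List<List<Object>> keys) {
        List<String> result = new ArrayList<>();
        for (List<Object> key : keys) {
            // Map.get returns null for a key that is not in the store.
            result.add(store.get(key.get(0)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> keys = new ArrayList<>();
        keys.add(Arrays.<Object>asList("tuple-01"));
        keys.add(Arrays.<Object>asList("tuple-99")); // hypothetical key, not in the store
        keys.add(Arrays.<Object>asList("tuple-00"));

        List<String> values = multiGet(dataSourceStub(), keys);
        for (int i = 0; i < keys.size(); i++) {
            System.out.println(keys.get(i) + " -> " + values.get(i));
        }
        // Prints:
        // [tuple-01] -> definitely
        // [tuple-99] -> null
        // [tuple-00] -> Trident
    }
}
```

The positional alignment is the important part: each result is paired with the input tuple at the same index, so a multiGet() implementation should not skip or reorder entries.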
Diving a little deeper, you can look at the source of the MapGet class (the guy whose instance is used for querying inside the topology) and find the following there:

    public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
        @Override
        public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
            return map.multiGet((List) keys);
        }

        @Override
        public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
            collector.emit(new Values(result));
        }
    }
So under the hood it simply calls the multiGet() method of your ReadOnlyMapState implementation and then emits the values found in the data store, appending them to the existing tuple. You could (though it might not be the best idea) create your own implementation of BaseQueryFunction<ReadOnlyMapState, Object> that does something more complex.
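As a rough illustration of what "something more complex" could look like, here is a plain-Java model of the two-step contract that MapGet follows: one bulk retrieve per batch, then one execute call per tuple. None of this is Storm code. The QueryFunction interface, DefaultingGet, runQuery and the "n/a" fallback are hypothetical names invented for the sketch, and a plain Map stands in for the state. A real version would extend BaseQueryFunction and emit through the TridentCollector instead of returning a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two-step query contract; these are NOT Storm classes.
class QueryFunctionSketch {

    // Hypothetical stand-in for a query function.
    interface QueryFunction {
        // Called once per batch: one result per key, in key order.
        List<Object> batchRetrieve(Map<String, String> state, List<List<Object>> keys);

        // Called once per tuple: returns the values for the new output fields.
        List<Object> execute(List<Object> tuple, Object result);
    }

    // Behaves like MapGet, but emits a fallback when the store has no value.
    static class DefaultingGet implements QueryFunction {
        private final String fallback;

        DefaultingGet(String fallback) {
            this.fallback = fallback;
        }

        @Override
        public List<Object> batchRetrieve(Map<String, String> state, List<List<Object>> keys) {
            List<Object> results = new ArrayList<>();
            for (List<Object> key : keys) {
                results.add(state.get(key.get(0)));
            }
            return results;
        }

        @Override
        public List<Object> execute(List<Object> tuple, Object result) {
            Object value = (result == null) ? fallback : result;
            return Arrays.asList(value);
        }
    }

    // Models what the stream does with a batch: retrieve in bulk, then
    // append each tuple's emitted values to that tuple.
    static List<List<Object>> runQuery(QueryFunction fn, Map<String, String> state,
                                       List<List<Object>> batch) {
        List<Object> results = fn.batchRetrieve(state, batch);
        List<List<Object>> output = new ArrayList<>();
        for (int i = 0; i < batch.size(); i++) {
            List<Object> extended = new ArrayList<>(batch.get(i));
            extended.addAll(fn.execute(batch.get(i), results.get(i)));
            output.add(extended);
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, String> state = new HashMap<>();
        state.put("tuple-00", "Trident");

        List<List<Object>> batch = new ArrayList<>();
        batch.add(Arrays.<Object>asList("tuple-00"));
        batch.add(Arrays.<Object>asList("tuple-99")); // hypothetical key, not in the store

        System.out.println(runQuery(new DefaultingGet("n/a"), state, batch));
        // Prints: [[tuple-00, Trident], [tuple-99, n/a]]
    }
}
```

Keeping the data access in the bulk step and the per-tuple logic in execute preserves the main benefit of the design: the data store is hit once per batch rather than once per tuple.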