How does Storm internally call the nextTuple method continuously?

I am new to Storm and wrote a program that emits incrementing numbers over time. I keep a counter in the spout, and in the nextTuple() method the counter is emitted and incremented:

    _collector.emit(new Values(new Integer(currentNumber++))); /* how is this method being called continuously? */
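The rest of the spout is the usual boilerplate, roughly like this (a simplified sketch, not the exact code; package names assume a recent org.apache.storm release, older releases used backtype.storm):

    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class NumberSpout extends BaseRichSpout {
        private SpoutOutputCollector _collector;
        private int currentNumber = 1;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector; // Storm hands over the collector once, when the spout starts
        }

        @Override
        public void nextTuple() {
            // Storm keeps calling this method from the spout's own thread;
            // each call emits the current counter value and increments it.
            _collector.emit(new Values(currentNumber++));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number"));
        }
    }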

and the execute() method of the bolt has

    public void execute(Tuple input) {
        int number = input.getInteger(0);
        logger.info("This number is (" + number + ")");
        _outputCollector.ack(input);
    } /* this part I am clear on, since the bolt receives its input from the spout */

In my main class, I have the following code:

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("NumberSpout", new NumberSpout());
    builder.setBolt("NumberBolt", new PrimeNumberBolt())
           .shuffleGrouping("NumberSpout");

    Config config = new Config();
    LocalCluster localCluster = new LocalCluster();
    localCluster.submitTopology("NumberTest", config, builder.createTopology());

    Utils.sleep(10000);
    localCluster.killTopology("NumberTest");
    localCluster.shutdown();

The program works fine. What I am currently trying to understand is how the Storm framework internally calls the nextTuple() method continuously. I am sure something is missing in my understanding, and because of that gap I cannot connect to the internal logic of this framework.

If anyone can help me understand this part, it will be a big help, since I will have to apply this concept in my project. Once I am conceptually clear here, I can make significant progress. I would appreciate any quick help. Thanks in advance.

+8
java apache-storm
2 answers

how the Storm framework internally calls the nextTuple() method continuously

I believe answering this really involves a fairly detailed discussion of the entire life cycle of a Storm topology, as well as a clear idea of the various entities involved, such as workers, executors, tasks, and so on. The actual topology submission is done by StormSubmitter with its submitTopology method.

The very first thing it does is upload the jar via the Nimbus Thrift interface, and then it calls submitTopology, which ultimately hands the topology over to Nimbus.
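For comparison with the LocalCluster code in the question, a hedged sketch of the cluster submission path (class and jar names are placeholders; you would run it with something like `storm jar number-topology.jar com.example.ClusterMain`):

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    public class ClusterMain {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("NumberSpout", new NumberSpout());
            builder.setBolt("NumberBolt", new PrimeNumberBolt()).shuffleGrouping("NumberSpout");

            Config config = new Config();
            // StormSubmitter uploads the topology jar to Nimbus over its Thrift
            // interface and then asks Nimbus to schedule and run the topology.
            StormSubmitter.submitTopology("NumberTest", config, builder.createTopology());
        }
    }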

Nimbus then starts by normalizing the topology (from the doc: the main purpose of normalization is to ensure that every single task has the same serialization registrations, which is critical for serialization to work correctly), followed by serialization, the ZooKeeper handshaking, supervisor and worker process startup, and so on. That is too broad to discuss here, but if you really want to dig deeper you can go through the Lifecycle of a Storm topology, which explains the step-by-step actions nicely.
(quick note from the documentation)

First, a couple of important topology notes:

The actual topology that runs is different from the topology the user specifies. The actual topology has implicit streams and an implicit "acker" bolt added to manage the acking framework (used to guarantee data processing).

The implicit topology is created via the system-topology! function. system-topology! is used in two places:
- when Nimbus is creating tasks for the topology
- in the worker, so it knows where it needs to route messages

Now, here are a few hints I can share ...
Spouts and bolts are the components that actually do the real processing (the logic). In Storm terminology, they execute as a number of tasks across the cluster.
From the doc page: each task corresponds to one thread of execution.
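To make that concrete, this is where executors and tasks come from when a topology is wired up; the numbers below are arbitrary, for illustration only (they are not from the question):

    // Parallelism hint = number of executors (threads) for the component;
    // setNumTasks = number of task instances spread across those executors.
    builder.setSpout("NumberSpout", new NumberSpout(), 2);    // 2 executors, 2 tasks
    builder.setBolt("NumberBolt", new PrimeNumberBolt(), 4)   // 4 executors ...
           .setNumTasks(8)                                    // ... running 8 tasks in total
           .shuffleGrouping("NumberSpout");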

Now, among many other things, one typical responsibility of a worker process in Storm (read here) is to monitor whether the topology is active or not and to store that particular state in a variable called storm-active-atom. This variable is used by the tasks to determine whether or not to call the nextTuple method. So as long as your topology is live (you have not posted your spout code, but I am assuming there is one), until your timer expires (as you said, it runs for a certain duration), it will keep calling the nextTuple method. You can dig even deeper into Storm's acking framework implementation to see how it tracks and acknowledges a tuple once it has been successfully processed, and Guaranteeing message processing.
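Conceptually, and only conceptually (the real executor loop lives in Storm's Clojure code and does much more, e.g. metrics and max-spout-pending checks), a spout task behaves roughly like the loop below; topologyIsActive() is a made-up placeholder for the storm-active-atom check:

    // Not Storm's actual code, just the shape of the spout executor's loop.
    while (topologyIsActive()) {   // roughly: the check against storm-active-atom
        spout.nextTuple();         // this is why nextTuple keeps getting called
        // ack()/fail() callbacks for previously emitted tuples are dispatched
        // from this same spout thread as well.
    }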

I am sure something is missing in my understanding, and because of that gap I cannot connect to the internal logic of this framework

Having said that, I think at this early stage it is more important to get a clear idea of how to work with Storm than of how Storm works internally. For example, instead of studying Storm's internal mechanisms, it is more important to understand that if we set up a spout to read a file line by line, it keeps emitting each line using the _collector.emit method until it reaches EOF, and the bolt connected to it receives the same lines in its execute(Tuple input) method.
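A hedged sketch of such a line-by-line spout (class name, field names, and the file path are made up for illustration; error handling is kept minimal):

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    public class FileLineSpout extends BaseRichSpout {
        private SpoutOutputCollector _collector;
        private BufferedReader reader;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            try {
                reader = new BufferedReader(new FileReader("/path/to/input.txt")); // placeholder path
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void nextTuple() {
            try {
                String line = reader.readLine();
                if (line != null) {
                    _collector.emit(new Values(line)); // arrives at the bolt's execute(Tuple) method
                } else {
                    Utils.sleep(1);                    // EOF: nothing left to emit, back off briefly
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }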

Hope this helps.

+13

Regular spouts

In the spout executor there is a loop that repeatedly calls nextTuple (as well as ack and fail when appropriate) on the corresponding spout instance.

There is no waiting for tuples to be processed. The spout simply receives fail calls for tuples that could not be processed within the configured timeout. This can easily be simulated with a topology of a fast spout and a slow processing bolt: the spout will receive a lot of fail calls.

See also ISpout javadoc :

nextTuple, ack, and fail are all called in a tight loop in a single thread in the spout task. When there are no tuples to emit, it is courteous to have nextTuple sleep for a short amount of time (like a single millisecond) so as not to waste too much CPU.
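Putting those two points together, here is a hedged sketch of a spout that makes those callbacks visible (class and field names are made up); pairing it with a deliberately slow bolt and a short message timeout is an easy way to watch fail being called a lot:

    import java.util.Map;
    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import org.apache.storm.utils.Utils;

    public class AckAwareSpout extends BaseRichSpout {
        private SpoutOutputCollector _collector;
        private int counter = 0;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void nextTuple() {
            // Anchoring the tuple with a message id is what makes Storm call
            // ack()/fail() for it later, from this same spout thread.
            Integer id = counter++;
            _collector.emit(new Values(id), id);
            Utils.sleep(1); // small throttle for the demo; the javadoc's advice is to
                            // sleep like this when there is nothing to emit
        }

        @Override
        public void ack(Object msgId) {
            System.out.println("acked " + msgId);  // tuple tree fully processed in time
        }

        @Override
        public void fail(Object msgId) {
            System.out.println("failed " + msgId); // processing failed or timed out
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("number"));
        }
    }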


Trident spouts

The situation is completely different for Trident spouts:

By default, Trident processes a single batch at a time, waiting for the batch to succeed or fail before trying another batch. You can get significantly higher throughput, and lower processing latency for each batch, by pipelining the batches. You configure the maximum number of batches to be processed simultaneously with topology.max.spout.pending.

Even while processing multiple batches simultaneously, Trident will order any state updates taking place in the topology among the batches.
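For example, in plain Java that knob can be set on the topology config (the value 5 below is arbitrary and workload-dependent):

    Config config = new Config();
    // Allow up to 5 batches (or, for a regular spout, 5 pending tuple trees) in flight at once.
    config.setMaxSpoutPending(5);  // sets topology.max.spout.pending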

+3
