Storm: spout for reading data from a port

I need to write a Storm spout that reads data from a port. I wanted to know whether this is logically possible.

With that in mind, I built a simple topology consisting of one spout and one bolt. The spout collects HTTP requests sent with wget, and the bolt displays each request. Nothing more.

My spout looks like this:

    public class ProxySpout extends BaseRichSpout {
        // The O/P collector
        SpoutOutputCollector sc;
        // The socket
        Socket clientSocket;
        // The server socket
        ServerSocket sc;

        public ProxySpout(int port) {
            this.sc = new ServerSocket(port);
            try {
                clientSocket = sc.accept();
            } catch (IOException ex) {
                // Handle it
            }
        }

        public void nextTuple() {
            try {
                InputStream ic = clientSocket.getInputStream();
                byte[] b = new byte[8196];
                int len = ic.read(b);
                sc.emit(new Values(b));
                ic.close();
            } catch (/* ... */) {
                // Handle it
            } finally {
                clientSocket.close();
            }
        }
    }

I also implemented the rest of the required methods.

When I build a topology around this and run it, I get an error when sending the first request:

java.lang.RuntimeException: java.io.NotSerializableException: java.net.Socket

I just need to know whether something is wrong with the way I implemented this spout. Is it even possible for a spout to collect data from a port? Or for a spout to act as a proxy?

Edit

It worked.

Code:

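    public class ProxySpout extends BaseRichSpout {
        // The O/P collector
        SpoutOutputCollector _collector;
        // The sockets: transient, because they are created on the worker and never serialized
        transient Socket _clientSocket;
        transient ServerSocket _serverSocket;
        // Instance field rather than static: static fields are not serialized,
        // so a static port would be lost when the spout is shipped to another JVM
        int _port;

        public ProxySpout(int port) {
            _port = port;
        }

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            try {
                _serverSocket = new ServerSocket(_port);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }

        public void nextTuple() {
            try {
                _clientSocket = _serverSocket.accept();
                InputStream incomingIS = _clientSocket.getInputStream();
                byte[] b = new byte[8196];
                int len = incomingIS.read(b);
                _collector.emit(new Values(b));
            } catch (IOException ex) {
                // Handle it
            }
        }
    }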

As suggested by @Shaw, I now initialize _serverSocket in the open() method, and _clientSocket is assigned in the nextTuple() method, which listens for requests.

I don't know how well this performs, but it works. :-)
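On the performance point: the Storm documentation for ISpout asks that nextTuple() not block, while accept() blocks until a client connects. One possible arrangement, sketched here in plain Java with no Storm dependency, is to accept connections on a background thread and hand the bytes over through a queue. PortReader and its methods are hypothetical names made up for this sketch; in a spout, the constructor call would go in open() and poll() would be called from nextTuple().

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of Storm: accepts connections on a background
// thread so that the caller never blocks while waiting for a client.
class PortReader implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final BlockingQueue<byte[]> requests = new LinkedBlockingQueue<>();
    private final Thread acceptor;

    PortReader(int port) throws IOException {
        serverSocket = new ServerSocket(port); // port 0 picks any free port
        acceptor = new Thread(this::acceptLoop, "port-reader");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    private void acceptLoop() {
        byte[] buffer = new byte[8196];
        while (!serverSocket.isClosed()) {
            try (Socket client = serverSocket.accept();
                 InputStream in = client.getInputStream()) {
                int len = in.read(buffer);
                if (len > 0) {
                    // Keep only the bytes that were actually read.
                    requests.add(Arrays.copyOf(buffer, len));
                }
            } catch (IOException e) {
                // accept() throws once the server socket is closed;
                // the loop condition then ends the thread.
            }
        }
    }

    int localPort() {
        return serverSocket.getLocalPort();
    }

    // Non-blocking: returns null when no request has arrived yet.
    byte[] poll() {
        return requests.poll();
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
    }
}
```

With this shape, nextTuple() would emit a tuple only when poll() returns a non-null array and return immediately otherwise. This sketch reads a single chunk per connection, like the code above, so a request larger than the buffer would be truncated.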

1 answer

In the constructor, just assign variables. Try creating the ServerSocket instance in the open() method (the spout's counterpart of a bolt's prepare() method); do not write any new ... in the constructor. And rename the variables: you have two variables named sc.

    public class ProxySpout extends BaseRichSpout {
        int port;

        public ProxySpout(int port) {
            this.port = port;
        }

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            // new ServerSocket
        }

        @Override
        public void nextTuple() {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
    }

If you put it in the open() method, it is called only after the spout has already been deployed, so the socket never needs to be serialized. It is also called only once during the life of the spout, so it is not inefficient.
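To make the serialization point concrete, here is a minimal sketch in plain Java, with no Storm dependency. It only imitates what happens when a component is Java-serialized before being shipped to a worker. EagerSource, LazySource and SerializationDemo are hypothetical names invented for this illustration, and the open() method here merely plays the role of the spout's open().

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.ServerSocket;

// Hypothetical stand-in for a spout that opens its socket in the constructor.
class EagerSource implements Serializable {
    private final ServerSocket serverSocket; // ServerSocket is not Serializable

    EagerSource(int port) throws IOException {
        this.serverSocket = new ServerSocket(port);
    }

    void close() throws IOException {
        serverSocket.close();
    }
}

// Hypothetical stand-in for a spout that stores only the port and opens the socket later.
class LazySource implements Serializable {
    private final int port;
    // transient: skipped by serialization even if it has already been set
    private transient ServerSocket serverSocket;

    LazySource(int port) {
        this.port = port;
    }

    // Plays the role of open(): meant to run on the worker, after deserialization.
    void open() throws IOException {
        serverSocket = new ServerSocket(port);
    }

    boolean isOpen() {
        return serverSocket != null && !serverSocket.isClosed();
    }

    void close() throws IOException {
        if (serverSocket != null) {
            serverSocket.close();
        }
    }
}

class SerializationDemo {
    // Returns true if Java serialization accepts the object, false if it
    // fails with NotSerializableException.
    static boolean canSerialize(Object o) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        EagerSource eager = new EagerSource(0); // port 0 = any free port
        System.out.println("eager: " + canSerialize(eager)); // prints "eager: false"
        eager.close();

        LazySource lazy = new LazySource(0);
        System.out.println("lazy: " + canSerialize(lazy)); // prints "lazy: true"
    }
}
```

The eager variant fails for the same reason as the spout in the question: the object graph contains a socket at the moment it is serialized. The lazy variant carries only an int across the wire and creates the socket once it is running.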
