Approach to inserting and deleting values in HBase from an Apache Storm bolt

I have an Apache Storm topology configured in pseudo-distributed mode. The topology contains a bolt that must write data to HBase. My first approach for testing was to create (and close) a connection and write the data directly in my bolt's execute method. However, it looks like my local machine does not have enough resources to handle all the requests coming into HBase. After about 30 successfully processed requests, I see the following entries in the Storm worker logs:

 o.a.z.ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
 o.a.z.ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
 o.a.z.ClientCnxn [INFO] Unable to read additional data from server sessionid 0x0, likely server has closed socket, closing socket connection and attempting reconnect
 o.a.h.h.z.RecoverableZooKeeper [WARN] Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid

My thought was to reduce the number of connections to HBase by creating a single connection per bolt instance: open it in the prepare method and close it in cleanup. However, according to the documentation, cleanup is not guaranteed to be called in distributed mode.

After that I discovered the Storm framework for working with HBase, storm-hbase. Unfortunately, there is almost no information about it, just the README in its GitHub repo.

  • So, my first question is: is storm-hbase a good solution for Storm-HBase integration? What could be the best way to do this?

Also, I need to be able to delete cells from the HBase table, but I did not find anything about this in the storm-hbase documentation.

  1. Can this be done with storm-hbase? Or, going back to the previous question, is there some other way to do all of this?

Thanks in advance!

+7
hbase apache-storm
2 answers

Oh boy, my time to shine! I've spent a ton of time optimizing HBase writes from Storm, so hopefully this helps you.

If you're just starting out, storm-hbase is a great way to start streaming data into HBase. You can simply clone the project, run a maven install, and then reference it in your topology.
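
For reference, the storm-hbase README wires a bolt up roughly like this. This is only a sketch: the table name, tuple field names, and the "hbase.conf" key are illustrative, and the exact packages for Fields/Config/TopologyBuilder depend on your Storm version.

 // Sketch based on the storm-hbase README; the mapper and bolt classes live under org.apache.storm.hbase.*
 SimpleHBaseMapper mapper = new SimpleHBaseMapper()
         .withRowKeyField("word")                  // tuple field used as the row key
         .withColumnFields(new Fields("word"))     // fields written as plain columns
         .withCounterFields(new Fields("count"))   // fields written as HBase counters
         .withColumnFamily("cf");

 HBaseBolt hbaseBolt = new HBaseBolt("WordCount", mapper)
         .withConfigKey("hbase.conf");             // topology config key holding HBase client settings

 Config conf = new Config();
 conf.put("hbase.conf", new HashMap<String, Object>()); // can stay empty if hbase-site.xml is packed in the jar

 TopologyBuilder builder = new TopologyBuilder();
 // ... set your spout and upstream bolts ...
 builder.setBolt("hbase-bolt", hbaseBolt, 1).shuffleGrouping("upstream-bolt");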

However, if you start to need more complex logic, then creating your own classes to talk to HBase is probably the way to go. That is what I will show in my answer here.

Project setup

I assume you are using maven and the maven-shade plugin. You will need to reference the hbase-client:

 <dependency>
   <groupId>org.apache.hbase</groupId>
   <artifactId>hbase-client</artifactId>
   <version>${hbase.version}</version>
 </dependency>

Also remember to pack hbase-site.xml into your topology jar. You can download this file from your cluster and just put it in src/main/resources . I also have one for testing in dev named hbase-site.dev.xml . Then just use the shade plugin to move it to the root of the jar.

 <plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-shade-plugin</artifactId>
   <version>2.4</version>
   <configuration>
     <createDependencyReducedPom>true</createDependencyReducedPom>
     <artifactSet>
       <excludes>
         <exclude>classworlds:classworlds</exclude>
         <exclude>junit:junit</exclude>
         <exclude>jmock:*</exclude>
         <exclude>*:xml-apis</exclude>
         <exclude>org.apache.maven:lib:tests</exclude>
         <exclude>log4j:log4j:jar:</exclude>
         <exclude>org.testng:testng</exclude>
       </excludes>
     </artifactSet>
   </configuration>
   <executions>
     <execution>
       <phase>package</phase>
       <goals>
         <goal>shade</goal>
       </goals>
       <configuration>
         <transformers>
           <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
             <resource>core-site.xml</resource>
             <file>src/main/resources/core-site.xml</file>
           </transformer>
           <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
             <resource>hbase-site.xml</resource>
             <file>src/main/resources/hbase-site.xml</file>
           </transformer>
           <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
             <resource>hdfs-site.xml</resource>
             <file>src/main/resources/hdfs-site.xml</file>
           </transformer>
           <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
           <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
             <mainClass></mainClass>
           </transformer>
         </transformers>
         <filters>
           <filter>
             <artifact>*:*</artifact>
             <excludes>
               <exclude>META-INF/*.SF</exclude>
               <exclude>META-INF/*.DSA</exclude>
               <exclude>META-INF/*.RSA</exclude>
               <exclude>junit/*</exclude>
               <exclude>webapps/</exclude>
               <exclude>testng*</exclude>
               <exclude>*.js</exclude>
               <exclude>*.png</exclude>
               <exclude>*.css</exclude>
               <exclude>*.json</exclude>
               <exclude>*.csv</exclude>
             </excludes>
           </filter>
         </filters>
       </configuration>
     </execution>
   </executions>
 </plugin>

Note: I have entries in there for other configurations that I use; remove them if you don't need them. As an aside, I don't really like packaging configs like this, but... it makes setting up the HBase connection much simpler and fixes a bunch of weird connection errors.

Managing Storm HBase Connections

Update

3/19/2018: The API for HBase has changed a lot since I wrote this answer, but the concepts are the same.

The most important thing is to create one HConnection for each instance of your bolt in the prepare method, and then reuse this connection throughout the life of the bolt!

 Configuration config = HBaseConfiguration.create();
 connection = HConnectionManager.createConnection(config);

To get started, you can do single PUTs to HBase. This way, you open and close the table on each call.

 // single put method
 private HConnection connection;

 @SuppressWarnings("rawtypes")
 @Override
 public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
     Configuration config = HBaseConfiguration.create();
     connection = HConnectionManager.createConnection(config);
 }

 @Override
 public void execute(Tuple tuple, BasicOutputCollector collector) {
     try {
         // do stuff
         // call putFruit
     } catch (Exception e) {
         LOG.error("bolt error", e);
         collector.reportError(e);
     }
 }

 // example put method you'd call from within execute somewhere
 private void putFruit(String key, FruitResult data) throws IOException {
     HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
     try {
         Put p = new Put(key.getBytes());
         long ts = data.getTimestamp();
         p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
         p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
         p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
         table.put(p);
     } finally {
         try {
             table.close();
         } finally {
             // nothing
         }
     }
 }

Note that I am reusing the connection here. I recommend starting with this approach because it is easier to get working and to debug. Eventually it will not scale, due to the number of requests you are trying to send across the network, and you will need to start batching multiple PUTs together.

To batch PUTs, you need to get the table from your HConnection and keep it open. You also need to set auto flush to false. This means the table will buffer requests automatically until it reaches the "hbase.client.write.buffer" size (2097152 by default).

 // batch put method
 private static boolean AUTO_FLUSH = false;
 private static boolean CLEAR_BUFFER_ON_FAIL = false;

 private HConnection connection;
 private HTableInterface fruitTable;

 @SuppressWarnings("rawtypes")
 @Override
 public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
     Configuration config = HBaseConfiguration.create();
     connection = HConnectionManager.createConnection(config);
     fruitTable = connection.getTable(Constants.TABLE_FRUIT);
     fruitTable.setAutoFlush(AUTO_FLUSH, CLEAR_BUFFER_ON_FAIL);
 }

 @Override
 public void execute(Tuple tuple, BasicOutputCollector collector) {
     try {
         // do stuff
         // call putFruit
     } catch (Exception e) {
         LOG.error("bolt error", e);
         collector.reportError(e);
     }
 }

 // example put method you'd call from within execute somewhere
 private void putFruit(String key, FruitResult data) throws IOException {
     Put p = new Put(key.getBytes());
     long ts = data.getTimestamp();
     p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
     p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
     p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
     fruitTable.put(p);
 }
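
If the default 2 MB buffer flushes more often than you want, the buffer size can be raised on the client Configuration before creating the connection. This is an optional tweak, not part of the batching itself, and the 8 MB value below is arbitrary.

 Configuration config = HBaseConfiguration.create();
 config.setLong("hbase.client.write.buffer", 8 * 1024 * 1024L);   // default is 2097152 (2 MB)
 connection = HConnectionManager.createConnection(config);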

With either method, it is still a good idea to try to close your HBase connection in cleanup . Just keep in mind that it may not be called before your worker is killed.
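
For the batch version, a minimal cleanup sketch could look like the following, reusing the connection and fruitTable fields from above; flushCommits pushes whatever is still sitting in the client-side write buffer.

 @Override
 public void cleanup() {
     // Best effort only: cleanup is not guaranteed to run when a worker is killed.
     try {
         if (fruitTable != null) {
             fruitTable.flushCommits();   // flush any buffered puts
             fruitTable.close();
         }
     } catch (IOException e) {
         LOG.error("error closing fruit table", e);
     } finally {
         try {
             if (connection != null) {
                 connection.close();
             }
         } catch (IOException e) {
             LOG.error("error closing hbase connection", e);
         }
     }
 }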

Other things

  • To do a delete, just use a new Delete(key) instead of a Put, for example:
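
A hedged sketch of what that could look like next to putFruit, with the same illustrative Constants; removing only specific cells uses the old-style deleteColumns call.

 // Hypothetical companion to putFruit: remove a row (or individual cells) by key.
 private void deleteFruit(String key) throws IOException {
     HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
     try {
         Delete d = new Delete(key.getBytes());                          // deletes the whole row
         // d.deleteColumns(Constants.FRUIT_FAMILY, Constants.COLOR);    // or just specific cells
         table.delete(d);
     } finally {
         table.close();
     }
 }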

Let me know if you have more questions.

+2

Could you use a publisher thread? Something like

this: have a separate class that runs as a thread and executes the requests to hbase/mysql/elasticsearch/hdfs/etc. for you. For performance reasons, this should be done in batches.

  • have a global queue for collecting tuples concurrently and an executor service:

     private transient BlockingQueue<Tuple> insertQueue;
     private transient ExecutorService theExecutor;
     private transient Future<?> publisherFuture;
  • have a thread class that will insert the documents for you (a sketch of the actual batch insert appears after this list):

     private class Publisher implements Runnable {
         @Override
         public void run() {
             long sendBatchTs = System.currentTimeMillis();
             while (true) {
                 if (insertQueue.size() > 100) { // 100 tuples per batch
                     List<Tuple> batchQueue = new ArrayList<>(100);
                     insertQueue.drainTo(batchQueue, 100);
                     // write code to insert the 100 documents
                     sendBatchTs = System.currentTimeMillis();
                 } else if (System.currentTimeMillis() - sendBatchTs > 5000) { // to prevent tuple timeout
                     int listSize = insertQueue.size();
                     List<Tuple> batchQueue = new ArrayList<>(listSize);
                     insertQueue.drainTo(batchQueue, listSize);
                     // write code to insert the remaining documents
                     sendBatchTs = System.currentTimeMillis();
                 }
             }
             // your code
         }
     }
  • start the thread and create the queue in the prepare method:

     @Override
     public void prepare(final Map _conf, final TopologyContext _context, final OutputCollector _collector) {
         // open your connection
         insertQueue = new LinkedBlockingQueue<>();
         theExecutor = Executors.newSingleThreadExecutor();
         publisherFuture = theExecutor.submit(new Publisher());
     }
  • close the connection in cleanup:

     @Override
     public void cleanup() {
         super.cleanup();
         theExecutor.shutdown();
         publisherFuture.cancel(true);
         // close your connection
     }
  • collect the tuples in execute:

     @Override
     public void execute(final Tuple _tuple) {
         insertQueue.add(_tuple);
     }
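
To make the "write code to insert the documents" step concrete, here is a minimal sketch of a batch insert into HBase using the same old-style client API as the other answer. It assumes the bolt keeps an open HTableInterface (table) with auto flush off and the OutputCollector (_collector) from prepare; the tuple field and column names are made up, and acking from the publisher thread may need synchronization on the collector depending on your Storm version.

     // Hypothetical batch insert called from Publisher.run() for each drained batch.
     private void writeBatch(final List<Tuple> batch) {
         List<Put> puts = new ArrayList<>(batch.size());
         for (Tuple t : batch) {
             Put p = new Put(t.getStringByField("rowkey").getBytes());            // "rowkey" is illustrative
             p.add("f".getBytes(), "color".getBytes(), t.getStringByField("color").getBytes());
             puts.add(p);
         }
         try {
             table.put(puts);           // one round trip for the whole batch
             table.flushCommits();      // make sure nothing is left in the client-side buffer
             for (Tuple t : batch) {
                 _collector.ack(t);     // ack only after the batch is actually persisted
             }
         } catch (IOException e) {
             for (Tuple t : batch) {
                 _collector.fail(t);    // let Storm replay the failed batch
             }
         }
     }
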
+1
