Oh boy, my time to shine! I've spent a lot of time optimizing HBase writes from Storm, so hopefully this helps.
If you're just starting out, storm-hbase is a great way to begin streaming data into HBase. You can simply clone the project, do a maven install, and then reference it in your topology.
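For illustration, wiring up storm-hbase typically looks something like the sketch below. The table name, field names, and column family here are hypothetical, and the exact package names vary between storm-hbase versions:

```java
import org.apache.storm.hbase.bolt.HBaseBolt;
import org.apache.storm.hbase.bolt.mapper.SimpleHBaseMapper;
import backtype.storm.tuple.Fields;

// Map tuple fields to an HBase row: the row key comes from the "word"
// field, and each field listed in withColumnFields becomes a column
// in the "cf" column family.
SimpleHBaseMapper mapper = new SimpleHBaseMapper()
        .withRowKeyField("word")
        .withColumnFields(new Fields("count"))
        .withColumnFamily("cf");

// "WordCount" is a hypothetical table name; the bolt writes each
// incoming tuple to HBase as a Put using the mapper above.
HBaseBolt hbaseBolt = new HBaseBolt("WordCount", mapper);
```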
However, once your logic gets more complex, writing your own classes to talk to HBase is probably the way to go. That's what I'll show in the rest of this answer.
Project setup
I assume you are using maven and the maven-shade plugin. You will need to reference hbase-client:
```xml
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>${hbase.version}</version>
</dependency>
```
Also remember to package hbase-site.xml in your topology jar. You can pull this file from your cluster and just put it in src/main/resources. I also have one for testing in dev named hbase-site.dev.xml. Then just use the shade plugin to move it to the root of the jar.
```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
        <artifactSet>
            <excludes>
                <exclude>classworlds:classworlds</exclude>
                <exclude>junit:junit</exclude>
                <exclude>jmock:*</exclude>
                <exclude>*:xml-apis</exclude>
                <exclude>org.apache.maven:lib:tests</exclude>
                <exclude>log4j:log4j:jar:</exclude>
                <exclude>org.testng:testng</exclude>
            </excludes>
        </artifactSet>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                        <resource>core-site.xml</resource>
                        <file>src/main/resources/core-site.xml</file>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                        <resource>hbase-site.xml</resource>
                        <file>src/main/resources/hbase-site.xml</file>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.IncludeResourceTransformer">
                        <resource>hdfs-site.xml</resource>
                        <file>src/main/resources/hdfs-site.xml</file>
                    </transformer>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
                <filters>
                    <filter>
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                            <exclude>junit/*</exclude>
                            <exclude>webapps/</exclude>
                            <exclude>testng*</exclude>
                            <exclude>*.js</exclude>
                            <exclude>*.png</exclude>
                            <exclude>*.css</exclude>
                            <exclude>*.json</exclude>
                            <exclude>*.csv</exclude>
                        </excludes>
                    </filter>
                </filters>
            </configuration>
        </execution>
    </executions>
</plugin>
```
Note: I've included entries for other configuration files that I use; remove them if you don't need them. As an aside, I don't really like packaging configs like this, but... it simplifies the HBase connection setup and avoids a bunch of strange connection errors.
Managing Storm HBase Connections
Update
3/19/2018: The API for HBase has changed a lot since I wrote this answer, but the concepts are the same.
The most important thing is to create one HConnection per bolt instance in the prepare method, and then reuse that connection for the life of the bolt!
```java
Configuration config = HBaseConfiguration.create();
connection = HConnectionManager.createConnection(config);
```
To get started, you can do single PUTs to HBase, opening and closing the table on each call like this:
```java
// single put method
private HConnection connection;

@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
    Configuration config = HBaseConfiguration.create();
    connection = HConnectionManager.createConnection(config);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
        // do stuff
        // call putFruit
    } catch (Exception e) {
        LOG.error("bolt error", e);
        collector.reportError(e);
    }
}

// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
    HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
    try {
        Put p = new Put(key.getBytes());
        long ts = data.getTimestamp();
        p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
        p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
        p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
        table.put(p);
    } finally {
        try {
            table.close();
        } finally {
            // nothing
        }
    }
}
```
Note that I am still reusing the connection here. I recommend starting with this approach because it is easier to implement and debug. Eventually it won't scale, due to the number of requests you're pushing over the network, and you will need to batch multiple PUTs together.
To batch PUTs, you need to open the table via your HConnection and leave it open. You also need to set auto-flush to false. The table will then buffer puts client-side until the buffer reaches the "hbase.client.write.buffer" size (default 2097152 bytes):
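If the default 2 MB buffer flushes too often for your workload, you can raise it in the client configuration before creating the connection. A sketch (the 8 MB value is just an example; tune it for your cluster):

```java
Configuration config = HBaseConfiguration.create();
// Raise the client-side write buffer (default 2097152 bytes = 2 MB)
// so more puts accumulate per flush. Larger buffers mean fewer, bigger
// RPCs, at the cost of more data lost if the worker dies mid-buffer.
config.setLong("hbase.client.write.buffer", 8 * 1024 * 1024);
connection = HConnectionManager.createConnection(config);
```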
```java
// batch put method
private static boolean AUTO_FLUSH = false;
private static boolean CLEAR_BUFFER_ON_FAIL = false;
private HConnection connection;
private HTableInterface fruitTable;

@SuppressWarnings("rawtypes")
@Override
public void prepare(java.util.Map stormConf, backtype.storm.task.TopologyContext context) {
    Configuration config = HBaseConfiguration.create();
    connection = HConnectionManager.createConnection(config);
    fruitTable = connection.getTable(Constants.TABLE_FRUIT);
    fruitTable.setAutoFlush(AUTO_FLUSH, CLEAR_BUFFER_ON_FAIL);
}

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
        // do stuff
        // call putFruit
    } catch (Exception e) {
        LOG.error("bolt error", e);
        collector.reportError(e);
    }
}

// example put method you'd call from within execute somewhere
private void putFruit(String key, FruitResult data) throws IOException {
    Put p = new Put(key.getBytes());
    long ts = data.getTimestamp();
    p.add(Constants.FRUIT_FAMILY, Constants.COLOR, ts, data.getColor().getBytes());
    p.add(Constants.FRUIT_FAMILY, Constants.SIZE, ts, data.getSize().getBytes());
    p.add(Constants.FRUIT_FAMILY, Constants.WEIGHT, ts, Bytes.toBytes(data.getWeight()));
    fruitTable.put(p);
}
```
In either approach, it is still a good idea to close your HBase connection in cleanup. Just be aware that cleanup may not be called before your worker is killed.
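A defensive cleanup for the batch version might look like this (a sketch; fruitTable, connection, and LOG are the bolt fields from the batch example, and flushCommits pushes any puts still sitting in the client buffer):

```java
@Override
public void cleanup() {
    try {
        if (fruitTable != null) {
            fruitTable.flushCommits(); // push buffered puts before closing
            fruitTable.close();
        }
    } catch (IOException e) {
        LOG.error("error closing table", e);
    } finally {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            LOG.error("error closing connection", e);
        }
    }
}
```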
Other things
- To do a delete, just use `new Delete(key);` instead of a Put.
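For example, a delete mirroring the putFruit method might look like this (deleteFruit is a hypothetical name, following the single-put pattern of opening and closing the table per call):

```java
// example delete method, the counterpart to putFruit
private void deleteFruit(String key) throws IOException {
    HTableInterface table = connection.getTable(Constants.TABLE_FRUIT);
    try {
        // deletes all columns in all families for this row key
        Delete d = new Delete(key.getBytes());
        table.delete(d);
    } finally {
        table.close();
    }
}
```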
Let me know if you have more questions.