The HBase CopyTable MapReduce job does this with TableMapReduceUtil.initTableReducerJob(), which lets you set an alternative quorumAddress when you need to write to a remote cluster:
public static void initTableReducerJob(String table, Class<? extends TableReducer> reducer, org.apache.hadoop.mapreduce.Job job, Class partitioner, String quorumAddress, String serverClass, String serverImpl)
quorumAddress - Distant cluster to write to; the default is null, which sends output to the cluster designated in hbase-site.xml. Set this String to the ZooKeeper ensemble of an alternate remote cluster when you want the reduce to write to a cluster other than the default. For example, when copying tables between clusters, the source is designated by hbase-site.xml and this parameter holds the ensemble address of the remote cluster. The format to pass is particular: pass <hbase.zookeeper.quorum>:<hbase.zookeeper.client.port>:<zookeeper.znode.parent>, such as server,server2,server3:2181:/hbase.
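To make the expected string format concrete, here is a minimal sketch in plain Java. The QuorumAddress class and its methods are hypothetical helpers written for illustration; they are not part of HBase. The sketch only assumes the three-part layout described above (comma-separated hosts, client port, znode parent, joined by colons).

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not an HBase class) for the quorumAddress string format. */
final class QuorumAddress {
    final List<String> hosts;   // ZooKeeper ensemble hosts (hbase.zookeeper.quorum)
    final int clientPort;       // ZooKeeper client port, e.g. 2181
    final String znodeParent;   // parent znode (zookeeper.znode.parent), e.g. /hbase

    QuorumAddress(List<String> hosts, int clientPort, String znodeParent) {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one quorum host is required");
        }
        if (znodeParent == null || !znodeParent.startsWith("/")) {
            throw new IllegalArgumentException("znode parent must start with '/'");
        }
        this.hosts = hosts;
        this.clientPort = clientPort;
        this.znodeParent = znodeParent;
    }

    /** Joins the parts as hosts:port:znodeParent, with hosts separated by commas. */
    String format() {
        return String.join(",", hosts) + ":" + clientPort + ":" + znodeParent;
    }

    /** Splits an address of the form host1,host2:port:/znode back into its parts. */
    static QuorumAddress parse(String address) {
        String[] parts = address.split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                "expected <quorum>:<port>:<znode parent>, got: " + address);
        }
        List<String> hosts = Arrays.asList(parts[0].split(","));
        return new QuorumAddress(hosts, Integer.parseInt(parts[1]), parts[2]);
    }
}
```

A string built this way would then be passed as the quorumAddress argument, for example initTableReducerJob("myTable", null, job, null, address, null, null), assuming you do not need a custom reducer, partitioner, or server class.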
Another option is to implement your own custom reducer for writing to the remote table instead of writing to the context. Something like this:
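import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Nested inside your job/driver class.
public static class MyReducer extends Reducer<Text, Result, Text, Text> {

    protected Connection connection;
    protected BufferedMutator remoteMutator;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        // Clone the job configuration and point it at the remote cluster's quorum
        Configuration config = HBaseConfiguration.create(context.getConfiguration());
        config.set("hbase.zookeeper.quorum", "quorum1,quorum2,quorum3");
        connection = ConnectionFactory.createConnection(config); // HBase 0.99+
        // On HBase <0.99, use HConnectionManager.createConnection(config) and an
        // HTableInterface with setAutoFlush(false) / flushCommits() instead.

        // Buffered writes with a 10MB client-side buffer (HBase 1.0+ API)
        BufferedMutatorParams params =
            new BufferedMutatorParams(TableName.valueOf("myTable"))
                .writeBufferSize(1024L * 1024L * 10L);
        remoteMutator = connection.getBufferedMutator(params);
    }

    @Override
    public void reduce(Text boardKey, Iterable<Result> results, Context context)
            throws IOException, InterruptedException {
        /* Build Puts and write them with remoteMutator.mutate(put) */
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        super.cleanup(context);
        if (remoteMutator != null) {
            remoteMutator.flush(); // push any buffered writes to the remote cluster
            remoteMutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}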
Rubén moraleda