Deleting rows from HBase in a map-only MapReduce job

This is the first time I am writing an HBase MapReduce job, and I am unable to delete rows in HBase (I am trying to run it as a map-only job). The job succeeds and is able to scan the HBase table, and in the Mapper I can read the correct rows from HBase (verified through sysout). However, it seems that the call Delete del = new Delete(row.get()) does not actually do anything.

The following is the code I'm trying to run:

HBaseDelete.java

public class HBaseDelete { 
  public static void main(String[] args) throws Exception {

    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "log_table");
    job.setJarByClass(HBaseDeleteMapper.class);     

    Scan scan = new Scan();
    scan.setCaching(500);        
    scan.setCacheBlocks(false);

    TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);

    boolean b = job.waitForCompletion(true);
    if (!b) {
        throw new IOException("error with job!");
    }

  }
}

HBaseDeleteMapper.java

public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, Delete>{
  @Override
  public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
    Delete delete = new Delete(row.get());
    context.write(row, delete);
  }
}

Is there something I am missing to make the deletion work?

2 Answers

You are writing to the context, not to the table. Your mapper should look something like this:

public class HBaseDeleteMapper extends TableMapper<ImmutableBytesWritable, NullWritable>{

    private HTable myTable;

    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        myTable.delete(new Delete(row.get())); /* Delete the row from the table */
        //context.write(row, NullWritable.get()); /* Optionally emit the deleted row keys if you need them for something (omit this if you don't) */
    }

    protected void cleanup(Context context) throws IOException, InterruptedException { 
        myTable.close(); /* Close table */
    }

}

However, this issues 1 RPC operation per delete, which is not ideal. To avoid that, you can accumulate the deletes in a List<Delete> and submit them in batches:

public class HBaseDeleteMapper extends TableMapper<NullWritable, NullWritable>{

    private HTable myTable;
    private List<Delete> deleteList = new ArrayList<Delete>();
    final private int buffer = 10000; /* Buffer size, tune it as desired */

    protected void setup(Context context) throws IOException, InterruptedException {
        /* HTable instance for deletes */
        myTable = new HTable(HBaseConfiguration.create(), "myTable".getBytes());
    }

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
        deleteList.add(new Delete(row.get())); /* Add delete to the batch */
        if (deleteList.size()==buffer) {
            myTable.delete(deleteList); /* Submit batch */
            deleteList.clear(); /* Clear batch */
        }
    }

    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (deleteList.size()>0) {
            myTable.delete(deleteList); /* Submit remaining batch */
        }
        myTable.close(); /* Close table */
    }

}
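
As a side note, the original approach of emitting Delete objects from the mapper can also work if the job output is routed to the table instead of being discarded by NullOutputFormat: TableOutputFormat accepts both Put and Delete mutations. Below is a minimal sketch of the driver change, assuming the question's HBaseDeleteMapper and "log_table" table (the rest of the driver stays the same):

    TableMapReduceUtil.initTableMapperJob("log_table", scan, HBaseDeleteMapper.class, null, null, job);

    /* Replace NullOutputFormat with a table-bound output so the emitted
       Deletes are actually applied; a null reducer keeps the job map-only */
    TableMapReduceUtil.initTableReducerJob("log_table", null, job);
    job.setNumReduceTasks(0);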

The code below deletes HBase rowkeys that contain a given substring, in bulk (in batches of more than 1000 at a time; tune the threshold as needed for your table/cluster). It also writes the deleted rowkeys to HDFS.

Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.oclc.wcsync.hadoop.mapper.HbaseBulkDeleteMapper;
import org.oclc.wcsync.hadoop.util.JobName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

/** doc. */
public class HbaseBulkDelete extends Configured implements Tool{

    /** doc. */
    private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDelete.class);


    /**
     * doc.
     * @param args ...
     * @throws Exception ...
     */
    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(HBaseConfiguration.create(), new HbaseBulkDelete(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] strings) throws Exception {


        JobName jobName = JobName.HBASE_DELETE;
        LOG.info ("Got into class driver");
        Configuration conf = HBaseConfiguration.create ();
        String env = "prod";
        Properties hadoopProps = new Properties();
        hadoopProps.load(HbaseBulkDelete.class.getResourceAsStream("/hadoop.config." + env + ".properties"));
        conf.set("jobName", jobName.name());
        conf.set ("hbase.master.catalog.timeout","600000");
        conf.set ("hbase.client.scanner.timeout.period","600000");
        conf.set ("hbase.rpc.timeout","6000000");
        conf.set ("mapred.task.timeout","6000000");
        conf.set("mapreduce.map.memory.mb","4096");
        Job job = new Job(conf);
        job.setJobName(jobName.format("HbaseBulkDelete"));
        job.setJarByClass(HbaseBulkDelete.class);
        Scan s = new Scan ();
        s.addFamily(Bytes.toBytes("data"));
        s.setStartRow (Bytes.toBytes ("Your_Substring"));

        TableMapReduceUtil.initTableMapperJob ("Ingest", s, HbaseBulkDeleteMapper.class, Text.class,
                Text.class, job);

        job.setNumReduceTasks(0);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/user/neethu/HbaseBulkDelete"));


        return job.waitForCompletion(true) ? 0 : -1;
    }
}

MAPPER

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class HbaseBulkDeleteMapper extends TableMapper<Text, Text> {
    private static final Logger LOG = LoggerFactory.getLogger(HbaseBulkDeleteMapper.class);

    private Configuration conf;
    private HTable table;
    private final List<Delete> listOfBatchDelete = new ArrayList<Delete>();
    private static final int BATCH_SIZE = 1000;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        conf = context.getConfiguration();
        /* One HTable instance per map task, reused for every delete */
        table = new HTable(conf, "Ingest");
    }

    @Override
    public void map(ImmutableBytesWritable row, Result values, Context context)
            throws IOException, InterruptedException {
        String key = Bytes.toString(values.getRow());
        try {
            if (key.contains("Your_substring")) {
                LOG.info("RowKey:" + key);
                listOfBatchDelete.add(new Delete(Bytes.toBytes(key)));
                /* Record the deleted row key in the HDFS output */
                context.write(new Text("RowKey"), new Text(key));
            }
        } catch (Exception e) {
            LOG.error("error ---" + e);
        }
        if (listOfBatchDelete.size() > BATCH_SIZE) {
            table.delete(listOfBatchDelete); /* Submit the batch */
            LOG.info("Deleted records!");
            listOfBatchDelete.clear();
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (!listOfBatchDelete.isEmpty()) {
            table.delete(listOfBatchDelete); /* Submit the remaining deletes */
        }
        table.close(); /* Close table */
    }
}
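
A final note: HTable is the client class from the older HBase APIs used in both answers. On HBase 1.x and later it is deprecated in favor of obtaining a Table from a Connection; the batching logic stays the same, only the setup and cleanup change. A rough sketch of just that portion, assuming the rest of the mapper is unchanged and the same "Ingest" table (Connection, ConnectionFactory, Table and TableName come from org.apache.hadoop.hbase and org.apache.hadoop.hbase.client):

    private Connection connection;
    private Table table;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        /* One connection and one table handle per map task */
        connection = ConnectionFactory.createConnection(context.getConfiguration());
        table = connection.getTable(TableName.valueOf("Ingest"));
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        if (!listOfBatchDelete.isEmpty()) {
            table.delete(listOfBatchDelete); /* Table.delete(List<Delete>) behaves like HTable.delete */
        }
        table.close();
        connection.close();
    }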