Thursday 20 October 2016

HBase Bulk Load Example


AIM: Load data into HBase using the HBase bulk load feature (generate HFiles with a MapReduce job, then load them into the table).

Data Sample:

cat /home/alwin/hbase/input/bulkload/data.csv
1,2,3
12,22,23
312,22,23
42,422,23
5312,522,23


Java Code:


Driver Class:


package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * args[0]: Input (/home/alwin/hbase/input/bulkload/data.csv)
 * args[1]: Output (/home/alwin/hbase/output/bulkload/)
 * args[2]: Table Name (testable)
 * */
public class HBaseBulkLoadDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Pass the table name to the mapper via the configuration.
        conf.set("tablename", args[2]);

        HBaseConfiguration.addHbaseResources(conf);

        Job job = Job.getInstance(conf, "Example - HBase Bulk Load");
        job.setJarByClass(HBaseBulkLoadDriver.class);

        job.setMapperClass(HBaseBulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        job.setInputFormatClass(TextInputFormat.class);

        HTable hTable = new HTable(conf, args[2]);

        // Configures the partitioner, reducer and output format so the job
        // writes HFiles that line up with the table's region boundaries.
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
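
Packaging and running the job (a sketch; the jar name hbase-bulkload.jar is an assumption, use whatever your build produces):

export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase-bulkload.jar com.jaal.hbase.bulkload.HBaseBulkLoadDriver \
    /home/alwin/hbase/input/bulkload/data.csv /home/alwin/hbase/output/bulkload/ testable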

Mapper Class:


package com.jaal.hbase.bulkload;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    final static byte[] COLFAM = "cf".getBytes();
    final static int NUM_FIELDS = 3;

    String tableName = "";

    ImmutableBytesWritable hKey = new ImmutableBytesWritable();
    KeyValue kv;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        Configuration c = context.getConfiguration();
        tableName = c.get("tablename");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Split the CSV line; the -1 limit keeps trailing empty fields.
        String[] fields = value.toString().split(",", -1);

        if (fields.length != NUM_FIELDS) {
            System.out.println("Wrong number of fields.");
            return;
        }

        // The first field becomes the row key.
        hKey.set(fields[0].getBytes());

        // Store the second field under cf:col1; empty values are skipped.
        if (!fields[1].equals("")) {
            kv = new KeyValue(hKey.get(), COLFAM, "col1".getBytes(),
                    fields[1].getBytes());
            context.write(hKey, kv);
        }
    }
}
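
Note that the mapper only stores the second CSV field as cf:col1 and silently drops the third. If you wanted to keep the third field as well, you could emit a second KeyValue per row; a sketch, where the qualifier "col2" is an assumption (the sort step configured by configureIncrementalLoad handles multiple KeyValues per row):

// Inside map(), after the cf:col1 write ("col2" is a hypothetical qualifier):
if (!fields[2].equals("")) {
    KeyValue kv2 = new KeyValue(hKey.get(), COLFAM, "col2".getBytes(),
            fields[2].getBytes());
    context.write(hKey, kv2);
}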


Creating the table in the HBase shell:

echo "create 'testable','cf'" | hbase shell

Scanning the table before the data load:

hbase(main):001:0> scan 'testable'
2016-10-20 17:39:36,977 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                                                  COLUMN+CELL                                                                                                                                             
0 row(s) in 0.4930 seconds

Loading the HFiles into HBase:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /home/alwin/hbase/output/bulkload/ testable
Generic form: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <job_output_directory(containing_one_subdirectory_per_column_family)> <hbase_table_name>
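
The same step can also be driven from Java using the LoadIncrementalHFiles API; a minimal sketch against the same client classes used above:

package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class HBaseBulkLoadComplete {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable hTable = new HTable(conf, "testable");
        // Point doBulkLoad at the job output directory; it moves the HFiles
        // under each column family subdirectory into the table's regions.
        new LoadIncrementalHFiles(conf).doBulkLoad(
                new Path("/home/alwin/hbase/output/bulkload/"), hTable);
        hTable.close();
    }
}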


Scanning the table after the data load:

hbase(main):001:0> scan 'testable'
2016-10-20 17:34:02,711 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                                                  COLUMN+CELL                                                                                                                                            
 1                                                   column=cf:col1, timestamp=1477010004504, value=2                                                                                                        
 12                                                  column=cf:col1, timestamp=1477010004504, value=22                                                                                                       
 312                                                 column=cf:col1, timestamp=1477010004504, value=22                                                                                                      
 42                                                  column=cf:col1, timestamp=1477010004504, value=422                                                                                                     
 5312                                                column=cf:col1, timestamp=1477010004504, value=522                                                                                                     

5 row(s) in 0.5560 seconds
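
To spot-check a row from Java instead of the shell, a quick sketch with the same client API:

package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;

public class HBaseBulkLoadCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testable");
        // Fetch row "1" and print cf:col1 (should print 2, per the scan above).
        Result result = table.get(new Get("1".getBytes()));
        System.out.println(new String(result.getValue("cf".getBytes(), "col1".getBytes())));
        table.close();
    }
}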
