HBase Bulk Load Example
AIM: Load data into HBase using the HBase bulk load feature.
Data Sample:
cat /home/alwin/hbase/input/bulkload/data.csv
1,2,3
12,22,23
312,22,23
42,422,23
5312,522,23
Java Code:
Driver Class:
package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * args[0]: Input (/home/alwin/hbase/input/bulkload/data.csv)
 * args[1]: Output (/home/alwin/hbase/output/bulkload/)
 * args[2]: Table Name (testable)
 */
public class HBaseBulkLoadDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Pass the table name to the mapper through the job configuration.
        conf.set("tablename", args[2]);
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "Example - HBase Bulk Load");
        job.setJarByClass(HBaseBulkLoadMapper.class);
        job.setMapperClass(HBaseBulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setInputFormatClass(TextInputFormat.class);

        // configureIncrementalLoad sets up the reducer, partitioner and output
        // format so that the job writes HFiles aligned with the table's regions.
        HTable hTable = new HTable(conf, args[2]);
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
Mapper Class:
package com.jaal.hbase.bulkload;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    final static byte[] COLFAM = "cf".getBytes();
    final static int NUM_FIELDS = 3;

    String tableName = "";
    ImmutableBytesWritable hKey = new ImmutableBytesWritable();
    KeyValue kv;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the table name that the driver stored in the job configuration.
        Configuration c = context.getConfiguration();
        tableName = c.get("tablename");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = null;
        try {
            fields = value.toString().split(",", -1);
        } catch (Exception ex) {
            System.out.println("Error while parsing!");
            return;
        }
        if (fields.length != NUM_FIELDS) {
            System.out.println("Wrong number of fields.");
            return;
        }

        // The first CSV field becomes the row key; the second is stored as cf:col1.
        hKey.set(fields[0].getBytes());
        if (!fields[1].equals("")) {
            kv = new KeyValue(hKey.get(), COLFAM, "col1".getBytes(), fields[1].getBytes());
            context.write(hKey, kv);
        }
    }
}
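Note: HFileOutputFormat.configureIncrementalLoad also accepts Put as the map output value class (it then wires in PutSortReducer instead of KeyValueSortReducer). Below is a minimal, hypothetical variant of the mapper that emits Put objects; the class name is only for illustration, and the driver would then need job.setMapOutputValueClass(Put.class).

package com.jaal.hbase.bulkload;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/** Hypothetical alternative mapper that emits Put instead of KeyValue. */
public class HBaseBulkLoadPutMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

    final static byte[] COLFAM = "cf".getBytes();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(",", -1);
        if (fields.length != 3 || fields[1].equals("")) {
            return;
        }
        // Same mapping as above: first field is the row key, second field goes to cf:col1.
        Put put = new Put(fields[0].getBytes());
        put.add(COLFAM, "col1".getBytes(), fields[1].getBytes());
        context.write(new ImmutableBytesWritable(fields[0].getBytes()), put);
    }
}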
Table creation in the HBase shell:
echo "create 'testable','cf'" | hbase shell
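The table can also be created programmatically. Here is a minimal sketch using the HBaseAdmin API from the same client era as the HTable calls above; the class name CreateTestTable is just for illustration.

package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/** Hypothetical helper: creates the 'testable' table with the 'cf' column family. */
public class CreateTestTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("testable"));
        desc.addFamily(new HColumnDescriptor("cf"));
        if (!admin.tableExists("testable")) {
            admin.createTable(desc);
        }
        admin.close();
    }
}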
Scanning the table before data load to HBase:
hbase(main):001:0> scan 'testable'
2016-10-20 17:39:36,977 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                     COLUMN+CELL
0 row(s) in 0.4930 seconds
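Running the MapReduce job to generate HFiles:
Before the HFiles can be loaded, the driver above has to be run so that they are written to the output directory. A sketch of the invocation is below; the jar name hbase-bulkload.jar is an assumption (use whatever your build produces), and one common way to get the HBase jars onto the job classpath is shown first.
export HADOOP_CLASSPATH=$(hbase classpath)
hadoop jar hbase-bulkload.jar com.jaal.hbase.bulkload.HBaseBulkLoadDriver /home/alwin/hbase/input/bulkload/data.csv /home/alwin/hbase/output/bulkload/ testable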
Loading HFiles to HBase:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /home/alwin/hbase/output/bulkload/ testable

Generic command: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile_output_dir> <hbase_table_name>
Here <hfile_output_dir> is the job's output directory, which contains one sub-directory per column family.
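The load can also be driven from Java with the same LoadIncrementalHFiles class. A minimal sketch, reusing the output directory and table name from this example (the class name BulkLoadHFiles is just for illustration):

package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

/** Hypothetical helper: loads the generated HFiles into 'testable'. */
public class BulkLoadHFiles {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable hTable = new HTable(conf, "testable");
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // Moves the HFiles from the job output directory into the table's regions.
        loader.doBulkLoad(new Path("/home/alwin/hbase/output/bulkload/"), hTable);
        hTable.close();
    }
}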
Scanning the table after data load to HBase:
hbase(main):001:0> scan 'testable'
2016-10-20 17:34:02,711 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW        COLUMN+CELL
 1         column=cf:col1, timestamp=1477010004504, value=2
 12        column=cf:col1, timestamp=1477010004504, value=22
 312       column=cf:col1, timestamp=1477010004504, value=22
 42        column=cf:col1, timestamp=1477010004504, value=422
 5312      column=cf:col1, timestamp=1477010004504, value=522
5 row(s) in 0.5560 seconds
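For completeness, the same verification can be done from Java. A minimal sketch using the old HTable client API (the class name ScanTestTable is just for illustration):

package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

/** Hypothetical helper: scans 'testable' and prints cf:col1 for each row. */
public class ScanTestTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testable");
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result result : scanner) {
            String row = Bytes.toString(result.getRow());
            String col1 = Bytes.toString(result.getValue("cf".getBytes(), "col1".getBytes()));
            System.out.println(row + " -> " + col1);
        }
        scanner.close();
        table.close();
    }
}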
 