HBase Bulk Load Example
AIM: Load data into HBase using the HBase bulk load feature.
Data Sample (the first field becomes the row key and the second field is stored as the value of cf:col1; the third field is ignored by the mapper below):
cat /home/alwin/hbase/input/bulkload/data.csv
1,2,3
12,22,23
312,22,23
42,422,23
5312,522,23
Java Code:
Driver Class:
package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * args[0]: Input (/home/alwin/hbase/input/bulkload/data.csv)
 * args[1]: Output (/home/alwin/hbase/output/bulkload/)
 * args[2]: Table Name (testable)
 */
public class HBaseBulkLoadDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        // Make the table name available to the mapper via the job configuration.
        conf.set("tablename", args[2]);
        HBaseConfiguration.addHbaseResources(conf);

        Job job = new Job(conf, "Example - HBase Bulk Load");
        job.setJarByClass(HBaseBulkLoadMapper.class);
        job.setMapperClass(HBaseBulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);
        job.setInputFormatClass(TextInputFormat.class);

        // Sets up the partitioner, reducer and output format so the
        // generated HFiles line up with the table's region boundaries.
        HTable hTable = new HTable(conf, args[2]);
        HFileOutputFormat.configureIncrementalLoad(job, hTable);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
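To launch the job, package both classes into a jar and submit it with the standard hadoop launcher. The jar name below is only a placeholder, and the HBase jars must be on the job's classpath (e.g. via HADOOP_CLASSPATH):
hadoop jar hbase-bulkload.jar com.jaal.hbase.bulkload.HBaseBulkLoadDriver /home/alwin/hbase/input/bulkload/data.csv /home/alwin/hbase/output/bulkload/ testable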
Mapper Class:
package com.jaal.hbase.bulkload;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    final static byte[] COLFAM = "cf".getBytes();
    final static int NUM_FIELDS = 3;

    String tableName = "";
    ImmutableBytesWritable hKey = new ImmutableBytesWritable();
    KeyValue kv;

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        Configuration c = context.getConfiguration();
        tableName = c.get("tablename");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = null;
        try {
            fields = value.toString().split(",", -1);
        } catch (Exception ex) {
            System.out.println("Error while parsing!");
            return;
        }
        if (fields.length != NUM_FIELDS) {
            System.out.println("Wrong number of fields.");
            return;
        }
        // The first CSV field is the row key; the second is written
        // as the value of cf:col1. The third field is not used.
        hKey.set(fields[0].getBytes());
        if (!fields[1].equals("")) {
            kv = new KeyValue(hKey.get(), COLFAM, "col1".getBytes(),
                    fields[1].getBytes());
            context.write(hKey, kv);
        }
    }
}
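The mapper only writes the second CSV field. If the third field were needed as well, another KeyValue could be emitted for the same row key; a minimal sketch (cf:col2 is an assumed qualifier, not part of the original example; the sort step configured by configureIncrementalLoad takes care of ordering the emitted KeyValues):

if (!fields[2].equals("")) {
    // Hypothetical extension: store the third CSV field in cf:col2.
    kv = new KeyValue(hKey.get(), COLFAM, "col2".getBytes(),
            fields[2].getBytes());
    context.write(hKey, kv);
}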
Creating the table in the HBase shell:
echo "create 'testable','cf'" | hbase shell
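Note that configureIncrementalLoad sets the number of reduce tasks to match the number of regions in the target table, so a table created with a single region funnels everything through one reducer and one HFile. For larger data sets the table can be pre-split at creation time, for example:
echo "create 'testable','cf', SPLITS => ['2','4']" | hbase shell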
Scanning the table before loading data into HBase:
hbase(main):001:0> scan 'testable'
2016-10-20 17:39:36,977 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                          COLUMN+CELL
0 row(s) in 0.4930 seconds
Loading the HFiles into HBase:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /home/alwin/hbase/output/bulkload/ testable

Generic command:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <location_of_hfiles(specify_till_column_family)> <hbase_table_name>
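The same load can also be run from Java through the LoadIncrementalHFiles API; a minimal sketch (the class name HFileLoader and the hard-coded paths are illustrative only):

package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class HFileLoader {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable hTable = new HTable(conf, "testable");
        // Moves the HFiles generated by the MapReduce job into the
        // regions of the target table.
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path("/home/alwin/hbase/output/bulkload/"), hTable);
        hTable.close();
    }
}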
Scanning the table after loading data into HBase:
hbase(main):001:0> scan 'testable'
2016-10-20 17:34:02,711 WARN [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                          COLUMN+CELL
 1                           column=cf:col1, timestamp=1477010004504, value=2
 12                          column=cf:col1, timestamp=1477010004504, value=22
 312                         column=cf:col1, timestamp=1477010004504, value=22
 42                          column=cf:col1, timestamp=1477010004504, value=422
 5312                        column=cf:col1, timestamp=1477010004504, value=522
5 row(s) in 0.5560 seconds
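A quick point read from Java can confirm the load as well; a minimal sketch (the class name BulkLoadCheck is illustrative only):

package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkLoadCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "testable");
        // Fetch row "1" and print the value stored under cf:col1.
        Result result = table.get(new Get(Bytes.toBytes("1")));
        byte[] val = result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("col1"));
        System.out.println(Bytes.toString(val)); // expected output: 2
        table.close();
    }
}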