Thursday, 27 October 2016


HBase Table Replication


How HBase Replication Works?


HBase replication is based on Source-Push methodology. This means master pushes the data asynchronously. This asynchronous method will result in eventual consistency of both the tables. In a busy cluster the slave might lag from the master in order of minutes.

The underlying principle of replication is replaying of WALEntries. WALEdit is an object representing one transaction and can have more than one mutation operations (puts/ deletes), but will have for only one row. Please note that writing to WAL is optional for normal HBase operations, however for replication to work WAL must be enabled.

Before trying out replication, make sure to review the following requirements:
[1] Zookeeper should be handled by yourself, not by HBase, and should always be available during the deployment.
[2] All machines from both clusters should be able to reach every other machine since replication goes from any region server to any other one on the slave cluster. That also includes the Zookeeper clusters.
[3] Both clusters should have the same HBase and Hadoop major revision. For example, having 0.90.1 on the master and 0.90.0 on the slave is correct but not 0.90.1 and 0.89.20100725.
[4] Every table that contains families that are scoped for replication should exist on every cluster with the exact same name, same for those replicated families.
[5] For multiple slaves, Master/Master, or cyclic replication version 0.92 or greater is needed.
Also, if both source and destination cluster uses same zookeeper quorum, then make sure that they use a different 'zookeeper.znode.parent' znode.

Different modes of replication:


[1] Master - Slave (Single direction)
[2] Master - Master (Bi-directional)
[3] Cyclic - more than 2 cluster in picture. Can have various combination of above two.

Enabling replication:


MODE: Master - Slave


Changes to be made on Master: (Make sure HBase table to be replicated exists in both the cluster)
[1] Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters and add the following:
<property>
   <name>hbase.replication</name>
   <value>true</value>
</property>
[2] Push hbase-site.xml to all nodes.
[3] Restart hbase
[4] Run the following command in the HBase master's shell while it's running:
add_peer '<n>', "slave.zookeeper.quorum:zookeeper.clientport.:zookeeper.znode.parent"
[5] Once you have a peer, enable replication on your column families. One way to do this is to alter the table and set the scope like this:
disable 'your_table'
alter 'your_table', {NAME => 'family_name', REPLICATION_SCOPE => '1'}
enable 'your_table'

MODE: Master – Master


Make the above changes in both the clusters.

Verifying whether data is replicated or not:


VerifyReplication MR job - runs on the master node and we need to specify the peer id.
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath`
"${HADOOP_HOME}/bin/hadoop" jar "${HBASE_HOME}/hbase-server-VERSION.jar"
verifyrep --starttime=<timestamp> --stoptime=<timestamp> --families=<myFam> <ID> <tableName>
The VerifyReplication command prints out GOODROWS and BADROWS counters to indicate rows that did and did not replicate correctly.

Example:
hadoop jar /opt/mapr/hbase/hbase-1.1.1/lib/hbase-server-1.1.1-mapr-1602.jar verifyrep --starttime=1477346187024 --stoptime 1478346187024 --families=c 1 hreplica

Also, the status of replication can be viewed from the HBase shell using 'status' command.

hbase(main):001:0> status 'replication'
version 1.1.1-mapr-1602
1 live servers
    m51-d16-2:
       SOURCE: PeerID=1, AgeOfLastShippedOp=0, SizeOfLogQueue=0, TimeStampsOfLastShippedOp=Mon Oct 24 22:53:04 EDT 2016, Replication Lag=0
       SINK  : AgeOfLastAppliedOp=0, TimeStampsOfLastAppliedOp=Wed Oct 26 19:16:57 EDT 2016
MapR cluster status can be viewed using the 'maprcli dashboard info' command or the UI.

Common errors and their reasons:


[A] ERROR when host details are not added in source cluster:

2016-10-24 17:56:32,632 WARN  [main-EventThread.replicationSource,1] regionserver.HBaseInterClusterReplicationEndpoint: Can't replicate because of a local or network error:
java.net.UnknownHostException: unknown host: m51-d16-2
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.<init>(RpcClientImpl.java:301)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.createConnection(RpcClientImpl.java:131)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.getConnection(RpcClientImpl.java:1286)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1164)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$BlockingStub.replicateWALEntry(AdminProtos.java:23209)
        at org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil.replicateWALEntry(ReplicationProtbufUtil.java:65)
        at org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.replicate(HBaseInterClusterReplicationEndpoint.java:161)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.shipEdits(ReplicationSource.java:694)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.run(ReplicationSource.java:406)

[B] ERROR while the destination cluster is down:

2016-10-24 17:59:53,040 WARN  [main-EventThread.replicationSource,1] regionserver.HBaseInterClusterReplicationEndpoint: Can't replicate because of a local or network error:
java.io.IOException: No replication sinks are available
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.getReplicationSink(ReplicationSinkManager.java:115)
        at org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.replicate(HBaseInterClusterReplicationEndpoint.java:155)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.shipEdits(ReplicationSource.java:694)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.run(ReplicationSource.java:406)

[C] ERROR when table does not exist in destination cluster as in source cluster:
2016-10-24 17:59:16,036 WARN  [main-EventThread.replicationSource,1] regionserver.HBaseInterClusterReplicationEndpoint: Can't replicate because of an error on the remote cluster:
org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: Table 'hreplica' was not found, got: hbase:namespace.: 1 time,
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:229)
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:209)
        at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.getErrors(AsyncProcess.java:1595)
        at org.apache.hadoop.hbase.client.HTable.batch(HTable.java:1185)
        at org.apache.hadoop.hbase.client.HTable.batch(HTable.java:1202)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSink.batch(ReplicationSink.java:236)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSink.replicateEntries(ReplicationSink.java:160)
        at org.apache.hadoop.hbase.replication.regionserver.Replication.replicateLogEntries(Replication.java:198)
        at org.apache.hadoop.hbase.regionserver.RSRpcServices.replicateWALEntry(RSRpcServices.java:1708)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$2.callBlockingMethod(AdminProtos.java:22253)
        at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2114)
        at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:101)
        at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:130)
        at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:107)
        at java.lang.Thread.run(Thread.java:745)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1208)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$BlockingStub.replicateWALEntry(AdminProtos.java:23209)
        at org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil.replicateWALEntry(ReplicationProtbufUtil.java:65)
        at org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.replicate(HBaseInterClusterReplicationEndpoint.java:161)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.shipEdits(ReplicationSource.java:694)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.run(ReplicationSource.java:406)

 

[D] ERROR when regionserver is down/ table is disabled:

2016-10-24 18:19:28,877 WARN  [main-EventThread.replicationSource,1] regionserver.HBaseInterClusterReplicationEndpoint: Can't replicate because of an error on the remote cluster:
org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: NotServingRegionException: 1 time,
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:229)
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:209)
        at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.getErrors(AsyncProcess.java:1595)
        at org.apache.hadoop.hbase.client.HTable.batch(HTable.java:1185)
        at org.apache.hadoop.hbase.client.HTable.batch(HTable.java:1202)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSink.batch(ReplicationSink.java:236)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSink.replicateEntries(ReplicationSink.java:160)
        at org.apache.hadoop.hbase.replication.regionserver.Replication.replicateLogEntries(Replication.java:198)
        at org.apache.hadoop.hbase.regionserver.RSRpcServices.replicateWALEntry(RSRpcServices.java:1708)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$2.callBlockingMethod(AdminProtos.java:22253)
        at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2114)
        at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:101)
        at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:130)
        at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:107)
        at java.lang.Thread.run(Thread.java:745)

        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1208)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$BlockingStub.replicateWALEntry(AdminProtos.java:23209)
        at org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil.replicateWALEntry(ReplicationProtbufUtil.java:65)
        at org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.replicate(HBaseInterClusterReplicationEndpoint.java:161)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.shipEdits(ReplicationSource.java:694)

        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.run(ReplicationSource.java:406)

Thursday, 20 October 2016

HBase Bulk Load Example


AIM: Load data to HBase making use of HBase bulk load feature.

Data Sample:

cat /home/alwin/hbase/input/bulkload/data.csv
1,2,3
12,22,23
312,22,23
42,422,23
5312,522,23


Java Code:


Driver Class:


package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * args[0]: Input (/home/alwin/hbase/input/bulkload/data.csv)
 * args[1]: Output (/home/alwin/hbase/output/bulkload/)
 * args[2]: Table Name (testable)
 * */
public class HBaseBulkLoadDriver {
               public static void main(String[] args) throws Exception {
                              Configuration conf = new Configuration();
                              args = new GenericOptionsParser(conf, args).getRemainingArgs();

                              conf.set("tablename", args[2]);
                             
                              HBaseConfiguration.addHbaseResources(conf);
                             
                              Job job = new Job(conf, "Example - HBase Bulk Load");
                              job.setJarByClass(HBaseBulkLoadMapper.class);

                              job.setMapperClass(HBaseBulkLoadMapper.class);
                              job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                              job.setMapOutputValueClass(KeyValue.class);

                              job.setInputFormatClass(TextInputFormat.class);

                              HTable hTable = new HTable(conf, args[2]);

                              HFileOutputFormat.configureIncrementalLoad(job, hTable);

                              FileInputFormat.addInputPath(job, new Path(args[0]));
                              FileOutputFormat.setOutputPath(job, new Path(args[1]));

                              job.waitForCompletion(true);
               }
}

Mapper Class:


package com.jaal.hbase.bulkload;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class HBaseBulkLoadMapper extends
                                        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

                    final static byte[] COLFAM = "cf".getBytes();
                    final static int NUM_FIELDS = 3;

                    String tableName = "";

                    ImmutableBytesWritable hKey = new ImmutableBytesWritable();
                    KeyValue kv;

                    @Override
                    protected void setup(Context context) throws IOException,
                                                            InterruptedException {
                                        Configuration c = context.getConfiguration();

                                        tableName = c.get("tablename");
                    }

                    @Override
                    protected void map(LongWritable key, Text value, Context context)
                                                            throws IOException, InterruptedException {

                                        String[] fields = null;
                                        try {
                                                            fields = value.toString().split(",", -1);
                                        } catch (Exception ex) {
                                                            System.out.println("Error while parsing!");
                                                            return;
                                        }

                                        if (fields.length != NUM_FIELDS) {
                                                            System.out.println("Wrong number of fields.");
                                                            return;
                                        }

                                        hKey.set(fields[0].getBytes());

                                        if (!fields[1].equals("")) {
                                                            kv = new KeyValue(hKey.get(), COLFAM, "col1".getBytes(),
                                                                                                    fields[1].getBytes());
                                                            context.write(hKey, kv);
                                        }

                    }
}

 

Table create in Hbase shell

echo "create 'testable','cf'" | hbase shell

Scanning the table before data load to HBase:

hbase(main):001:0> scan 'testable'
2016-10-20 17:39:36,977 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                                                  COLUMN+CELL                                                                                                                                             
0 row(s) in 0.4930 seconds

Loading HFiles to HBase:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /home/alwin/hbase/output/bulkload/ testable
Generic command: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <location_of_hfiles(specify_till_column_family)> <hbase_table_name>


Scanning the table after data load to HBase:

hbase(main):001:0> scan 'testable'
2016-10-20 17:34:02,711 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                                                  COLUMN+CELL                                                                                                                                            
 1                                                   column=cf:col1, timestamp=1477010004504, value=2                                                                                                        
 12                                                  column=cf:col1, timestamp=1477010004504, value=22                                                                                                       
 312                                                 column=cf:col1, timestamp=1477010004504, value=22                                                                                                      
 42                                                  column=cf:col1, timestamp=1477010004504, value=422                                                                                                     
 5312                                                column=cf:col1, timestamp=1477010004504, value=522                                                                                                     

5 row(s) in 0.5560 seconds