Thursday, 27 October 2016


HBase Table Replication


How Does HBase Replication Work?


HBase replication follows a source-push model: the master (source) cluster pushes edits to the slave (destination) cluster asynchronously. Because the push is asynchronous, the two tables are only eventually consistent. On a busy cluster the slave can lag behind the master by minutes.

Replication works by replaying WAL entries (WALEntry) from the master's write-ahead log on the slave. A WALEdit represents one transaction. It can contain more than one mutation (puts/deletes), but all of them belong to a single row. Writing to the WAL is optional for normal HBase operations. Replication, however, only covers edits that are written to the WAL, because the WAL is what gets shipped to the slave.
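The toy model below makes the WAL-replay idea concrete. It is an in-memory sketch in Java, and the names ToyReplication, Edit, Cluster and shipTo are hypothetical; they are not HBase classes. The model assumes one table per cluster and a single explicit shipping step. In real HBase, shipping runs continuously in a replication source thread on each region server.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Toy model of source-push replication (illustrative only; NOT HBase's real
 * classes). An Edit stands in for a WALEdit: one or more column mutations on
 * a single row. Only edits written to the WAL are ever shipped.
 */
public class ToyReplication {

    /** Hypothetical stand-in for a WALEdit: column mutations on one row. */
    public static final class Edit {
        public final String row;
        public final Map<String, String> columns = new HashMap<>();

        public Edit(String row, String... qualifierValuePairs) {
            this.row = row;
            for (int i = 0; i + 1 < qualifierValuePairs.length; i += 2) {
                columns.put(qualifierValuePairs[i], qualifierValuePairs[i + 1]);
            }
        }
    }

    /** A cluster's copy of one table (row -> column -> value) plus its WAL. */
    public static final class Cluster {
        public final Map<String, Map<String, String>> table = new HashMap<>();
        public final List<Edit> wal = new ArrayList<>();
        private int nextToShip = 0; // index of the first WAL entry not yet pushed

        /** Apply a client write locally; optionally append it to the WAL. */
        public void write(Edit e, boolean writeToWal) {
            apply(e);
            if (writeToWal) {
                wal.add(e); // an edit that skips the WAL can never be replicated
            }
        }

        /** Replay an edit (used both locally and for edits received from a source). */
        public void apply(Edit e) {
            table.computeIfAbsent(e.row, r -> new HashMap<>()).putAll(e.columns);
        }

        /** Source-push: send every unshipped WAL entry to the peer, in order. */
        public void shipTo(Cluster peer) {
            while (nextToShip < wal.size()) {
                peer.apply(wal.get(nextToShip++));
            }
        }
    }

    /** Runs a small scenario and returns the rows that reached the slave. */
    public static Set<String> replicatedRows() {
        Cluster master = new Cluster();
        Cluster slave = new Cluster();
        master.write(new Edit("r1", "cf:a", "1", "cf:b", "2"), true);
        master.write(new Edit("r2", "cf:a", "3"), false); // WAL skipped
        // Until shipTo() runs, the slave lags behind: it has no rows yet.
        master.shipTo(slave);
        return slave.table.keySet();
    }

    public static void main(String[] args) {
        System.out.println(replicatedRows()); // prints [r1]
    }
}
```

Row r2 exists on the master but never reaches the slave, because it was never written to the WAL.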

Before trying out replication, make sure to review the following requirements:
[1] ZooKeeper should be managed by you, not by HBase (HBASE_MANAGES_ZK=false in hbase-env.sh), and must be available at all times during the deployment.
[2] Every machine in each cluster must be able to reach every machine in the other cluster, because replication goes from any region server on the master to any region server on the slave. This also applies to the ZooKeeper ensembles.
[3] Both clusters should run the same major revision of HBase and Hadoop. For example, 0.90.1 on the master and 0.90.0 on the slave is fine, but 0.90.1 and 0.89.20100725 is not.
[4] Every table that contains column families scoped for replication must exist on every cluster with exactly the same name, and so must the replicated column families.
[5] Multiple slaves, master-master, or cyclic replication require version 0.92 or later.
Also, if the source and destination clusters use the same ZooKeeper quorum, make sure they use different 'zookeeper.znode.parent' znodes.
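Requirement [3] can be checked mechanically before enabling replication. The helper below is a hypothetical sketch and not an HBase API. Following the 0.90.x example above, it treats the first two components of a version string as the "major revision". Adjust this rule if your distribution defines compatibility differently.

```java
/**
 * Hypothetical pre-flight check for requirement [3]. The "major revision" is
 * assumed to be the first two components of the version string, mirroring
 * the 0.90.x example: "0.90.1" -> "0.90", "1.1.1-mapr-1602" -> "1.1".
 */
public class VersionCheck {

    public static String majorRevision(String version) {
        // Split on '.' and '-' so vendor suffixes such as "-mapr-1602" are ignored
        String[] parts = version.split("[.-]");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected version: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    public static boolean sameMajorRevision(String master, String slave) {
        return majorRevision(master).equals(majorRevision(slave));
    }

    public static void main(String[] args) {
        System.out.println(sameMajorRevision("0.90.1", "0.90.0"));        // true
        System.out.println(sameMajorRevision("0.90.1", "0.89.20100725")); // false
    }
}
```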

Different modes of replication:


[1] Master - Slave (Single direction)
[2] Master - Master (Bi-directional)
[3] Cyclic - more than two clusters involved. It can use various combinations of the above two modes.

Enabling replication:


MODE: Master - Slave


Changes to be made on the master (make sure the HBase table to be replicated exists in both clusters):
[1] Edit ${HBASE_HOME}/conf/hbase-site.xml on both clusters and add the following:
<property>
   <name>hbase.replication</name>
   <value>true</value>
</property>
[2] Push hbase-site.xml to all nodes.
[3] Restart HBase.
[4] Run the following command in the HBase shell of the master cluster while HBase is running. Here '<n>' is the peer ID (for example '1'):
add_peer '<n>', "slave.zookeeper.quorum:zookeeper.clientport:zookeeper.znode.parent"
[5] Once you have a peer, enable replication on your column families. One way to do this is to alter the table and set the scope like this:
disable 'your_table'
alter 'your_table', {NAME => 'family_name', REPLICATION_SCOPE => '1'}
enable 'your_table'
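The cluster key passed to add_peer has three ':'-separated parts: the ZooKeeper quorum, the client port, and the znode parent. A stray character such as a trailing dot after the port makes the key invalid. The class below is a hypothetical helper, not part of HBase, that builds and validates such a key. The host names and "/hbase" parent are placeholders. 5181 is the ZooKeeper port that MapR clusters typically use.

```java
/**
 * Hypothetical helper (not part of HBase) for the add_peer cluster key:
 * "<zookeeper quorum>:<client port>:<znode parent>".
 */
public class ClusterKey {

    public final String quorum;      // comma-separated ZooKeeper host names
    public final int clientPort;     // ZooKeeper client port, a plain number
    public final String znodeParent; // root znode of the peer's HBase, e.g. "/hbase"

    public ClusterKey(String quorum, int clientPort, String znodeParent) {
        this.quorum = quorum;
        this.clientPort = clientPort;
        this.znodeParent = znodeParent;
    }

    /** Splits a key into its three ':'-separated parts, rejecting malformed input. */
    public static ClusterKey parse(String key) {
        String[] parts = key.split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected quorum:port:parent but got " + key);
        }
        if (!parts[2].startsWith("/")) {
            throw new IllegalArgumentException("znode parent must start with '/': " + parts[2]);
        }
        // Throws NumberFormatException for values such as "5181." (note the stray dot)
        int port = Integer.parseInt(parts[1]);
        return new ClusterKey(parts[0], port, parts[2]);
    }

    /** Formats the key in the form add_peer expects. */
    @Override
    public String toString() {
        return quorum + ":" + clientPort + ":" + znodeParent;
    }

    public static void main(String[] args) {
        ClusterKey key = parse("zk1.example.com,zk2.example.com:5181:/hbase");
        // prints: add_peer '1', "zk1.example.com,zk2.example.com:5181:/hbase"
        System.out.println("add_peer '1', \"" + key + "\"");
    }
}
```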

MODE: Master – Master


Make the above changes on both clusters, so that each cluster is added as a peer of the other.
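In master-master mode each cluster ships its edits to the other, so replication needs a way to stop an edit from bouncing back and forth. Roughly, HBase records with each WAL entry the IDs of the clusters that have already applied the edit, and it does not ship the edit to those clusters. The toy model below uses hypothetical classes, not HBase code. It shows that a client write is applied exactly once per cluster in a two-cluster pair and in a three-cluster cycle.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Toy model (not HBase code) of loop prevention: an edit carries the IDs of
 * the clusters that have applied it, and is never shipped to one of them.
 */
public class LoopPrevention {

    static final class Edit {
        final String row;
        final Set<String> clusterIds = new LinkedHashSet<>();

        Edit(String row) {
            this.row = row;
        }

        Edit copy() {
            Edit e = new Edit(row);
            e.clusterIds.addAll(clusterIds);
            return e;
        }
    }

    static final class Cluster {
        final String id;
        final List<Cluster> peers = new ArrayList<>();
        int applied = 0; // number of edits this cluster has applied

        Cluster(String id) {
            this.id = id;
        }

        /** Apply the edit, tag it with our ID, then push to peers that have not seen it. */
        void receive(Edit e) {
            applied++;
            e.clusterIds.add(id);
            for (Cluster peer : peers) {
                if (!e.clusterIds.contains(peer.id)) {
                    peer.receive(e.copy()); // each peer gets its own copy
                }
            }
        }
    }

    /** Master-master pair A <-> B: how many times each cluster applies one client write. */
    public static int[] masterMasterCounts() {
        Cluster a = new Cluster("A");
        Cluster b = new Cluster("B");
        a.peers.add(b);
        b.peers.add(a);
        a.receive(new Edit("row1")); // a client write on cluster A
        return new int[] {a.applied, b.applied};
    }

    /** Cyclic A -> B -> C -> A: the edit stops once it would return to A. */
    public static int[] cyclicCounts() {
        Cluster a = new Cluster("A");
        Cluster b = new Cluster("B");
        Cluster c = new Cluster("C");
        a.peers.add(b);
        b.peers.add(c);
        c.peers.add(a);
        a.receive(new Edit("row1"));
        return new int[] {a.applied, b.applied, c.applied};
    }

    public static void main(String[] args) {
        int[] counts = masterMasterCounts();
        System.out.println("A applied " + counts[0] + ", B applied " + counts[1]);
    }
}
```

Without the clusterIds check, receive() would recurse between A and B forever.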

Verifying whether data is replicated or not:


VerifyReplication MapReduce job: run it on the master (source) cluster, passing the peer ID and the table name.
$ HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase classpath` "${HADOOP_HOME}/bin/hadoop" jar "${HBASE_HOME}/hbase-server-VERSION.jar" \
    verifyrep --starttime=<timestamp> --stoptime=<timestamp> --families=<myFam> <ID> <tableName>
The VerifyReplication command prints out GOODROWS and BADROWS counters to indicate rows that did and did not replicate correctly.
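To illustrate what the two counters mean, here is a simplified, single-process sketch in Java. It is not the real MapReduce job, and the class name RowComparison is hypothetical. A row counts as good when the peer holds exactly the same contents as the source, and bad otherwise. This sketch also counts a row that exists on only one side as bad. Whether the real job reports peer-only rows depends on the HBase version.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/**
 * Simplified illustration (not VerifyReplication itself) of GOODROWS/BADROWS.
 * Each map is row key -> serialized row contents.
 */
public class RowComparison {

    /** Returns {goodRows, badRows}. */
    public static long[] compare(Map<String, String> source, Map<String, String> peer) {
        long good = 0;
        long bad = 0;
        TreeSet<String> rows = new TreeSet<>(source.keySet());
        rows.addAll(peer.keySet()); // rows present on only one side count as bad here
        for (String row : rows) {
            String expected = source.get(row);
            if (expected != null && expected.equals(peer.get(row))) {
                good++;
            } else {
                bad++;
            }
        }
        return new long[] {good, bad};
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("1", "cf:col1=2");
        source.put("12", "cf:col1=22");
        source.put("312", "cf:col1=22");
        Map<String, String> peer = new HashMap<>();
        peer.put("1", "cf:col1=2");   // replicated correctly
        peer.put("12", "cf:col1=99"); // different value on the peer
        // row "312" never reached the peer
        long[] counts = compare(source, peer);
        System.out.println("GOODROWS=" + counts[0] + ", BADROWS=" + counts[1]); // GOODROWS=1, BADROWS=2
    }
}
```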

Example:
hadoop jar /opt/mapr/hbase/hbase-1.1.1/lib/hbase-server-1.1.1-mapr-1602.jar verifyrep --starttime=1477346187024 --stoptime=1478346187024 --families=c 1 hreplica
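The --starttime and --stoptime values are epoch timestamps in milliseconds. For example, 1477346187024 in the example above is 2016-10-24T21:56:27.024Z, which is 17:56:27 EDT. The small Java helper below converts between the two using java.time; the class and method names are hypothetical. It assumes you know the time zone in which you read the wall-clock times.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

/** Converts between wall-clock times and the epoch milliseconds verifyrep expects. */
public class ReplicationTimestamps {

    /** Local date-time in the given time zone -> epoch milliseconds. */
    public static long toEpochMillis(int year, int month, int day,
                                     int hour, int minute, int second, String zone) {
        return ZonedDateTime.of(year, month, day, hour, minute, second, 0, ZoneId.of(zone))
                .toInstant()
                .toEpochMilli();
    }

    /** Epoch milliseconds -> ISO-8601 instant in UTC. */
    public static String toUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        System.out.println(toUtc(1477346187024L)); // 2016-10-24T21:56:27.024Z
        System.out.println(toUtc(1478346187024L)); // 2016-11-05T11:43:07.024Z
        // 17:56:27 EDT on 24 Oct 2016, usable as a --starttime value
        System.out.println(toEpochMillis(2016, 10, 24, 17, 56, 27, "America/New_York")); // 1477346187000
    }
}
```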

The status of replication can also be viewed from the HBase shell with the 'status' command:

hbase(main):001:0> status 'replication'
version 1.1.1-mapr-1602
1 live servers
    m51-d16-2:
       SOURCE: PeerID=1, AgeOfLastShippedOp=0, SizeOfLogQueue=0, TimeStampsOfLastShippedOp=Mon Oct 24 22:53:04 EDT 2016, Replication Lag=0
       SINK  : AgeOfLastAppliedOp=0, TimeStampsOfLastAppliedOp=Wed Oct 26 19:16:57 EDT 2016
MapR cluster status can be viewed using the 'maprcli dashboard info' command or the UI.
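If you want to watch these numbers from a script, the SOURCE and SINK lines are easy to split into key/value pairs. The parser below is a hypothetical helper, not an HBase API. It assumes the "LABEL: key=value, key=value" layout shown in the output above, which may differ in other HBase versions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper: parses one SOURCE or SINK line of status 'replication'
 * into an ordered map. The label (SOURCE/SINK) is stored under "label".
 */
public class ReplicationStatusParser {

    public static Map<String, String> parse(String line) {
        String body = line.trim();
        int colon = body.indexOf(':');
        if (colon < 0) {
            throw new IllegalArgumentException("No label in: " + line);
        }
        Map<String, String> metrics = new LinkedHashMap<>();
        metrics.put("label", body.substring(0, colon).trim());
        // Values such as "Mon Oct 24 22:53:04 EDT 2016" contain ':' but not ", "
        for (String pair : body.substring(colon + 1).split(", ")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                metrics.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
            }
        }
        return metrics;
    }

    public static void main(String[] args) {
        String source = "SOURCE: PeerID=1, AgeOfLastShippedOp=0, SizeOfLogQueue=0, "
                + "TimeStampsOfLastShippedOp=Mon Oct 24 22:53:04 EDT 2016, Replication Lag=0";
        Map<String, String> m = parse(source);
        // A growing log queue or lag suggests the peer is falling behind
        System.out.println("peer " + m.get("PeerID") + " queue=" + m.get("SizeOfLogQueue")
                + " lag=" + m.get("Replication Lag")); // peer 1 queue=0 lag=0
    }
}
```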

Common errors and their reasons:


[A] ERROR when the destination host names cannot be resolved from the source cluster (for example, missing /etc/hosts entries):

2016-10-24 17:56:32,632 WARN  [main-EventThread.replicationSource,1] regionserver.HBaseInterClusterReplicationEndpoint: Can't replicate because of a local or network error:
java.net.UnknownHostException: unknown host: m51-d16-2
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.<init>(RpcClientImpl.java:301)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.createConnection(RpcClientImpl.java:131)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.getConnection(RpcClientImpl.java:1286)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1164)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$BlockingStub.replicateWALEntry(AdminProtos.java:23209)
        at org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil.replicateWALEntry(ReplicationProtbufUtil.java:65)
        at org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.replicate(HBaseInterClusterReplicationEndpoint.java:161)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.shipEdits(ReplicationSource.java:694)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.run(ReplicationSource.java:406)
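Requirement [2] says every source machine must reach every destination machine. A quick way to confirm case [A] is therefore to check name resolution from each source node. The class below is a hypothetical diagnostic, not part of HBase, built on InetAddress.getByName, which consults the system resolver (/etc/hosts, DNS). m51-d16-2 is the host name from the trace above.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Diagnostic sketch (not part of HBase): run on each source-cluster node with
 * the destination cluster's host names as arguments, e.g.
 *   java HostCheck m51-d16-2
 * Any name reported here would cause an UnknownHostException like the one above.
 */
public class HostCheck {

    /** Returns the host names that this machine cannot resolve. */
    public static List<String> unresolvable(List<String> hosts) {
        List<String> missing = new ArrayList<>();
        for (String host : hosts) {
            try {
                InetAddress.getByName(host); // system resolver: /etc/hosts, then DNS
            } catch (UnknownHostException e) {
                missing.add(host);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = unresolvable(Arrays.asList(args));
        if (missing.isEmpty()) {
            System.out.println("All hosts resolve");
        } else {
            System.out.println("Cannot resolve: " + missing);
        }
    }
}
```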

[B] ERROR while the destination cluster is down:

2016-10-24 17:59:53,040 WARN  [main-EventThread.replicationSource,1] regionserver.HBaseInterClusterReplicationEndpoint: Can't replicate because of a local or network error:
java.io.IOException: No replication sinks are available
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.getReplicationSink(ReplicationSinkManager.java:115)
        at org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.replicate(HBaseInterClusterReplicationEndpoint.java:155)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.shipEdits(ReplicationSource.java:694)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.run(ReplicationSource.java:406)

[C] ERROR when the table does not exist in the destination cluster:
2016-10-24 17:59:16,036 WARN  [main-EventThread.replicationSource,1] regionserver.HBaseInterClusterReplicationEndpoint: Can't replicate because of an error on the remote cluster:
org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: Table 'hreplica' was not found, got: hbase:namespace.: 1 time,
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:229)
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:209)
        at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.getErrors(AsyncProcess.java:1595)
        at org.apache.hadoop.hbase.client.HTable.batch(HTable.java:1185)
        at org.apache.hadoop.hbase.client.HTable.batch(HTable.java:1202)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSink.batch(ReplicationSink.java:236)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSink.replicateEntries(ReplicationSink.java:160)
        at org.apache.hadoop.hbase.replication.regionserver.Replication.replicateLogEntries(Replication.java:198)
        at org.apache.hadoop.hbase.regionserver.RSRpcServices.replicateWALEntry(RSRpcServices.java:1708)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$2.callBlockingMethod(AdminProtos.java:22253)
        at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2114)
        at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:101)
        at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:130)
        at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:107)
        at java.lang.Thread.run(Thread.java:745)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1208)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$BlockingStub.replicateWALEntry(AdminProtos.java:23209)
        at org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil.replicateWALEntry(ReplicationProtbufUtil.java:65)
        at org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.replicate(HBaseInterClusterReplicationEndpoint.java:161)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.shipEdits(ReplicationSource.java:694)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.run(ReplicationSource.java:406)

 

[D] ERROR when a region server is down or the table is disabled on the destination cluster:

2016-10-24 18:19:28,877 WARN  [main-EventThread.replicationSource,1] regionserver.HBaseInterClusterReplicationEndpoint: Can't replicate because of an error on the remote cluster:
org.apache.hadoop.hbase.ipc.RemoteWithExtrasException(org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: NotServingRegionException: 1 time,
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:229)
        at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:209)
        at org.apache.hadoop.hbase.client.AsyncProcess$AsyncRequestFutureImpl.getErrors(AsyncProcess.java:1595)
        at org.apache.hadoop.hbase.client.HTable.batch(HTable.java:1185)
        at org.apache.hadoop.hbase.client.HTable.batch(HTable.java:1202)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSink.batch(ReplicationSink.java:236)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSink.replicateEntries(ReplicationSink.java:160)
        at org.apache.hadoop.hbase.replication.regionserver.Replication.replicateLogEntries(Replication.java:198)
        at org.apache.hadoop.hbase.regionserver.RSRpcServices.replicateWALEntry(RSRpcServices.java:1708)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$2.callBlockingMethod(AdminProtos.java:22253)
        at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2114)
        at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:101)
        at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:130)
        at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:107)
        at java.lang.Thread.run(Thread.java:745)

        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1208)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:213)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:287)
        at org.apache.hadoop.hbase.protobuf.generated.AdminProtos$AdminService$BlockingStub.replicateWALEntry(AdminProtos.java:23209)
        at org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil.replicateWALEntry(ReplicationProtbufUtil.java:65)
        at org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint.replicate(HBaseInterClusterReplicationEndpoint.java:161)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.shipEdits(ReplicationSource.java:694)
        at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource.run(ReplicationSource.java:406)
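When scanning region server logs for these warnings, the exception text is usually enough to tell the four cases apart. The Java helper below is hypothetical and is not an HBase or MapR tool. It matches only the strings that appear in the traces above; any other failure falls through to "unknown".

```java
/**
 * Hypothetical log-triage helper: maps a replication WARN message (with its
 * stack trace) to cases [A]-[D] by matching the exception text from those logs.
 */
public class ReplicationWarningTriage {

    public static String classify(String logText) {
        if (logText.contains("UnknownHostException")) {
            return "A: destination host name not resolvable from the source";
        }
        if (logText.contains("No replication sinks are available")) {
            return "B: destination cluster is down";
        }
        if (logText.contains("was not found")) {
            return "C: table missing on the destination";
        }
        if (logText.contains("NotServingRegionException")) {
            return "D: region server down or table disabled on the destination";
        }
        return "unknown";
    }

    public static void main(String[] args) {
        System.out.println(classify("java.io.IOException: No replication sinks are available"));
    }
}
```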

Thursday, 20 October 2016

HBase Bulk Load Example


AIM: Load data into HBase using the HBase bulk load feature.

Data Sample:

cat /home/alwin/hbase/input/bulkload/data.csv
1,2,3
12,22,23
312,22,23
42,422,23
5312,522,23


Java Code:


Driver Class:


package com.jaal.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * args[0]: Input (/home/alwin/hbase/input/bulkload/data.csv)
 * args[1]: Output (/home/alwin/hbase/output/bulkload/)
 * args[2]: Table Name (testable)
 * */
public class HBaseBulkLoadDriver {
    public static void main(String[] args) throws Exception {
        // Hadoop configuration plus hbase-default.xml/hbase-site.xml
        Configuration conf = HBaseConfiguration.create();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();

        // Made available to the mapper through the job configuration
        conf.set("tablename", args[2]);

        Job job = Job.getInstance(conf, "Example - HBase Bulk Load");
        job.setJarByClass(HBaseBulkLoadMapper.class);

        job.setMapperClass(HBaseBulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        job.setInputFormatClass(TextInputFormat.class);

        HTable hTable = new HTable(conf, args[2]);
        try {
            // Configures the sort reducer, a TotalOrderPartitioner (one reducer per
            // region) and HFileOutputFormat from the table's region boundaries
            HFileOutputFormat.configureIncrementalLoad(job, hTable);
        } finally {
            hTable.close();
        }

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Exit with a non-zero status if the job fails
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Mapper Class:


package com.jaal.hbase.bulkload;

import java.io.IOException;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Parses each CSV line "rowkey,col1,col2" and emits one KeyValue (cf:col1)
 * per row; HFileOutputFormat turns these into HFiles for bulk loading.
 */
public class HBaseBulkLoadMapper extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {

    static final byte[] COLFAM = Bytes.toBytes("cf");
    static final byte[] COL1 = Bytes.toBytes("col1");
    static final int NUM_FIELDS = 3;

    private String tableName = "";
    private final ImmutableBytesWritable hKey = new ImmutableBytesWritable();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Table name set by the driver; available here if the mapper needs it
        tableName = context.getConfiguration().get("tablename");
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // limit -1 keeps trailing empty fields, so "1,2," still yields 3 fields
        String[] fields = value.toString().split(",", -1);
        if (fields.length != NUM_FIELDS) {
            System.out.println("Wrong number of fields: " + value);
            return;
        }

        byte[] rowKey = Bytes.toBytes(fields[0]);
        hKey.set(rowKey);

        // Only the second field is loaded (as cf:col1); the third is ignored
        if (!fields[1].isEmpty()) {
            KeyValue kv = new KeyValue(rowKey, COLFAM, COL1, Bytes.toBytes(fields[1]));
            context.write(hKey, kv);
        }
    }
}
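Before running the job, it can help to preview what the mapper will produce and in which order HBase will store it. HBase orders row keys by unsigned byte comparison, not numerically. That is why the scan further below lists the rows as 1, 12, 312, 42, 5312. The plain-Java sketch below needs no HBase dependency; the class name BulkLoadPreview is hypothetical. It reuses the mapper's field checks and that ordering.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical preview of the mapper's output, ordered the way HBase sorts rows. */
public class BulkLoadPreview {

    static final int NUM_FIELDS = 3;

    /** Same checks as HBaseBulkLoadMapper: returns {rowKey, col1Value}, or null if skipped. */
    public static String[] parseLine(String line) {
        String[] fields = line.split(",", -1);
        if (fields.length != NUM_FIELDS || fields[1].isEmpty()) {
            return null;
        }
        return new String[] {fields[0], fields[1]};
    }

    /** Unsigned lexicographic byte comparison, the order HBase uses for row keys. */
    static int compareRowKeys(String x, String y) {
        byte[] a = x.getBytes(StandardCharsets.UTF_8);
        byte[] b = y.getBytes(StandardCharsets.UTF_8);
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /** Parses the lines and returns row key -> cf:col1 value, in HBase row order. */
    public static Map<String, String> preview(List<String> lines) {
        Map<String, String> rows = new TreeMap<>(BulkLoadPreview::compareRowKeys);
        for (String line : lines) {
            String[] cell = parseLine(line);
            if (cell != null) {
                rows.put(cell[0], cell[1]);
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList(
                "1,2,3", "12,22,23", "312,22,23", "42,422,23", "5312,522,23");
        System.out.println(preview(data)); // {1=2, 12=22, 312=22, 42=422, 5312=522}
    }
}
```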

 

Create the table in the HBase shell:

echo "create 'testable','cf'" | hbase shell

Scanning the table before data load to HBase:

hbase(main):001:0> scan 'testable'
2016-10-20 17:39:36,977 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                                                  COLUMN+CELL                                                                                                                                             
0 row(s) in 0.4930 seconds

Loading the HFiles into HBase:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /home/alwin/hbase/output/bulkload/ testable
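Generic command: hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile_output_dir> <hbase_table_name>
Here <hfile_output_dir> is the output directory of the bulk load job, that is, the directory containing one subdirectory per column family (cf/ in this example). Do not pass the column family directory itself.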
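configureIncrementalLoad partitions the job output by the table's region boundaries at job time, so each HFile normally falls inside one region. If the regions change before the load (for example, after a region split), LoadIncrementalHFiles splits any HFile that crosses a boundary. The sketch below is a simplified model of that check, not LoadIncrementalHFiles itself, and the class name is hypothetical. It treats row keys as ASCII strings, so Java String order matches HBase's byte order.

```java
import java.util.Arrays;

/**
 * Simplified model of matching an HFile to a region. Regions are given by
 * their sorted start keys, the first being "" (the table's empty start key).
 */
public class HFileRegionAssignment {

    /** Index of the region whose [startKey, nextStartKey) range contains rowKey. */
    public static int regionFor(String[] sortedStartKeys, String rowKey) {
        int idx = Arrays.binarySearch(sortedStartKeys, rowKey);
        // Not found: binarySearch returns -(insertionPoint) - 1; the owning region is insertionPoint - 1
        return idx >= 0 ? idx : -idx - 2;
    }

    /** An HFile can be loaded as-is only if its first and last keys fall in the same region. */
    public static boolean fitsInOneRegion(String[] sortedStartKeys, String firstKey, String lastKey) {
        return regionFor(sortedStartKeys, firstKey) == regionFor(sortedStartKeys, lastKey);
    }

    public static void main(String[] args) {
        String[] singleRegion = {""};  // 'testable' as created above has a single region
        String[] splitAt3 = {"", "3"}; // hypothetical table split at row key "3"
        // First and last row keys of the sample data in HBase order: "1" ... "5312"
        System.out.println(fitsInOneRegion(singleRegion, "1", "5312")); // true
        System.out.println(fitsInOneRegion(splitAt3, "1", "5312"));     // false: needs a split
    }
}
```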


Scanning the table after data load to HBase:

hbase(main):001:0> scan 'testable'
2016-10-20 17:34:02,711 WARN  [main] util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
ROW                                                  COLUMN+CELL                                                                                                                                            
 1                                                   column=cf:col1, timestamp=1477010004504, value=2                                                                                                        
 12                                                  column=cf:col1, timestamp=1477010004504, value=22                                                                                                       
 312                                                 column=cf:col1, timestamp=1477010004504, value=22                                                                                                      
 42                                                  column=cf:col1, timestamp=1477010004504, value=422                                                                                                     
 5312                                                column=cf:col1, timestamp=1477010004504, value=522                                                                                                     

5 row(s) in 0.5560 seconds

Tuesday, 27 September 2016

Running a Maven-built HBase application on MapR


A step-by-step guide to building a simple HBase application with Maven and running it on a MapR cluster.

Assumption: a MapR cluster is already up and running with HBase and the other required components.

Perform the following steps:

Go to the following folder and create a directory for Maven:
cd /home/mapr
mkdir maven

Download apache-maven-3.3.9-bin.tar.gz from the Apache Maven website into this directory and extract it:
cd maven
tar -xvf apache-maven-3.3.9-bin.tar.gz

Set the Maven home and add Maven to the PATH:
export M2_HOME=/home/mapr/maven/apache-maven-3.3.9
export PATH=${M2_HOME}/bin:${PATH}

Create a folder for the sample HBase application:
mkdir -p /home/mapr/java-jobs/hbase
cd /home/mapr/java-jobs/hbase

Generate a sample Maven project:
mvn archetype:generate -DgroupId=com.ajames.hbase.examples -DartifactId=hbaseapp -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

Edit pom.xml to add the MapR repository details:
cd hbaseapp/
vi pom.xml

Add the MapR repository inside the <project> element:
              
  <repositories>
    <repository>
      <id>mapr-maven</id>
      <url>http://repository.mapr.com/maven</url>
      <releases><enabled>true</enabled></releases>
      <snapshots><enabled>false</enabled></snapshots>
    </repository>
  </repositories>

Add the HBase dependency inside the existing <dependencies> element of pom.xml:

  <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.1.1-mapr-1602</version>
  </dependency>

Create the sample application (HBaseDemo.java) in the package directory:
cd src/main/java/com/ajames/hbase/examples/

Sample code:
package com.ajames.hbase.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;

/* Create the table schema first using the following command:
 * echo "create 'details','a','b'" | hbase shell
 */
public class HBaseDemo {

    public static void main(String[] args) throws IOException {
        // Reads hbase-site.xml from the classpath (set via HADOOP_CLASSPATH below)
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "details");
        try {
            // One row, "details1", with one cell in each column family
            Put p1 = new Put("details1".getBytes());

            byte[] a = "a".getBytes();
            byte[] b = "b".getBytes();

            p1.add(a, "c1".getBytes(), "First".getBytes());
            p1.add(b, "c1".getBytes(), "Second".getBytes());

            table.put(p1);
        } finally {
            // Always release the table's resources, even if the put fails
            table.close();
        }
    }
}

Build the application:
cd /home/mapr/java-jobs/hbase/hbaseapp
mvn clean package

Run the application:
export HADOOP_CLASSPATH=`hbase classpath`
[mapr@m52-d18-1 hbaseapp]$ hadoop jar target/hbaseapp-1.0-SNAPSHOT.jar com.ajames.hbase.examples.HBaseDemo
16/09/26 21:08:01 INFO client.ConnectionFactory: mapr.hbase.default.db unsetDB is neither MapRDB or HBase, set HBASE_MAPR mode since mapr client is installed.
16/09/26 21:08:01 INFO client.ConnectionFactory: ConnectionFactory receives mapr.hbase.default.db(unsetDB), set clusterType(HBASE_MAPR), hbase_admin_connect_at_construction(false)
16/09/26 21:08:01 INFO client.ConnectionFactory: ConnectionFactory creates a hbase connection!
16/09/26 21:08:02 INFO client.HTable: BufferedMutator Use HBase ThreadPool
16/09/26 21:08:02 INFO zookeeper.RecoverableZooKeeper: Process identifier=hconnection-0x66dd1d3 connecting to ZooKeeper ensemble=10.10.72.147:5181,10.10.72.148:5181
16/09/26 21:08:02 INFO client.ConnectionManager$HConnectionImplementation: Closing zookeeper sessionid=0x15767d869180047


Verify the HBase table:
hbase shell
hbase(main):001:0> scan 'details'
ROW                                              COLUMN+CELL
 details1                                        column=a:c1, timestamp=1474938482964, value=First
 details1                                        column=b:c1, timestamp=1474938482964, value=Second

1 row(s) in 0.5730 seconds