Monday, 20 November 2017

DRILL: Factors determining the number of minor fragments spawned for a MapRDB table scan


AIM:

Discuss the factors that determine the number of minor fragments spawned for a DRILL scan of a MapRDB table.

ENVIRONMENT DETAILS:

  
MapR Version: 5.2.1
Drill Version: 1.10

3 node cluster (VMs), each with 20GB of memory and 8 vcores.

Created a MapRDB table ‘/tmp/mdbtb2’. Loaded some random data into the table.
Total number of rows in the table is 1,000,003.

Details of MapRDB table:

[mapr@hostnameA root]$ maprcli table region list -path '/tmp/mdbtb2' -json
{
        "timestamp":1510279617490,
        "timeofday":"2017-11-09 09:06:57.490 GMT-0500",
        "status":"OK",
        "total":4,
        "data":[
                {
                        "primarymfs":"hostnameB:5660",
                        "secondarymfs":"hostnameA:5660, hostnameC:5660",
                        "startkey":"-INFINITY",
                        "endkey":"user3020912552751108539",
                        "lastheartbeat":0,
                        "fid":"2176.50.1967492",
                        "logicalsize":270729216,
                        "physicalsize":173088768,
                        "copypendingsize":0,
                        "numberofrows":243531,
                        "numberofrowswithdelete":0,
                        "numberofspills":77,
                        "numberofsegments":77
                },
                {
                        "primarymfs":"hostnameB:5660",
                        "secondarymfs":"hostnameA:5660, hostnameC:5660",
                        "startkey":"user3020912552751108539",
                        "endkey":"user5074931419976666452",
                        "lastheartbeat":0,
                        "fid":"2176.337.1968048",
                        "logicalsize":275193856,
                        "physicalsize":176119808,
                        "copypendingsize":0,
                        "numberofrows":247945,
                        "numberofrowswithdelete":0,
                        "numberofspills":76,
                        "numberofsegments":76
                },
                {
                        "primarymfs":"hostnameB:5660",
                        "secondarymfs":"hostnameA:5660, hostnameC:5660",
                        "startkey":"user5074931419976666452",
                        "endkey":"user7176543975612180375",
                        "lastheartbeat":0,
                        "fid":"2817.1600.1055958",
                        "logicalsize":280805376,
                        "physicalsize":179748864,
                        "copypendingsize":0,
                        "numberofrows":252843,
                        "numberofrowswithdelete":0,
                        "numberofspills":80,
                        "numberofsegments":80
                },
                {
                        "primarymfs":"hostnameB:5660",
                        "secondarymfs":"hostnameA:5660, hostnameC:5660",
                        "startkey":"user7176543975612180375",
                        "endkey":"INFINITY",
                        "lastheartbeat":0,
                        "fid":"2817.33.1053610",
                        "logicalsize":284286976,
                        "physicalsize":181764096,
                        "copypendingsize":0,
                        "numberofrows":255684,
                        "numberofrowswithdelete":0,
                        "numberofspills":82,
                        "numberofsegments":80
                }
        ]
}

Hence, the MapRDB table has 4 tablets.

VARIOUS TESTS:

Query used for all the tests: select count(*) from dfs.`tmp`.`mdbtb2`


Test 1:
 
DRILL Property                  Value
planner.slice_target            100000
planner.width.max_per_query     1000
planner.width.max_per_node      100

Number of minor fragments spawned for major fragment 1 = 4.
Observation: The number of minor fragments matches the number of tablets in the MapRDB table.

Test 2:

 
DRILL Property                  Value
planner.slice_target            1000100
planner.width.max_per_query     1000
planner.width.max_per_node      100

Number of minor fragments spawned for major fragment 0 = 1.
Observation: The number depends on the slice_target. The slice_target is set to 1000100, which is higher than the total number of records in the MapRDB table (1000003). Hence, a single thread reads data from all four tablets. This can cause remote reads.
 
Test 3:

 
DRILL Property                  Value
planner.slice_target            500000
planner.width.max_per_query     1000
planner.width.max_per_node      100

Number of minor fragments spawned for major fragment 1 = 2.
Observation: The number depends on the slice_target. The slice_target is set to 500000 and the total number of records in the MapRDB table is 1000003. Hence, two threads read data from the four tablets. This can also cause remote reads.

SUMMARY:

  • The number of minor fragments spawned for a MapRDB scan depends on two factors: the number of tablets in the MapRDB table and `planner.slice_target`.
  • Only one minor fragment reads data from a given tablet. Multiple minor fragments never scan the same tablet at the same time.
  • If the total estimated row count from MapRDB is less than the `planner.slice_target` value, only one minor fragment is used even if the table contains multiple tablets.
  • In more general terms, a minor fragment can read multiple tablets as long as their estimated row count is less than the configured `planner.slice_target`.

NOTE

In Test 3, `planner.slice_target` is 500000 and the total number of records in the table is 1000003. From the query profile, the number of records read by each minor fragment is:
01-00-xx reads 496,374 and 01-01-xx reads 503,629.
Here, 01-01-xx reads more records than the slice_target, and those records belong to 2 different tablets. Why was the data read by a single fragment instead of being split further?

There can be a maximum of 4 minor fragments in this case since there are 4 tablets. Drill does not enforce the slice target at run time; it is used only at planning time to determine the degree of parallelism.
So, at run time the total row count read by a fragment can exceed the slice target. That is fine: once a minor fragment has been assigned to read data from the 2 tablets, no additional minor fragments are created on the fly. It is all decided up front at planning time.
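
The row counts in the note can be cross-checked against the per-tablet counts from the `maprcli table region list` output. The following is a minimal Python sketch; the helper name `tablets_read_by` is made up for illustration, and tablets are numbered 1 to 4 in the order they appear in that output. It looks for the tablet combinations whose row counts add up to what each minor fragment read in Test 3.

```python
from itertools import combinations

# Row counts per tablet, copied from the maprcli region list output (in listing order).
tablet_rows = [243531, 247945, 252843, 255684]
# Rows read by each minor fragment in Test 3, copied from the query profile numbers.
fragment_rows = {"01-00-xx": 496374, "01-01-xx": 503629}
slice_target = 500000


def tablets_read_by(rows_read, tablets):
    """Return every combination of 1-based tablet numbers whose rows sum to rows_read."""
    matches = []
    for size in range(1, len(tablets) + 1):
        for combo in combinations(range(len(tablets)), size):
            if sum(tablets[i] for i in combo) == rows_read:
                matches.append(tuple(i + 1 for i in combo))
    return matches


assignment = {frag: tablets_read_by(rows, tablet_rows)
              for frag, rows in fragment_rows.items()}

for frag, combos in assignment.items():
    exceeds = fragment_rows[frag] > slice_target
    print(frag, "tablets:", combos, "exceeds slice_target:", exceeds)
```

Each fragment's count matches exactly one pair of whole tablets, which agrees with the point that a tablet is never split across minor fragments, even when the pair adds up to more than the slice target.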

Monday, 30 October 2017

Apache DRILL - Troubleshooting ChannelClosedException 


AIM : 

To discuss a few tuning parameters that can help to avoid "ChannelClosedException" errors in DRILL.
Please note that this is a generic troubleshooting guide.

SYMPTOMS :

The following error messages reported during a query failure could be because of "ChannelClosedException".
[1] o.a.d.e.w.fragment.FragmentExecutor - SYSTEM ERROR: ChannelClosedException: Channel closed /<hostname1>:<port> <--> /<hostname2>:<port2>.
[2] SYSTEM ERROR: Drill Remote Exception

Case 1 : o.a.d.e.w.fragment.FragmentExecutor - SYSTEM ERROR: ChannelClosedException: Channel closed /<hostname1>:<port> <--> /<hostname2>:<port2>.
This may be because of an "OutOfMemory" issue on one of the drillbit nodes. A troubleshooting guide and tuning details can be found in my previous blog post.
https://bigdatainourpalm.blogspot.com/2015/10/apache-drill-troubleshooting.html

Case 2 : If you see logs similar to the following, the failure is most likely due to timeout issues on the client side.
(i)  o.a.d.exec.rpc.RpcExceptionHandler - Exception occurred with closed channel.  Connection: /<hostname1>:<port> <--> /<hostname2>:<port2>. (user server)
(ii) o.a.d.exec.rpc.RpcExceptionHandler - Exception occurred with closed channel.  Connection: /<hostname1>:<port> <--> /<hostname2>:<port2>. (user client)

More error stack is given below:

2015-04-26 04:30:50,430 [27856ea2-2b5c-7908-9e89-aef5eec27034:frag:8:147] INFO  o.a.d.e.w.f.FragmentStatusReporter - 27856ea2-2b5c-7908-9e89-aef5eec27034:8:147: State to report: RUNNING
2015-04-26 04:31:11,825 [UserServer-1] INFO  o.a.drill.exec.rpc.user.UserServer - RPC connection /XXX.YY.ZZ.YY:31010 <--> /XYX.YU.YZ.YY:33320 (user server) timed out.  Timeout was set to 30 seconds. Closing connection.
2015-04-26 04:31:11,839 [27856ea2-2b5c-7908-9e89-aef5eec27034:frag:0:0] INFO  o.a.d.e.w.fragment.FragmentExecutor - 2613806b-9b7b-1d29-3017-dab770388115:0:0: State change requested RUNNING --> FAILED
2015-04-26 04:31:11,841 [UserServer-1] INFO  o.a.d.e.w.fragment.FragmentExecutor - 2667806b-9b7b-1d29-3017-dab770388115:0:0: State change requested FAILED --> FAILED
2015-04-26 04:31:11,841 [27856ea2-2b5c-7908-9e89-aef5eec27034:frag:0:0] INFO  o.a.d.e.w.fragment.FragmentExecutor - 2667806b-9b7b-1d29-3017-dab770388115:0:0: State change requested FAILED --> FAILED

The key here is 'user server'/'user client' message in the logs.
The default timeout value is 30 seconds. 
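
As a rough aid for telling the two cases apart, the following Python sketch scans drillbit log lines for the markers described above. It is only an illustration: the function names and regular expressions are assumptions derived from the log excerpts in this post, not part of Drill.

```python
import re

# Patterns derived from the log excerpts in this post (assumed, not an official format).
TIMEOUT_RE = re.compile(
    r"\((user server|user client)\) timed out\.\s+Timeout was set to (\d+) seconds")
CLOSED_RE = re.compile(
    r"Exception occurred with closed channel.*\((user server|user client)\)")
OOM_RE = re.compile(r"OutOfMemory")


def classify(log_lines):
    """Return 'out-of-memory' (Case 1), 'client-timeout' (Case 2) or 'unknown'."""
    if any(OOM_RE.search(line) for line in log_lines):
        return "out-of-memory"
    if any(TIMEOUT_RE.search(line) or CLOSED_RE.search(line) for line in log_lines):
        return "client-timeout"
    return "unknown"


def timeout_seconds(log_lines):
    """Return the timeout value reported in the logs, or None if no timeout was logged."""
    for line in log_lines:
        match = TIMEOUT_RE.search(line)
        if match:
            return int(match.group(2))
    return None


sample = [
    "2015-04-26 04:31:11,825 [UserServer-1] INFO  o.a.drill.exec.rpc.user.UserServer - "
    "RPC connection /XXX.YY.ZZ.YY:31010 <--> /XYX.YU.YZ.YY:33320 (user server) timed out.  "
    "Timeout was set to 30 seconds. Closing connection.",
]
print(classify(sample), timeout_seconds(sample))
```

The logs from all drillbit nodes should be passed in, since the out-of-memory trace may appear on a different node than the one that reported the closed channel.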

SOLUTION/WORKAROUND :

Increase the value of the user client timeout. This can be done by setting the 'drill.exec.rpc.user.timeout' property in the 'drill-override.conf' file.
An example is given below (changing timeout to 300 seconds or 5 minutes):

cat drill-override.conf

drill.exec: {
cluster-id: "ajames-drillbits",
zk.connect: "<hn>:5181",
rpc.user.timeout: "300"
}

You need to restart drillbit services after making the change.

Apache DRILL - TroubleShooting OutOfMemory Issues


AIM :

To discuss a few tuning parameters that can help to avoid "OutOfMemory" errors in DRILL.
Please note that this is a generic troubleshooting guide.

SYMPTOMS :

Drill query failures with the following error messages are also potential candidates for "OutOfMemory" issues.
[1] o.a.d.e.w.fragment.FragmentExecutor - SYSTEM ERROR: ChannelClosedException: Channel closed /<hostname1>:<port> <--> /<hostname2>:<port2>.
[2] SYSTEM ERROR: Drill Remote Exception

The above error messages are just the after effect of an "OutOfMemory" error that occurred on one of the drillbit nodes running a fragment of the failed query.
As a result, to confirm that the issue is indeed caused by insufficient memory, you need to check the logs from all the drillbit nodes.

You should see error logs similar to the ones below in at least one of the drillbit node logs:

Caused by: org.apache.drill.exec.exception.OutOfMemoryException: Failure allocating buffer. 
at io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:64) ~[drill-memory-base-1.10.0.jar:4.0.27.Final] 
at org.apache.drill.exec.memory.AllocationManager.<init>(AllocationManager.java:80) ~[drill-memory-base-1.10.0.jar:1.10.0] 
at org.apache.drill.exec.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:243) ~[drill-memory-base-1.10.0.jar:1.10.0] 
at org.apache.drill.exec.memory.BaseAllocator.buffer(BaseAllocator.java:225) ~[drill-memory-base-1.10.0.jar:1.10.0] 
at org.apache.drill.exec.memory.BaseAllocator.buffer(BaseAllocator.java:195) ~[drill-memory-base-1.10.0.jar:1.10.0] 
at io.netty.buffer.ExpandableByteBuf.capacity(ExpandableByteBuf.java:43) ~[drill-memory-base-1.10.0.jar:4.0.27.Final] 
at io.netty.buffer.AbstractByteBuf.ensureWritable(AbstractByteBuf.java:251) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final] 
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:849) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final] 
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:841) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final] 
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:831) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final] 
at io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:92) ~[netty-codec-4.0.27.Final.jar:4.0.27.Final] 
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:227) ~[netty-codec-4.0.27.Final.jar:4.0.27.Final] 
... 11 common frames omitted 
Caused by: java.lang.OutOfMemoryError: Direct buffer memory 
at java.nio.Bits.reserveMemory(Bits.java:658) ~[na:1.7.0_51] 
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[na:1.7.0_51] 
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306) ~[na:1.7.0_51] 
at io.netty.buffer.UnpooledUnsafeDirectByteBuf.allocateDirect(UnpooledUnsafeDirectByteBuf.java:108) ~[netty-buffer-4.0.27.Final.jar:4.0.27.Final] 


The key here is "Caused by: java.lang.OutOfMemoryError: Direct buffer memory". 
This indicates that the drillbit is running out of the direct memory allocated to it on that node.


SOLUTION/WORKAROUND :

We will discuss 3 different options to avoid such errors.

[Option 1] Play around with following two parameters:

(a) planner.width.max_per_node
(b) planner.width.max_per_query

From official Apache Drill documentation (https://drill.apache.org/docs/configuration-options-introduction/)

planner.width.max_per_node : Maximum number of threads that can run in parallel for a query on a node. A slice is an individual thread. This number indicates the maximum number of slices per query for the query’s major fragment on a node.

planner.width.max_per_query : Same as max per node but applies to the query as executed by the entire cluster. For example, this value might be the number of active Drillbits, or a higher number to return results faster.

If the drillbit is running out of memory because more than one thread is running on the same node, setting the above properties to lower values and re-executing the query should help.
You can set them either at session level or at system level.

Session Level:
alter session set `planner.width.max_per_node`=1

If the query works fine with the above value, you can tune it by increasing it to a higher, more optimal value.
A similar approach can be taken for 'planner.width.max_per_query' as well. However, instead of starting with the value 1, start tuning with a value of 50 or 100 and increase from there.

Neither of the above two properties requires a drillbit service restart.

[Option 2] Tune 'drill.exec.buffer.size' property.

From official Apache Drill documentation (https://drill.apache.org/docs/start-up-options/)

drill.exec.buffer.size : Defines the amount of memory available, in terms of record batches, to hold data on the downstream side of an operation. Drill pushes data downstream as quickly as possible to make data immediately available. This requires Drill to use memory to hold the data pending operations. When data on a downstream operation is required, that data is immediately available so Drill does not have to go over the network to process it. Providing more memory to this option increases the speed at which Drill completes a query.

The default value is 6. The following example shows how it affects direct memory:
Let's say a query comprises 2 major fragments. The number of minor fragments inside each major fragment is given below:
Major Fragment 0 - 1 Minor Fragment
Major Fragment 1 - 100 Minor Fragments

Major Fragment 1 has a SENDER that sends data, and a RECEIVER in Major Fragment 0 receives it. Major Fragment 0 runs on the foreman node.
'drill.exec.buffer.size' controls the size of the 100 buffers on the foreman. The foreman accumulates the batches sent to it by the 100 minor fragments in 100 buffers. Each buffer has a soft limit of 6 batches (drill.exec.buffer.size). So the foreman is responsible for holding at least 6 * 100 batches in memory. Let's say each batch is 8MB in size. The foreman then holds approximately 6 * 100 * 8MB = 4800MB, or about 4.69GB, of memory. However, this is not all the memory consumed on the foreman, since there is likely some unreported memory consumed by netty buffers as well.

So, if we change the value of 'drill.exec.buffer.size' to 3, the memory used is halved to about 2.34GB. This can resolve the issue.
The new value must be added to drill-override.conf. Below is an example that sets the value to 1.

cat drill-override.conf

drill.exec: {
cluster-id: "ajames-drillbits",
zk.connect: "<hn>:5181",
buffer.size: "1",
}

Changing the above value requires restarting of drillbit services.
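
The buffer arithmetic from the example above can be reproduced with a few lines of Python. This is a back-of-the-envelope sketch only: the function name is hypothetical, the 8MB batch size is the illustrative figure used in this post, and the netty overhead mentioned above is not modelled.

```python
def receiver_buffer_gb(buffer_size, sending_fragments, batch_mb):
    """Approximate memory (in GB) held by the receiver buffers on the foreman.

    buffer_size       -- value of drill.exec.buffer.size (batches per buffer)
    sending_fragments -- number of minor fragments sending to the foreman
    batch_mb          -- assumed size of one record batch in MB
    """
    return buffer_size * sending_fragments * batch_mb / 1024


# 100 sending fragments and 8MB batches, as in the example above.
for size in (6, 3, 1):
    print(size, round(receiver_buffer_gb(size, 100, 8), 2), "GB")
```

The estimate grows linearly with both the buffer size and the number of sending fragments, so lowering `planner.width.max_per_query` (Option 1) reduces it in the same way.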

[Option 3] Increase direct memory 

By default it is 8GB. This is specified inside drill-env.sh file in DRILL CONF directory.
export DRILL_MAX_DIRECT_MEMORY=${DRILL_MAX_DIRECT_MEMORY:-"8G"}
This change also requires restart of drillbit services.

Friday, 22 September 2017

How to control time during which HBase major compaction will be executed?


The following two properties help to control the time at which major compaction kicks in.
(1) hbase.offpeak.start.hour
(2) hbase.offpeak.end.hour

The above properties take a value between 0 and 23.
For example, if we identify that the HBase cluster is not loaded during the night, let's say from 10 PM to 06 AM every day, then we can set the following in hbase-site.xml and restart the regionserver.

<property>
<name>hbase.offpeak.start.hour</name>
<value>22</value>
</property>

<property>
<name>hbase.offpeak.end.hour</name>
<value>6</value>
</property>

If a value is not set correctly, we will see a WARN message similar to the following in the regionserver logs:
2017-09-22 19:21:16,533 WARN  [StoreOpener-ee1faec4bdc3df3a4f4fa959c641e782-1] compactions.OffPeakHours: Ignoring invalid start/end hour for peak hour : start = 22 end = -1. Valid numbers are [0-23]
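
The window in the example wraps past midnight (start 22, end 6). The following Python sketch shows one way to reason about which hours fall inside such a window. It is a simplified model, not HBase code: it assumes the end hour is exclusive and that an invalid start or end hour means no off-peak window at all, in line with the "Ignoring invalid start/end hour" warning above.

```python
def is_valid_hour(hour):
    """Valid off-peak hours are 0 to 23, as stated in the WARN message."""
    return 0 <= hour <= 23


def is_off_peak(hour, start, end):
    """Return True if `hour` falls in the off-peak window [start, end).

    Assumption: an invalid start or end disables the window entirely.
    """
    if not (is_valid_hour(start) and is_valid_hour(end)):
        return False
    if start <= end:
        # Window within a single day, e.g. 1 to 5.
        return start <= hour < end
    # Window wraps past midnight, e.g. 22 to 6.
    return hour >= start or hour < end


off_peak_hours = [h for h in range(24) if is_off_peak(h, 22, 6)]
print(off_peak_hours)
```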

We can also change the compaction ratio to have finer control over the compaction. The following two properties help to achieve this:
(1) hbase.hstore.compaction.ratio (Default value is 1.2)
(2) hbase.hstore.compaction.ratio.offpeak (Default value is 5.0)


If we want to change the values to 1.4 and 6.5, add the following to hbase-site.xml on the regionserver and restart the service.
<property>
<name>hbase.hstore.compaction.ratio</name>
<value>1.4</value>
</property>

<property>
<name>hbase.hstore.compaction.ratio.offpeak</name>
<value>6.5</value>
</property>

You will see a message similar to the following in the regionserver logs once the values are applied:
2017-09-22 19:36:16,555 INFO  [StoreOpener-9edcc4b5cb0376b7366544d00b42ba44-1] compactions.CompactionConfiguration: size [134217728, 9223372036854775807); files [3, 10); ratio 1.400000; off-peak ratio 6.500000; throttle point 2684354560; major period 604800000, major jitter 0.500000, min locality to compact 0.000000

From official HBase documentation:


hbase.hstore.compaction.ratio: For minor compaction, this ratio is used to determine whether a given StoreFile which is larger than hbase.hstore.compaction.min.size is eligible for compaction. Its effect is to limit compaction of large StoreFiles. The value of hbase.hstore.compaction.ratio is expressed as a floating-point decimal. A large ratio, such as 10, will produce a single giant StoreFile. Conversely, a low value, such as .25, will produce behavior similar to the BigTable compaction algorithm, producing four StoreFiles. A moderate value of between 1.0 and 1.4 is recommended. When tuning this value, you are balancing write costs with read costs. Raising the value (to something like 1.4) will have more write costs, because you will compact larger StoreFiles. However, during reads, HBase will need to seek through fewer StoreFiles to accomplish the read. Consider this approach if you cannot take advantage of Bloom filters. Otherwise, you can lower this value to something like 1.0 to reduce the background cost of writes, and use Bloom filters to control the number of StoreFiles touched during reads. For most cases, the default value is appropriate.

hbase.hstore.compaction.ratio.offpeak: Allows you to set a different (by default, more aggressive) ratio for determining whether larger StoreFiles are included in compactions during off-peak hours. Works in the same way as hbase.hstore.compaction.ratio. Only applies if hbase.offpeak.start.hour and hbase.offpeak.end.hour are also enabled.
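
To get a feel for what a larger off-peak ratio changes, here is a simplified Python sketch of the ratio check. It is an approximation under stated assumptions, not the HBase selection algorithm: it treats a StoreFile above the minimum size as eligible when its size is at most the ratio times the combined size of the other candidate files, and it ignores the file-count limits. The function name and the file sizes are made up for illustration. The minimum size of 134217728 bytes is taken from the log line above.

```python
MB = 1024 * 1024


def passes_ratio(file_size, other_sizes, ratio, min_size=134217728):
    """Simplified ratio check for one StoreFile (sizes in bytes).

    Files at or below min_size are always eligible. Larger files are eligible
    only if they are not too big compared with the other candidate files.
    """
    if file_size <= min_size:
        return True
    return file_size <= ratio * sum(other_sizes)


# Hypothetical store: one 1GB file and two smaller files of 100MB and 200MB.
big_file = 1024 * MB
others = [100 * MB, 200 * MB]

print("peak ratio 1.4:", passes_ratio(big_file, others, 1.4))
print("off-peak ratio 6.5:", passes_ratio(big_file, others, 6.5))
```

With these made-up sizes the large file is left alone during peak hours and becomes eligible under the off-peak ratio, which is the sense in which the off-peak setting is more aggressive.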

Wednesday, 20 September 2017

HBase Memstore Flush - Part 2


Aim:


The aim of this blog is to discuss the various scenarios that lead to memstore flushes in HBase.

Any put operation to HBase goes to the memstore (in memory). It is also written to the WAL by default. There is one memstore per column family per region per regionserver per HBase table. When a certain threshold is reached, the memstore is flushed.

The thresholds can be broadly categorized into two types:
[A] Size based
[B] Time based

This blog focuses on time based memstore flushes. My previous blog (HBase Memstore Flush - Part 1) discusses size based memstore flushes.

Time based memstore flushes:


The memstore is also flushed periodically. The flush interval for time based memstore flushes is controlled by 'hbase.regionserver.optionalcacheflushinterval', set in hbase-site.xml. If nothing is set, the default value of 3600000ms (1 hour) is used. Periodic memstore flushes help to free up regionserver memory. However, the more small memstore flushes there are, the more minor compactions there will be. Hence, the parameter needs to be tuned depending on the application running on HBase. Setting 'hbase.regionserver.optionalcacheflushinterval' to a negative value disables periodic memstore flushes.

Periodic memstore flushes were introduced as part of https://issues.apache.org/jira/browse/HBASE-5930

The following part of the HBase source code performs the periodic flush:

static class PeriodicMemstoreFlusher extends ScheduledChore {
    final HRegionServer server;
    final static int RANGE_OF_DELAY = 20000; //millisec
    final static int MIN_DELAY_TIME = 3000; //millisec
    public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
      super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
      this.server = server;
    }

    @Override
    protected void chore() {
      for (Region r : this.server.onlineRegions.values()) {
        if (r == null)
          continue;
        if (((HRegion)r).shouldFlush()) {
          FlushRequester requester = server.getFlushRequester();
          if (requester != null) {
            long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
            LOG.info(getName() + " requesting flush for region " +
              r.getRegionInfo().getRegionNameAsString() + " after a delay of " + randomDelay);
            //Throttle the flushes by putting a delay. If we don't throttle, and there
            //is a balanced write-load on the regions in a table, we might end up
            //overwhelming the filesystem with too many flushes at once.
            requester.requestDelayedFlush(r, randomDelay, false);
          }
        }
      }
    }
  }

Following is the definition for shouldFlush():

boolean shouldFlush() {
    // This is a rough measure.
    if (this.maxFlushedSeqId > 0
          && (this.maxFlushedSeqId + this.flushPerChanges < this.sequenceId.get())) {
      return true;
    }
    long modifiedFlushCheckInterval = flushCheckInterval;
    if (getRegionInfo().isMetaRegion() &&
        getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
      modifiedFlushCheckInterval = META_CACHE_FLUSH_INTERVAL;
    }
    if (modifiedFlushCheckInterval <= 0) { //disabled
      return false;
    }
    long now = EnvironmentEdgeManager.currentTime();
    //if we flushed in the recent past, we don't need to do again now
    if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
      return false;
    }
    //since we didn't flush in the recent past, flush now if certain conditions
    //are met. Return true on first such memstore hit.
    for (Store s : getStores()) {
      if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
        // we have an old enough edit in the memstore, flush
        return true;
      }
    }
    return false;
  }

'flushCheckInterval' is set from following properties:

this.flushCheckInterval = conf.getInt(MEMSTORE_PERIODIC_FLUSH_INTERVAL,DEFAULT_CACHE_FLUSH_INTERVAL);

where 

public static final String MEMSTORE_PERIODIC_FLUSH_INTERVAL = "hbase.regionserver.optionalcacheflushinterval";
public static final int DEFAULT_CACHE_FLUSH_INTERVAL = 3600000;

The periodic flush chore will be invoked based on 'hbase.server.thread.wakefrequency' value. Default value is 10000ms.
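
The time based part of shouldFlush() above can be restated compactly. The following Python sketch mirrors only the interval checks. It deliberately leaves out the sequence-id check and the special interval for the meta region, and the function and parameter names are chosen for illustration.

```python
DEFAULT_CACHE_FLUSH_INTERVAL = 3600000  # ms, from the source above
RANGE_OF_DELAY = 20000                  # ms, from PeriodicMemstoreFlusher
MIN_DELAY_TIME = 3000                   # ms, from PeriodicMemstoreFlusher


def should_flush_time_based(now_ms, earliest_flush_ms, oldest_edit_ms_per_store,
                            interval_ms=DEFAULT_CACHE_FLUSH_INTERVAL):
    """Model of the time based checks in shouldFlush() (all times in ms)."""
    if interval_ms <= 0:
        return False  # periodic flush disabled
    if now_ms - earliest_flush_ms < interval_ms:
        return False  # flushed recently, nothing to do
    # Flush if any store still holds an edit older than the interval.
    return any(oldest < now_ms - interval_ms for oldest in oldest_edit_ms_per_store)


# The chore adds a random delay of MIN_DELAY_TIME plus a value below RANGE_OF_DELAY.
delay_bounds_ms = (MIN_DELAY_TIME, MIN_DELAY_TIME + RANGE_OF_DELAY - 1)

now = 10 * 3600000
print(should_flush_time_based(now, now - 2 * 3600000, [now - 2 * 3600000]))
print(should_flush_time_based(now, now - 60000, [now - 2 * 3600000]))
print(delay_bounds_ms)
```

In this model a region is flushed only when both conditions hold: no store was flushed within the interval, and at least one store holds an edit older than the interval.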

HBase Memstore Flush - Part 1


Aim:


The aim of this blog is to discuss the various scenarios that lead to memstore flushes in HBase.

Any put operation to HBase goes to the memstore (in memory). It is also written to the WAL by default. There is one memstore per column family per region per regionserver per HBase table. When a certain threshold is reached, the memstore is flushed.

The thresholds can be broadly categorized into two types:
[A] Size based
[B] Time based

This blog focuses on size based memstore flushes. My next blog (HBase Memstore Flush - Part 2) discusses time based memstore flushes.

Size based memstore flushes:


Since memstore is in memory and is part of regionserver memory, it is flushed when it reaches a certain threshold.

The threshold is controlled by following parameters:


[a] hbase.hregion.memstore.flush.size (specified in bytes)

Each memstore is checked for this threshold periodically (determined by 'hbase.server.thread.wakefrequency'). If the memstore hits this limit, it will be flushed. Please note that every memstore flush creates one HFile per CF per region.

[b] Regionserver might have many regions managed by it. Since memstore uses heap memory of regionserver, we also need to control the total heap memory used by all the memstores. This is controlled by following parameters:

(1) hbase.regionserver.global.memstore.size.lower.limit 
Maximum size of all memstores in a region server before flushes are forced. Defaults to 95% of hbase.regionserver.global.memstore.size (0.95). hbase.regionserver.global.memstore.lowerLimit is old property for the same. It will be honored if specified.

(2) hbase.regionserver.global.memstore.size
Maximum size of all memstores in a region server before new updates are blocked and flushes are forced. Defaults to 40% of heap (0.4). Updates are blocked and flushes are forced until size of all memstores in a region server hits hbase.regionserver.global.memstore.size.lower.limit. hbase.regionserver.global.memstore.upperLimit is the old property for the same. It will be honored if specified.


A few other parameters that need attention are the following:
* hbase.hregion.memstore.block.multiplier - Updates are blocked if memstore has hbase.hregion.memstore.block.multiplier times hbase.hregion.memstore.flush.size bytes. The default value is 4.

* hbase.hregion.percolumnfamilyflush.size.lower.bound - If FlushLargeStoresPolicy is used, then every time that we hit the total memstore limit, we find out all the column families whose memstores exceed this value, and only flush them, while retaining the others whose memstores are lower than this limit. If none of the families have their memstore size more than this, all the memstores will be flushed (just as usual). This value should be less than half of the total memstore threshold (hbase.hregion.memstore.flush.size). (https://issues.apache.org/jira/browse/HBASE-10201). To restore the old behavior of flushes writing out all column families, set hbase.regionserver.flush.policy to org.apache.hadoop.hbase.regionserver.FlushAllStoresPolicy either in hbase-default.xml or on a per-table basis by setting the policy to use with HTableDescriptor.getFlushPolicyClassName().
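
The size based thresholds described above can be turned into concrete numbers for a given regionserver. The following Python sketch is a plain arithmetic illustration: the function name is hypothetical, the defaults 0.4, 0.95 and 4 come from the descriptions above, and the 10GB heap and 128MB flush size in the example are assumed values, not recommendations.

```python
GB = 1024 ** 3
MB = 1024 ** 2


def memstore_thresholds(heap_bytes, flush_size_bytes,
                        global_size=0.4, lower_limit=0.95, block_multiplier=4):
    """Return the size based memstore thresholds in bytes.

    global_size      -- hbase.regionserver.global.memstore.size
    lower_limit      -- hbase.regionserver.global.memstore.size.lower.limit
    block_multiplier -- hbase.hregion.memstore.block.multiplier
    """
    global_limit = heap_bytes * global_size
    return {
        # A memstore is flushed once it reaches hbase.hregion.memstore.flush.size.
        "flush_memstore": flush_size_bytes,
        # Updates are blocked at multiplier times the flush size.
        "block_updates": flush_size_bytes * block_multiplier,
        # Flushes are forced once all memstores together reach the lower limit.
        "forced_flush_all_memstores": global_limit * lower_limit,
        # Updates are blocked once all memstores together reach the global size.
        "block_updates_all_memstores": global_limit,
    }


# Assumed example: 10GB regionserver heap and a 128MB flush size.
limits = memstore_thresholds(10 * GB, 128 * MB)
for name, value in limits.items():
    print(name, round(value / MB), "MB")
```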

Friday, 15 September 2017

How to enable/disable Hue autocomplete feature in editors

Aim: 

Hue 3.12 comes with rich features. One such feature is the option to enable or disable autocomplete in editors and notebooks. The autocomplete feature is turned ON by default.

How to?

There are a few options to achieve this.

[1] To use the new editor, which has the autocomplete feature available, make sure the following property is set to true in the hue.ini file (requires Hue restart).

use_new_editor=true (if this property is not explicitly set, it defaults to 'true')

If the above property is set to 'false', then autocomplete feature will not be available.

[2] Set editor_autocomplete_timeout=0 in the hue.ini file to disable the autocomplete feature (requires Hue restart).

[3] After loading the Hue editor, press "Ctrl+,". You will see the options 'Enable Autocompleter' and 'Enable Live Autocompletion'. Unchecking them disables the autocomplete feature.

Tuesday, 5 September 2017

Hive log files not getting deleted even after retention number is reached from Hive 2.1 onward


Issue:

Hive log files not getting deleted even after the retention number is reached. Log rotation works fine.
Issue is observed from Hive 2.1 which uses log4j2 for logging.

Cause:

The issue is observed for 'TimeBasedTriggeringPolicy' in log4j2.
This is a known limitation in 'TimeBasedTriggeringPolicy' for log4j2, as mentioned in https://issues.apache.org/jira/browse/LOG4J2-435.

Workaround:

One workaround for the issue is to use SizeBasedTriggeringPolicy. 
To use 'SizeBasedTriggeringPolicy', make the following changes in 'hive-log4j2.properties' inside respective HIVE_CONF_DIR.

Comment out following:
appender.DRFA.filePattern = ${sys:hive.log.dir}/${sys:hive.log.file}.%d{yyyy-MM-dd}
appender.DRFA.policies.time.type = TimeBasedTriggeringPolicy
appender.DRFA.policies.time.interval = 1
appender.DRFA.policies.time.modulate = true

Add the following:
appender.DRFA.filePattern = ${sys:hive.log.dir}/${sys:hive.log.file}.%i
appender.DRFA.policies.size.type=SizeBasedTriggeringPolicy
# Customize the size of each log file as needed
appender.DRFA.policies.size.size=100MB
# Customize the number of log files to be retained
appender.DRFA.strategy.max = 3

It will look similar to this after above activity:
# daily rolling file appender
appender.DRFA.type = RollingRandomAccessFile
appender.DRFA.name = DRFA
appender.DRFA.fileName = ${sys:hive.log.dir}/${sys:hive.log.file}
# Use %pid in the filePattern to append <process-id>@<host-name> to the filename if you want separate log files for different CLI session
#appender.DRFA.filePattern = ${sys:hive.log.dir}/${sys:hive.log.file}.%d{yyyy-MM-dd}
appender.DRFA.filePattern = ${sys:hive.log.dir}/${sys:hive.log.file}.%i
appender.DRFA.layout.type = PatternLayout
appender.DRFA.layout.pattern = %d{ISO8601} %5p [%t] %c{2}: %m%n
appender.DRFA.policies.type = Policies
#appender.DRFA.policies.time.type = TimeBasedTriggeringPolicy
appender.DRFA.policies.size.type=SizeBasedTriggeringPolicy
appender.DRFA.policies.size.size=100MB
#appender.DRFA.policies.time.interval = 1
#appender.DRFA.policies.time.modulate = true
appender.DRFA.strategy.type = DefaultRolloverStrategy
appender.DRFA.strategy.max = 3

 

Friday, 18 August 2017

How to add DEBUG for hbase shell?


The test was performed on a MapR cluster.

To enable DEBUG in 'hbase shell', make the following change:

Inside '/opt/mapr/hbase/hbase-<version>/bin/hbase' file
Change : export HBASE_ROOT_LOGGER="INFO,DRFA"
to : export HBASE_ROOT_LOGGER="DEBUG,console" 
in 'if [ "$COMMAND" = "shell" ] ; then' section.

Sample is given below:


# figure out which class to run
if [ "$COMMAND" = "shell" ] ; then
  # send hbase shell log messages to the log file
  export HBASE_LOGFILE="hbase-${HBASE_IDENT_STRING}-shell-${HOSTNAME}.log"
  export HBASE_ROOT_LOGGER="DEBUG,console"
  # eg export JRUBY_HOME=/usr/local/share/jruby
  if [ "$JRUBY_HOME" != "" ] ; then
    CLASSPATH="$JRUBY_HOME/lib/jruby.jar:$CLASSPATH"
    HBASE_OPTS="$HBASE_OPTS -Djruby.home=$JRUBY_HOME -Djruby.lib=$JRUBY_HOME/lib"
  fi
        #find the hbase ruby sources
  if [ -d "$HBASE_HOME/lib/ruby" ]; then
    HBASE_OPTS="$HBASE_OPTS -Dhbase.ruby.sources=$HBASE_HOME/lib/ruby"
  else
    HBASE_OPTS="$HBASE_OPTS -Dhbase.ruby.sources=$HBASE_HOME/hbase-shell/src/main/ruby"
  fi
  HBASE_OPTS="$HBASE_OPTS $HBASE_SHELL_OPTS"
  CLASS="org.jruby.Main -X+O ${JRUBY_OPTS} ${HBASE_HOME}/bin/hirb.rb"

Thursday, 17 August 2017

Can Classic Drill and Drill-On-Yarn co-exist? 


This combination is not completely tested, but we can get it working.

Following notes will help to configure Drill-Yarn and Drill-Classic to co-exist on the same cluster. 
The activity is performed in a MapR cluster.

[root@vm78-218 ~]# maprcli node list -columns svc 
service hostname ip 
historyserver,hbmaster,webserver,nodemanager,spark-historyserver,drill-bits,cldb,fileserver,hoststats vm78-217 IP1 
hbmaster,hbregionserver,webserver,cldb,fileserver,hoststats,hue vm78-210 IP2 
hivemeta,httpfs,webserver,drill-bits,fileserver,resourcemanager,hue vm78-218 IP3 

We have two classic drill-bits running on vm78-217 and vm78-218.
We will install Drill On YARN (1.10) on vm78-210. (Note: this machine does not have any Drill installation.)

yum install mapr-drill-yarn -y 

Now, configure Drill-YARN to co-exist with Classic Drill. 

[1] Change the drill log location in 'drill-env.sh'. Add the following: 
export DRILL_LOG_DIR="/opt/mapr/drill/drill-1.1.0-YARN/logs" 
Make sure that this directory exists, with the necessary permissions, on all nodes running nodemanager.

[2] Make necessary changes in 'drill-on-yarn.conf' based on http://maprdocs.mapr.com/home/Drill/configure_drill_to_run_under_yarn.html 

[3] We need to change the following properties in 'drill-override.conf'
• drill.exec.cluster-id
• drill.exec.zk.root
• drill.exec.rpc.user.server.port
• drill.exec.rpc.bit.server.port
• drill.exec.http.port
(The idea is to give these properties values different from the defaults, on the assumption that classic Drill is using the default values.)

Sample is given below: 
drill.exec: { 
cluster-id: "ajames-drillbitsonyarn", 
zk.root: "drillonyarn", 
rpc.user.server.port: 21010, 
rpc.bit.server.port: 21011, 
http.port: 7047, 
zk.connect: "<zk-quorum>"
}

[4] Follow steps in http://maprdocs.mapr.com/home/Drill/step_4_launch_drill_under_yarn.html to launch the DRILL-ON-YARN. 

You can also refer to https://community.mapr.com/docs/DOC-1188
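
As a quick sanity check for step [3], the sketch below compares the settings of the two Drill clusters and reports any property that still has the same value in both. The on-YARN values are copied from the sample above. The classic values are assumed to be the stock Apache Drill defaults (a MapR install may use a different cluster-id, so substitute the values from your own classic 'drill-override.conf'). The function name find_collisions is made up for this illustration.

```python
# Properties that must differ between classic Drill and Drill-on-YARN (step [3]).
MUST_DIFFER = [
    "drill.exec.cluster-id",
    "drill.exec.zk.root",
    "drill.exec.rpc.user.server.port",
    "drill.exec.rpc.bit.server.port",
    "drill.exec.http.port",
]

# Assumption: classic Drill runs with the stock Apache Drill defaults.
classic = {
    "drill.exec.cluster-id": "drillbits1",
    "drill.exec.zk.root": "drill",
    "drill.exec.rpc.user.server.port": 31010,
    "drill.exec.rpc.bit.server.port": 31011,
    "drill.exec.http.port": 8047,
}

# Values taken from the sample drill-override.conf above.
on_yarn = {
    "drill.exec.cluster-id": "ajames-drillbitsonyarn",
    "drill.exec.zk.root": "drillonyarn",
    "drill.exec.rpc.user.server.port": 21010,
    "drill.exec.rpc.bit.server.port": 21011,
    "drill.exec.http.port": 7047,
}


def find_collisions(a, b, keys=MUST_DIFFER):
    """Return the keys whose values are identical in both configurations."""
    return [key for key in keys if a.get(key) == b.get(key)]


collisions = find_collisions(classic, on_yarn)
print("colliding properties:", collisions or "none")
```

An empty result only means the listed values differ; it does not validate the rest of the configuration.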

Issue : Hive on MR and Hive on TEZ query output difference

Issue:



If the same Hive query returns different output in MR mode and TEZ mode, there can be multiple reasons. This post discusses one scenario in which Hive on MR gives the correct result while Hive on TEZ returns a wrong result.

Analysis:


It was found that the issue was with the location of the actual data files inside the Hive table directory on HDFS. (For this particular query, the issue was with one table.)
Previously (before the transactional feature was introduced in Hive), all files for a partition (or a table if the table is not partitioned) lived in a single directory. With these changes, any partitions (or tables) written with an ACID aware writer will have a directory for the base files and a directory for each set of delta files. (https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions#HiveTransactions-BaseandDeltaDirectories). From the Hive debug logs, it was seen that TEZ was reading the data multiple times, which produced the wrong results.
 

The following is a partial listing of the files and directories inside the table directory.

-rwxr-x--- 1 prodxx prodxx 813 Jun  12 00:04 000000_0_copy_1
-rwxr-x--- 1 prodxx prodxx 832 Jun  12 00:04 000000_0_copy_2
-rwxr-x--- 1 prodxx prodxx 822 Jun  12 00:04 000000_0_copy_3
-rwxr-x--- 1 prodxx prodxx 826 Jun  12 00:05 000000_0_copy_4
-rwxr-x--- 1 prodxx prodxx 836 Jun  12 00:05 000000_0_copy_5
drwxr-x--- 2 prodxx prodxx   5 Jul 16 05:50 delta_0000116_0000116
drwxr-x--- 2 prodxx prodxx   1 Jul 16 05:50 delta_0000117_0000117
drwxr-x--- 2 prodxx prodxx   1 Jul 16 05:50 delta_0000129_0000129
drwxr-x--- 2 prodxx prodxx   5 Jul 16 05:50 delta_0000118_0000118
drwxr-x--- 2 prodxx prodxx   5 Jul 16 05:50 delta_0000130_0000130



Here there are data files (for example, 000000_0_copy_4 and 000000_0_copy_5) outside the ‘delta_*’ folders. Also, we do not see any ‘base_*’ folder.
Since these files were lying outside the ‘delta_*’ and ‘base_*’ folders, the TEZ reader was giving wrong results.
One possible reason for data files residing outside the designated directories is that the LOAD command was used to load the data.
However, LOAD is not supported for transactional tables. (https://cwiki.apache.org/confluence/display/Hive/Hive+Transactions#HiveTransactions-Limitations)
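
The check described above can be sketched in a few lines: given the entry names from a table directory, flag everything that is neither a ‘base_*’ nor a ‘delta_*’ directory. This is only an illustration of the diagnosis in this post. The name patterns are inferred from the listing above, other Hive versions may use additional directory kinds, and stray_entries is a made-up helper name.

```python
import re

# Directory names used by ACID writers, e.g. base_0000130 or delta_0000116_0000116.
# Assumption: the patterns are inferred from the listing in this post.
ACID_DIR = re.compile(r"^(base_\d+|delta_\d+_\d+)$")


def stray_entries(names):
    """Return the entries that are neither base_* nor delta_* directories."""
    return [name for name in names if not ACID_DIR.match(name)]


# Entry names copied from the table directory listing above.
listing = [
    "000000_0_copy_1",
    "000000_0_copy_2",
    "000000_0_copy_3",
    "000000_0_copy_4",
    "000000_0_copy_5",
    "delta_0000116_0000116",
    "delta_0000117_0000117",
    "delta_0000129_0000129",
    "delta_0000118_0000118",
    "delta_0000130_0000130",
]

for name in stray_entries(listing):
    print("outside base_/delta_ directories:", name)
```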


Another finding in this particular case was that there was no ‘base_*’ folder inside any of the Hive transactional tables.
The compactor is the process responsible for combining multiple delta files into a single delta per bucket during minor compaction, and for combining delta and base files into a single base per bucket during major compaction.
The Hive metastore is responsible for running the compactor. It is a background process inside the Hive metastore, so the hivemetastore package must be installed.

In this specific case, there was no hivemetastore package installed.

Tuesday, 8 August 2017


Issue with Hue Hive editor not showing verbose logs


Issue:

The Hue Hive editor does not show logs for the jobs executed. The issue is seen from Hive 2.1 onwards.

Reason:

From Hive 2.1.0 onwards (with HIVE-13027), Hive uses Log4j2's asynchronous logger by default. Setting hive.async.log.enabled to false will disable asynchronous logging and fallback to synchronous logging. Asynchronous logging can give significant performance improvement as logging will be handled in a separate thread that uses the LMAX disruptor queue for buffering log messages. Refer to https://logging.apache.org/log4j/2.x/manual/async.html for benefits and drawbacks. (https://cwiki.apache.org/confluence/display/Hive/GettingStarted)

There is a bug with beeline not showing verbose log when asynchronous logger is enabled. (https://issues.apache.org/jira/browse/HIVE-14183)

Workaround:

Set the following property in hive-site.xml on all hiveserver2 nodes and restart the service.

<property>
<name>hive.async.log.enabled</name>
<value>false</value>
</property>

Monday, 7 August 2017

Connect Apache Phoenix to MapR HBase

Aim:

Integrate Apache Phoenix and MapR HBase.

Environment details:

MapR Core Version : 5.2.1
Phoenix Version         : 4.8.2
MapR HBase Version : 1.1.8 

Pre-requisites:

MapR cluster (unsecured) up and running with HBase master and regionserver. The following example was performed on a single node cluster. The services running on the cluster are shown below:

historyserver,hbmaster,hbregionserver,webserver,nodemanager,cldb,fileserver,resourcemanager,hoststats

Steps:

[1] Download the required Phoenix from http://phoenix.apache.org/download.html
[2] Extract the tar file
[3] Copy the phoenix-4.8.2-HBase-1.1-server.jar to the ‘/opt/mapr/hbase/hbase-1.1.8/lib/’ folder on all nodes that have HBase master or regionserver installed.

[4] Restart the HBase services on all nodes.
maprcli node services -action restart -name hbmaster -filter ["csvc==hbmaster"]
maprcli node services -action restart -name hbregionserver -filter ["csvc==hbregionserver"]

[5] Go to the Phoenix bin directory, execute the following command:
./sqlline.py <zookeeper_node>:5181
[root@ip-10-X-Y-Z bin]# ./sqlline.py `hostname`:5181
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix:ip-10-X-Y-Z:5181 none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:ip-10-X-Y-Z:5181
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/mapr/phoenix/apache-phoenix-4.8.2-HBase-1.1-bin/phoenix-4.8.2-HBase-1.1-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/mapr/lib/slf4j-log4j12-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
17/08/07 18:02:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 4.8)
Driver: PhoenixEmbeddedDriver (version 4.8)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
88/88 (100%) Done
Done
sqlline version 1.1.9
0: jdbc:phoenix:ip-10-X-Y-Z:5181>

[6] Listing tables in Phoenix:
0: jdbc:phoenix:ip-10-9-0-9:5181> !tables
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---+
| TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | INDEX_STATE | IMMUTABLE_ROWS | S |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---+
| | SYSTEM | CATALOG | SYSTEM TABLE | | | | | | false | n |
| | SYSTEM | FUNCTION | SYSTEM TABLE | | | | | | false | n |
| | SYSTEM | SEQUENCE | SYSTEM TABLE | | | | | | false | n |
| | SYSTEM | STATS | SYSTEM TABLE | | | | | | false | n |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+-----------------+---+
0: jdbc:phoenix:ip-10-9-0-9:5181>


[7] At this point you will see the following tables in HBase:
hbase(main):006:0> list
Listing HBase tables. Specify a path or configure namespace mappings to list M7 tables.
TABLE
SYSTEM.CATALOG
SYSTEM.FUNCTION
SYSTEM.SEQUENCE
SYSTEM.STATS
4 row(s) in 0.0090 seconds

=> ["SYSTEM.CATALOG", "SYSTEM.FUNCTION", "SYSTEM.SEQUENCE", "SYSTEM.STATS"]  

Friday, 28 July 2017

Issue with using nohup on sqlline - DRILL


Problem:


The following is the normal way to execute a query in DRILL:

[mapr@vm52-146 ~]$ sqlline -u "jdbc:drill:" -n <user> -p <password> --run=test.drill
1/1          show schemas;
+---------------------+
|     SCHEMA_NAME     |
+---------------------+
| INFORMATION_SCHEMA  |
| cp.default          |
| dfs.default         |
| dfs.root            |
| dfs.tmp             |
| sys                 |
+---------------------+
6 rows selected (1.773 seconds)
Closing: org.apache.drill.jdbc.impl.DrillConnectionImpl
apache drill 1.10.0
"a drill is a terrible thing to waste"

Or, the same can be executed as a script:

[mapr@vm52-146 ~]$ nano test.sh
[mapr@vm52-146 ~]$ cat test.sh
sqlline -u "jdbc:drill:" -n <user> -p <password> --run=test.drill > /tmp/result.txt
[mapr@vm52-146 ~]$ chmod +x test.sh
[mapr@vm52-146 ~]$ sh test.sh
1/1          show schemas;
6 rows selected (0.957 seconds)
Closing: org.apache.drill.jdbc.impl.DrillConnectionImpl
apache drill 1.10.0
"a little sql for your nosql"
[mapr@vm52-146 ~]$ cat /tmp/result.txt
+---------------------+
|     SCHEMA_NAME     |
+---------------------+
| INFORMATION_SCHEMA  |
| cp.default          |
| dfs.default         |
| dfs.root            |
| dfs.tmp             |
| sys                 |
+---------------------+

However, the above methods do not return any result when executed with nohup.

[mapr@vm52-146 ~]$ nohup ./test.sh &
[4] 24554
[mapr@vm52-146 ~]$ nohup: ignoring input and appending output to `nohup.out'


[4]+  Stopped                 nohup ./test.sh
[mapr@vm52-146 ~]$ cat /tmp/result.txt
[mapr@vm52-146 ~]$

Solution:

Add the “-Djline.terminal=jline.UnsupportedTerminal” property to SQLLINE_JAVA_OPTS in drill-env.sh.

Wednesday, 26 July 2017

Example scala code for creating MapRDB table and inserting data from Spark-Shell


import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes.toBytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

val spark = SparkSession.builder().appName("MapRDBConnectorExample").enableHiveSupport().getOrCreate()
val config = HBaseConfiguration.create()
val hBaseAdmin = new HBaseAdmin(config)
val hbaseContext = new HBaseContext(spark.sparkContext, config)

import spark.sql

sql("CREATE TABLE IF NOT EXISTS HiveTable (rowKeyData INT, Column1 STRING, Column2 STRING)")
sql("INSERT INTO TABLE HiveTable VALUES(1, 'Col1', 'Col2')")


Once the above statements have run, we will see the following in Hive:
hive> select * from hivetable;
OK
1       Col1    Col2
Time taken: 0.14 seconds, Fetched: 1 row(s)

val df = sql("SELECT * FROM HiveTable").toDF()

val MapRDBTableName = "/tmp/SparkMapRDBTable"
val columnFamilyName = "cf"
val cata =s"""{"table":{"namespace":"default", "name":"$MapRDBTableName"},"rowkey":"key","columns":{"rowKeyData":{"cf":"rowkey", "col":"key", "type":"int"},"Column1":{"cf":"$columnFamilyName", "col":"Column1", "type":"string"},"Column2":{"cf":"$columnFamilyName", "col":"Column2", "type":"string"}}}""".stripMargin

val hTableDescriptor = new HTableDescriptor(toBytes(MapRDBTableName))
val hColumnDescriptor = new HColumnDescriptor(toBytes(columnFamilyName))
hTableDescriptor.addFamily(hColumnDescriptor)
hBaseAdmin.createTable(hTableDescriptor)


At this point, the MapRDB table is created.


df.write.options(Map(HBaseTableCatalog.tableCatalog -> cata, HBaseTableCatalog.newTable -> "1")).format("org.apache.hadoop.hbase.spark").save()

At this point, the data is inserted into the MapRDB table.


val df2 = spark.read.options(Map(HBaseTableCatalog.tableCatalog -> cata)).format("org.apache.hadoop.hbase.spark").load()
df2.show()

+-------+----------+-------+
|Column2|rowKeyData|Column1|
+-------+----------+-------+
|   Col2|         1|   Col1|
+-------+----------+-------+


The example was verified for MapRDB tables with the following package versions:

mapr-spark-2.1.0.201706271156-1.noarch
mapr-hive-2.1.201706231053-1.noarch
mapr-hbase-1.1.8.201704031229-1.noarch

Wednesday, 19 July 2017

HBase - Identifying and moving corrupted HFiles


Identifying corrupted files:


HBase provides a utility to check for any corrupted files in HBase.
hbase hbck -checkCorruptHFiles

The above command checks all HBase tables for corrupted files.
To check for corrupted files in a specific table, use the following command:
hbase hbck -checkCorruptHFiles <table_name>

If any files are corrupted, the output log will contain something similar to the following:

Checked 17 hfile for corruption
  HFiles corrupted:                  2
    HFiles moved while checking:     0
Summary: CORRUPTED


The logs also contain details about which files are corrupted.
Another way of checking whether a file is corrupted or not is using the following command:
hbase org.apache.hadoop.hbase.io.hfile.HFile -f <path_to_hfile> 


Sidelining corrupted files:


HBase provides a utility for sidelining corrupted files.
hbase hbck -sidelineCorruptHFiles

The above command checks all HBase tables.
To apply it to a specific table, use the following command:
hbase hbck -sidelineCorruptHFiles <table_name>

You will see similar information in the command output log:

Checked 17 hfile for corruption
  HFiles corrupted:                  2
    HFiles successfully quarantined: 2
      maprfs:/hbase/corrupt/hcrt/1090c602c005a4ca76fda4ec7bd2865c/f/97fcf7fee25c469a81e7a0aa567a4627
      maprfs:/hbase/corrupt/hcrt/1090c602c005a4ca76fda4ec7bd2865c/f/97fcf7fee25c469a81e7a0aa567a4628
    HFiles failed quarantine:        0
    HFiles moved while checking:     0
Summary: CORRUPTED => OK


The corrupted files are moved to the '/hbase/corrupt' folder.

Monday, 17 July 2017

HBase Compaction - Part 2 


Following is a study of parameters that control minor and major compaction in HBase.

Minor compaction 

The default minor compaction algorithm depends on the following parameters:
[1] hbase.hstore.compaction.min – minimum number of StoreFiles that must be selected for a compaction to occur. Defaults to 3 in HBase 1.1.8.
StoreFiles – files produced by memstore flush
A StoreFile carries several pieces of metadata; some of the important ones are:
MAX_SEQ_ID_KEY – maximum sequence ID in FileInfo
MAJOR_COMPACTION_KEY – major compaction flag in FileInfo
EXCLUDE_FROM_MINOR_COMPACTION_KEY  – flag in FileInfo that excludes the file from minor compaction
HFILE_NAME_REGEX - HFile names are UUIDs ([0-9a-z]+). Bulk loaded HFiles have (_SeqId_[0-9]+_) as a suffix.
[2] hbase.hstore.compaction.max – maximum number of StoreFiles to be compacted in each minor compaction. Defaults to 10.
[3] hbase.hstore.compaction.min.size – any file smaller than this is a candidate for compaction.
[4] hbase.hstore.compaction.max.size – any file larger than this is automatically excluded from compaction (by default it is Long.MAX_VALUE).
[5] hbase.hstore.compaction.ratio – default is 1.2f


The simplified rule for selecting a file for minor compaction is:
a file is selected for compaction when the file size <= sum(smaller_files_size) * hbase.hstore.compaction.ratio.


Example from the official HBase documentation:
The following StoreFiles exist: 100, 50, 23, 12, and 12 bytes apiece (oldest to newest). Note that this example uses hbase.hstore.compaction.ratio = 1.0 and hbase.hstore.compaction.max = 5, not the defaults listed above. With these parameters, the files that would be selected for minor compaction are 23, 12, and 12.
Why?
Remember the rule:
a file is selected for compaction when the file size <= sum(smaller_files_size) * hbase.hstore.compaction.ratio.
    100 --> No, because sum(50, 23, 12, 12) * 1.0 = 97.
    50 --> No, because sum(23, 12, 12) * 1.0 = 47.
    23 --> Yes, because sum(12, 12) * 1.0 = 24.
    12 --> Yes, because the previous file has been included, and because this does not exceed the max-file limit of 5.
    12 --> Yes, because the previous file has been included, and because this does not exceed the max-file limit of 5.
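
The walk-through above can be reproduced with a small sketch. It implements only the simplified rule quoted in this post, not HBase's actual compaction policy classes, and it ignores the min.size and max.size parameters. The function name and its arguments are made up for illustration; the parameter values are the ones used in the example.

```python
def select_for_minor_compaction(sizes, ratio=1.0, min_files=3, max_files=5):
    """Apply the simplified ratio rule to StoreFile sizes ordered oldest to newest.

    Older files are skipped while they are larger than `ratio` times the
    combined size of the newer files. The first file that passes, and the
    files after it (up to `max_files`), are selected.
    """
    start = 0
    while start < len(sizes) and sizes[start] > ratio * sum(sizes[start + 1:]):
        start += 1
    selected = sizes[start:start + max_files]
    # A compaction only happens if enough files were selected.
    return selected if len(selected) >= min_files else []


store_files = [100, 50, 23, 12, 12]  # bytes, oldest to newest
print(select_for_minor_compaction(store_files))
```

With the sizes from the example and a ratio of 1.0, the 100 and 50 byte files are skipped and the remaining three files are selected, matching the explanation above.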



The following log snippet shows HBase regionserver logs during a minor compaction (shortCompactions):
It provides the following information:
[1] Shows which table, which column family, which region is undergoing compaction.
[2] Number of files compacted.
[3] Total size of the files selected for compaction (the sum of the individual file sizes).
[4] Shows total file size after compaction is completed.
[5] Time taken for minor compaction.


Minor compaction logs: (Table name is 'hb' with column family 'c') 

2017-07-10 17:08:13,967 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] regionserver.HRegion: Starting compaction on c in region hb,,1499720284228.0f0486e029334542705e66f401fa698b.
2017-07-10 17:08:13,968 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] regionserver.HStore: Starting compaction of 3 file(s) in c of hb,,1499720284228.0f0486e029334542705e66f401fa698b. into tmpdir=maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/.tmp, totalSize=14.7 K
2017-07-10 17:08:13,980 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] hfile.CacheConfig: blockCache=LruBlockCache{blockCount=2, currentSize=1291688, freeSize=1249607128, maxSize=1250898816, heapSize=1291688, minSize=1188353920, minFactor=0.95, multiSize=594176960, multiFactor=0.5, singleSize=297088480, singleFactor=0.25}, cacheDataOnRead=true, cacheDataOnWrite=false, cacheIndexesOnWrite=false, cacheBloomsOnWrite=false, cacheEvictOnClose=false, cacheDataCompressed=false, prefetchOnOpen=false
2017-07-10 17:08:14,109 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] regionserver.HStore: Completed compaction of 3 (all) file(s) in c of hb,,1499720284228.0f0486e029334542705e66f401fa698b. into 4cc6a50eb38d4ef2844a3339bcdfe11d(size=5.0 K), total size for store is 5.0 K. This selection was in queue for 0sec, and took 0sec to execute.
2017-07-10 17:08:14,113 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] regionserver.CompactSplitThread: Completed compaction: Request = regionName=hb,,1499720284228.0f0486e029334542705e66f401fa698b., storeName=c, fileCount=3, fileSize=14.7 K, priority=1, time=4222857501044128; duration=0sec


Major Compaction:


Major compaction depends on the following parameters:
[1] hbase.hregion.majorcompaction - Default value is 604800000 (7 days)
The time interval between major compactions, in milliseconds. Setting this to 0 disables time-based major compaction.
Sometimes, a minor compaction can be promoted to a major compaction.

[2] Off peak hour compactions:
Identifying the peak hours of your cluster lets you tell HBase not to run heavy minor compactions during busy hours.
For this, from HBase 1.2 onward, there are the following parameters:
[a] hbase.hstore.compaction.max.size.offpeak – sets a value for the largest file that can be used for compaction
[b] hbase.offpeak.start.hour= 0..23 (specify start hour)
[c] hbase.offpeak.end.hour= 0..23 (specify end hour)
The hstore compaction ratio is by default 1.2 for peak hours. For offpeak hours, it is 5.
Both the values can be adjusted using the following parameters:
[a] hbase.hstore.compaction.ratio
[b] hbase.hstore.compaction.ratio.offpeak

[3] hbase.hregion.majorcompaction.jitter
Compactions are carried out by regionservers. In order to make sure that all regionservers do not run major compaction at the same time, we have this jitter parameter.
By default the value is 0.5, which is the outer bound of the jitter. hbase.hregion.majorcompaction is multiplied by some fraction within this jitter value, and the result is added to or subtracted from the interval to determine when to run the next major compaction. With the defaults (7 days, jitter 0.5), the next major compaction therefore runs somewhere between 3.5 and 10.5 days later.
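
The two time-related settings above can be sketched as follows. This is an illustration of the behaviour described in this post, not HBase's own code: the function names are invented, the off-peak window is assumed to include the start hour, exclude the end hour and possibly wrap around midnight, and the jitter fraction is assumed to be drawn uniformly at random.

```python
import random


def compaction_ratio(hour, start_hour, end_hour, peak_ratio=1.2, offpeak_ratio=5.0):
    """Pick the compaction ratio for an hour of the day (0..23).

    start_hour/end_hour stand for hbase.offpeak.start.hour and
    hbase.offpeak.end.hour.
    """
    if start_hour <= end_hour:
        off_peak = start_hour <= hour < end_hour
    else:
        # The window wraps around midnight, e.g. 22 -> 4.
        off_peak = hour >= start_hour or hour < end_hour
    return offpeak_ratio if off_peak else peak_ratio


def next_major_compaction_ms(period_ms=604_800_000, jitter=0.5, rnd=random.random):
    """Return the period shifted by at most +/- (period * jitter)."""
    if period_ms == 0:
        return 0  # time-based major compaction is disabled
    spread = period_ms * jitter
    return round(period_ms + spread - 2 * spread * rnd())


print(compaction_ratio(hour=23, start_hour=22, end_hour=4))
print(next_major_compaction_ms() / 86_400_000, "days until the next major compaction")
```

Passing a fixed `rnd` (for example `lambda: 0.0`) makes the jitter deterministic, which is convenient when checking the bounds.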


The following log snippet shows HBase regionserver logs during a major compaction:
It provides the following information:
[1] Displays the table and region undergoing major compaction.
[2] If the major compaction is triggered manually, the request can be handled internally as a small compaction (note the "Small Compaction requested" message and the shortCompactions thread in the logs below).
[3] The new store file is first written inside the .tmp folder and then moved to the column family folder.
[4] Provides information about the number of files under compaction, total size of new file generated, time taken for the compaction.


Major compaction logs: (Table name is 'hb' with column family 'c') 

2017-07-10 21:10:55,158 INFO  [PriorityRpcServer.handler=1,queue=1,port=16020] regionserver.RSRpcServices: Compacting hb,,1499720284228.0f0486e029334542705e66f401fa698b.
2017-07-10 21:10:55,159 DEBUG [PriorityRpcServer.handler=1,queue=1,port=16020] compactions.RatioBasedCompactionPolicy: Selecting compaction from 2 store files, 0 compacting, 2 eligible, 10 blocking
2017-07-10 21:10:55,159 DEBUG [PriorityRpcServer.handler=1,queue=1,port=16020] regionserver.HStore: 0f0486e029334542705e66f401fa698b - c: Initiating major compaction (all files)
2017-07-10 21:10:55,159 DEBUG [PriorityRpcServer.handler=1,queue=1,port=16020] regionserver.CompactSplitThread: Small Compaction requested: org.apache.hadoop.hbase.regionserver.DefaultStoreEngine$DefaultCompactionContext@1ffa895a; Because: User-triggered major compaction; compaction_queue=(0:1), split_queue=0, merge_queue=0
2017-07-10 21:10:55,159 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HRegion: Starting compaction on c in region hb,,1499720284228.0f0486e029334542705e66f401fa698b.
2017-07-10 21:10:55,160 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HStore: Starting compaction of 2 file(s) in c of hb,,1499720284228.0f0486e029334542705e66f401fa698b. into tmpdir=maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/.tmp, totalSize=10.9 K
2017-07-10 21:10:55,162 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] compactions.Compactor: Compacting maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/c/0daeafd75a4c4ba4a53172a73b9ca4b0, keycount=41, bloomtype=ROW, size=6.0 K, encoding=NONE, seqNum=174, earliestPutTs=1499720300277
2017-07-10 21:10:55,164 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] compactions.Compactor: Compacting maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/c/bfeea9e43bc347ec84863bdd4476e270, keycount=2, bloomtype=ROW, size=4.9 K, encoding=NONE, seqNum=182, earliestPutTs=1499735424849
2017-07-10 21:10:55,165 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] hfile.CacheConfig: blockCache=LruBlockCache{blockCount=3, currentSize=1308744, freeSize=1249590072, maxSize=1250898816, heapSize=1308744, minSize=1188353920, minFactor=0.95, multiSize=594176960, multiFactor=0.5, singleSize=297088480, singleFactor=0.25}, cacheDataOnRead=true, cacheDataOnWrite=false, cacheIndexesOnWrite=false, cacheBloomsOnWrite=false, cacheEvictOnClose=false, cacheDataCompressed=false, prefetchOnOpen=false
2017-07-10 21:10:55,191 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HRegionFileSystem: Committing store file maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/.tmp/da9d2b5454e640769a9b20c82124a010 as maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/c/da9d2b5454e640769a9b20c82124a010
2017-07-10 21:10:55,208 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HStore: Removing store files after compaction...
2017-07-10 21:10:55,219 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] backup.HFileArchiver: Archiving compacted store files.
2017-07-10 21:10:55,231 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] backup.HFileArchiver: Finished archiving from class org.apache.hadoop.hbase.backup.HFileArchiver$FileableStoreFile, file:maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/c/0daeafd75a4c4ba4a53172a73b9ca4b0, to maprfs:/hbase/archive/data/default/hb/0f0486e029334542705e66f401fa698b/c/0daeafd75a4c4ba4a53172a73b9ca4b0
2017-07-10 21:10:55,243 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] backup.HFileArchiver: Finished archiving from class org.apache.hadoop.hbase.backup.HFileArchiver$FileableStoreFile, file:maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/c/bfeea9e43bc347ec84863bdd4476e270, to maprfs:/hbase/archive/data/default/hb/0f0486e029334542705e66f401fa698b/c/bfeea9e43bc347ec84863bdd4476e270
2017-07-10 21:10:55,244 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HStore: Completed major compaction of 2 (all) file(s) in c of hb,,1499720284228.0f0486e029334542705e66f401fa698b. into da9d2b5454e640769a9b20c82124a010(size=6.1 K), total size for store is 6.1 K. This selection was in queue for 0sec, and took 0sec to execute.
2017-07-10 21:10:55,246 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.CompactSplitThread: Completed compaction: Request = regionName=hb,,1499720284228.0f0486e029334542705e66f401fa698b., storeName=c, fileCount=2, fileSize=10.9 K, priority=1, time=4237418695815609; duration=0sec
2017-07-10 21:10:55,246 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.CompactSplitThread: CompactSplitThread Status : compaction_queue=(0:0), split_queue=0, merge_queue=0




Thursday, 13 July 2017


HBase Compaction - Part 1 - Why we need them?


HBase has two types of compaction. Minor compaction and Major compaction.


Why we need minor compaction?


When you insert data into HBase through the normal write path (the other option being bulk load, which does not go through the normal write path), the data first goes to the WAL (Write Ahead Log).
The regionserver then puts the data into its memstore. When the data in the memstore reaches a threshold, it is flushed to disk, generating storefiles.
The memstore threshold is normally in the order of MBs. Hence, memstore flushes generate a large number of small storefiles.
This in turn causes performance issues during read operations. Hence we have minor compactions.
Ideally, minor compaction deals with small files. Minor compaction kicks in automatically when certain conditions are met (discussed in HBase Compaction - Part 2).
It is not recommended to turn off minor compaction.

A StoreFile carries several pieces of metadata; some of the important ones are:
[1] MAX_SEQ_ID_KEY – maximum sequence ID in FileInfo
[2] MAJOR_COMPACTION_KEY – major compaction flag in FileInfo
[3] EXCLUDE_FROM_MINOR_COMPACTION_KEY  – flag in FileInfo that excludes the file from minor compaction
[4] HFILE_NAME_REGEX - HFile names are UUIDs ([0-9a-z]+). Bulk loaded HFiles have (_SeqId_[0-9]+_) as a suffix.


Why we need major compaction?


Minor compaction deals with small files, while major compaction deals with larger files.
Major compaction combines multiple storefiles and generates one big HFile.
Also, when we delete a row in an HBase table, HBase just writes a deletion marker that masks the data from further reads; an update likewise writes a new version and leaves the old one on disk.
During major compaction, HBase reads each file and removes all the data marked for deletion.
This is important: if it is not performed, the stale data eats up disk space and reduces performance.
Hence major compactions. As of HBase 1.1.8, major compaction happens automatically once every 7 days unless turned off manually.
In busy clusters it is recommended to turn off automatic major compaction and trigger it manually during off-peak hours.

Major compaction reads the complete table data and writes it into new files.
As a result, it requires at least one full read and one full write of the table data.
The two main benefits of major compaction are:
[1] Saving disk space by removing expired and deleted data
[2] Improving read performance, since fewer files (and therefore fewer disk seeks) are needed per read
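
To make the idea concrete, here is a toy model of what a major compaction does: several store files are merged into one, and rows whose newest cell is a delete marker are dropped. It is a deliberate simplification (one value per row, no column families, versions or TTLs) and does not mirror HBase's internal data structures. All names are invented for this sketch.

```python
TOMBSTONE = object()  # stand-in for a delete marker


def major_compact(store_files):
    """Merge store files into a single sorted file, dropping deleted rows.

    Each store file is a list of (row_key, timestamp, value) cells.
    Only the newest cell per row is kept in this toy model.
    """
    latest = {}
    for store_file in store_files:
        for row, ts, value in store_file:
            if row not in latest or ts > latest[row][0]:
                latest[row] = (ts, value)
    return [
        (row, ts, value)
        for row, (ts, value) in sorted(latest.items())
        if value is not TOMBSTONE
    ]


files = [
    [("row1", 1, "a"), ("row2", 1, "b")],       # oldest flush
    [("row1", 2, "a2")],                        # row1 updated
    [("row2", 3, TOMBSTONE), ("row3", 3, "c")], # row2 deleted, row3 added
]
compacted = major_compact(files)
print(len(files), "store files ->", len(compacted), "cells in one file")
```

Before the compaction, a read of row2 has to consult both the old value and the delete marker; afterwards neither is on disk.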

Wednesday, 10 May 2017


SQOOP Import failure to Hive parquet table



Sample query that fails:


sqoop import --connect <connection_string> --query "select * from alwin_test where \$CONDITIONS"  --create-hive-table --hive-import --hive-table <hive_table_name> --username <uname> -P --target-dir '/user/mapr/oracleimport' -m 1 --as-parquetfile

However the following will succeed:

sqoop import --connect <connection_string> --query "select * from alwin_test where \$CONDITIONS"  --create-hive-table --hive-import --hive-table <hive_table_name> --username <uname> -P --target-dir '/user/mapr/oracleimport' -m 1

Error thrown:


ERROR sqoop.Sqoop: Got exception running Sqoop: org.apache.avro.SchemaParseException: org.codehaus.jackson.JsonParseException: Unexpected end-of-input: was expecting closing quote for a string value 
at [Source: java.io.StringReader@13ae2297; line: 1, column: 6001] 
org.apache.avro.SchemaParseException: org.codehaus.jackson.JsonParseException: Unexpected end-of-input: was expecting closing quote for a string value 
at [Source: java.io.StringReader@13ae2297; line: 1, column: 6001] 
at org.apache.avro.Schema$Parser.parse(Schema.java:955) 
at org.apache.avro.Schema$Parser.parse(Schema.java:943) 
at org.kitesdk.data.DatasetDescriptor$Builder.schemaLiteral(DatasetDescriptor.java:463) 
at org.kitesdk.data.spi.hive.HiveUtils.descriptorForTable(HiveUtils.java:157) 
at org.kitesdk.data.spi.hive.HiveAbstractMetadataProvider.load(HiveAbstractMetadataProvider.java:104) 
at org.kitesdk.data.spi.filesystem.FileSystemDatasetRepository.load(FileSystemDatasetRepository.java:197) 
at org.kitesdk.data.spi.hive.HiveManagedDatasetRepository.create(HiveManagedDatasetRepository.java:82) 
at org.kitesdk.data.Datasets.create(Datasets.java:239) 
at org.kitesdk.data.Datasets.create(Datasets.java:307) 
at org.apache.sqoop.mapreduce.ParquetJob.createDataset(ParquetJob.java:107) 
at org.apache.sqoop.mapreduce.ParquetJob.configureImportJob(ParquetJob.java:89) 
at org.apache.sqoop.mapreduce.DataDrivenImportJob.configureMapper(DataDrivenImportJob.java:108) 
at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:260) 
at org.apache.sqoop.manager.SqlManager.importTable(SqlManager.java:673) 
at org.apache.sqoop.manager.OracleManager.importTable(OracleManager.java:444) 
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:497) 
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:606) 
at org.apache.sqoop.Sqoop.run(Sqoop.java:143) 
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70) 
at org.apache.sqoop.Sqoop.runSqoop(Sqoop.java:179) 
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:218) 
at org.apache.sqoop.Sqoop.runTool(Sqoop.java:227) 
at org.apache.sqoop.Sqoop.main(Sqoop.java:236) 
Caused by: org.codehaus.jackson.JsonParseException: Unexpected end-of-input: was expecting closing quote for a string value 


Root Cause:


The issue is caused by the limit on the number of characters that can be stored in TBLPROPERTIES in the Hive metastore.

Inside hive metastore db:

mysql> describe TABLE_PARAMS;
+-------------+----------------+------+-----+---------+-------+
| Field       | Type           | Null | Key | Default | Extra |
+-------------+----------------+------+-----+---------+-------+
| TBL_ID      | bigint(20)     | NO   | PRI | NULL    |       |
| PARAM_KEY   | varchar(256)   | NO   | PRI | NULL    |       |
| PARAM_VALUE | varchar(4000)  | YES  |     | NULL    |       |
+-------------+----------------+------+-----+---------+-------+
3 rows in set (0.00 sec)


The default type for PARAM_VALUE is varchar(4000).
In this case, when the Hive parquet table is created, the table property 'avro.schema.literal' is truncated to 4000 characters, which results in invalid JSON.
This causes the JSON parse exception, and the SQOOP import fails.
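
The effect of the truncation is easy to reproduce outside Hive. The sketch below builds a made-up Avro-style schema for a hypothetical wide table (the column names and the number of columns are invented), cuts it at 4000 characters the way the varchar(4000) column would, and shows that the stored value no longer parses as JSON.

```python
import json

PARAM_VALUE_LIMIT = 4000  # length of the PARAM_VALUE column, varchar(4000)

# Hypothetical wide table: 200 columns with long names.
fields = [
    {"name": f"very_long_source_column_name_{i:03d}", "type": ["null", "string"]}
    for i in range(200)
]
schema_literal = json.dumps({"type": "record", "name": "alwin_test", "fields": fields})

# What ends up in the metastore when the value is cut at the column limit.
stored = schema_literal[:PARAM_VALUE_LIMIT]


def is_valid_json(text):
    """Return True if the text parses as JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


print("original :", len(schema_literal), "chars, valid JSON:", is_valid_json(schema_literal))
print("truncated:", len(stored), "chars, valid JSON:", is_valid_json(stored))
```

Any schema longer than the column limit fails the same way, because cutting a JSON object short always leaves it unterminated.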


Resolution:


The solution is either to keep 'avro.schema.literal' within 4000 characters or to increase the column length of 'PARAM_VALUE'.

The following are the possible solutions:

[1] Increase the character length of 'PARAM_VALUE'.
For example, after the change:
mysql> describe TABLE_PARAMS;
+-------------+----------------+------+-----+---------+-------+
| Field       | Type           | Null | Key | Default | Extra |
+-------------+----------------+------+-----+---------+-------+
| TBL_ID      | bigint(20)     | NO   | PRI | NULL    |       |
| PARAM_KEY   | varchar(256)   | NO   | PRI | NULL    |       |
| PARAM_VALUE | varchar(10000) | YES  |     | NULL    |       |
+-------------+----------------+------+-----+---------+-------+
3 rows in set (0.00 sec)

OR [2] Instead of using "select * from alwin_test where \$CONDITIONS" in the SQOOP import, use column aliases to shorten the column names and hence reduce the length of 'avro.schema.literal'.

OR [3] Create a view on the Oracle (source) table with much shorter column names and then run the SQOOP import on that view.