Friday, 28 July 2017

Issue with using nohup on sqlline - DRILL


Problem:


Following is a normal way of query execution in DRILL:

[mapr@vm52-146 ~]$ sqlline -u "jdbc:drill:" -n <user> -p <password> --run=test.drill
1/1          show schemas;
+---------------------+
|     SCHEMA_NAME     |
+---------------------+
| INFORMATION_SCHEMA  |
| cp.default          |
| dfs.default         |
| dfs.root            |
| dfs.tmp             |
| sys                 |
+---------------------+
6 rows selected (1.773 seconds)
Closing: org.apache.drill.jdbc.impl.DrillConnectionImpl
apache drill 1.10.0
"a drill is a terrible thing to waste"

Or, the same can be executed as a script:

[mapr@vm52-146 ~]$ nano test.sh
[mapr@vm52-146 ~]$ cat test.sh
sqlline -u "jdbc:drill:" -n <user> -p <password> --run=test.drill > /tmp/result.txt
[mapr@vm52-146 ~]$ chmod +x test.sh
[mapr@vm52-146 ~]$ sh test.sh
1/1          show schemas;
6 rows selected (0.957 seconds)
Closing: org.apache.drill.jdbc.impl.DrillConnectionImpl
apache drill 1.10.0
"a little sql for your nosql"
[mapr@vm52-146 ~]$ cat /tmp/result.txt
+---------------------+
|     SCHEMA_NAME     |
+---------------------+
| INFORMATION_SCHEMA  |
| cp.default          |
| dfs.default         |
| dfs.root            |
| dfs.tmp             |
| sys                 |
+---------------------+

However, above methods do not return result if executed with nohup.

[mapr@vm52-146 ~]$ nohup ./test.sh &
[4] 24554
[mapr@vm52-146 ~]$ nohup: ignoring input and appending output to `nohup.out'


[4]+  Stopped                 nohup ./test.sh
[mapr@vm52-146 ~]$ cat /tmp/result.txt
[mapr@vm52-146 ~]$

Solution:

Add “-Djline.terminal=jline.UnsupportedTerminal” property in SQLLINE_JAVA_OPTS inside drill-env.sh

Wednesday, 26 July 2017

Example scala code for creating MapRDB table and inserting data from Spark-Shell


import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes.toBytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

val spark = SparkSession.builder().appName("MapRDBConnectorExample").enableHiveSupport().getOrCreate()
val config = HBaseConfiguration.create()
val hBaseAdmin = new HBaseAdmin(config)
val hbaseContext = new HBaseContext(spark.sparkContext, config)

import spark.sql

sql("CREATE TABLE IF NOT EXISTS HiveTable (rowKeyData INT, Column1 STRING, Column2 STRING)")
sql("INSERT INTO TABLE HiveTable VALUES(1, 'Col1', 'Col2')")


Once the above action is performed, we will see following in Hive:
hive> select * from hivetable;
OK
1       Col1    Col2
Time taken: 0.14 seconds, Fetched: 1 row(s)

val df = sql("SELECT * FROM HiveTable").toDF()

val MapRDBTableName = "/tmp/SparkMapRDBTable"
val columnFamilyName = "cf"
val cata =s"""{"table":{"namespace":"default", "name":"$MapRDBTableName"},"rowkey":"key","columns":{"rowKeyData":{"cf":"rowkey", "col":"key", "type":"int"},"Column1":{"cf":"$columnFamilyName", "col":"Column1", "type":"string"},"Column2":{"cf":"$columnFamilyName", "col":"Column2", "type":"string"}}}""".stripMargin

val hTableDescriptor = new HTableDescriptor(toBytes(MapRDBTableName))
val hColumnDescriptor = new HColumnDescriptor(toBytes(columnFamilyName))
hTableDescriptor.addFamily(hColumnDescriptor)
hBaseAdmin.createTable(hTableDescriptor)


At this point, MapRDB table will be created.


df.write.options(Map(HBaseTableCatalog.tableCatalog -> cata, HBaseTableCatalog.newTable -> "1")).format("org.apache.hadoop.hbase.spark").save()

At this point, data will be inserted to MapRDB


val df2 = spark.read.options(Map(HBaseTableCatalog.tableCatalog -> cata)).format("org.apache.hadoop.hbase.spark").load()
df2.show()

+-------+----------+-------+
|Column2|rowKeyData|Column1|
+-------+----------+-------+
|   Col2|         1|   Col1|
+-------+----------+-------+


The example was verified for MapRDB tables with

mapr-spark-2.1.0.201706271156-1.noarch
mapr-hive-2.1.201706231053-1.noarch
mapr-hbase-1.1.8.201704031229-1.noarch

Wednesday, 19 July 2017

HBase - Identifying and moving corrupted HFiles


Identifying corrupted files:


HBase provides a utility to check for any corrupted files in HBase.
hbase hbck -checkCorruptHFiles

The above command will check the entire HBase tables for any corrupted files.
To check corrupted in specific table use the following command:
hbase hbck -checkCorruptHFiles <table_name>

If any files are corrupted, then we will have something similar to following in the output log:

Checked 17 hfile for corruption
  HFiles corrupted:                  2
    HFiles moved while checking:     0
Summary: CORRUPTED


The logs also contain details about which files are corrupted.
Another way of checking whether a file is corrupted or not is using the following command:
hbase org.apache.hadoop.hbase.io.hfile.HFile -f <path_to_hfile> 


Sidelining corrupted files:


HBase provides a utility sidelining corrupted files in HBase.
hbase hbck -sidelineCorruptHFiles

The above command will check the entire HBase tables.
To apply on specific table use the following command:
hbase hbck -sidelineCorruptHFiles <table_name>

You will see similar information in the command output log:

Checked 17 hfile for corruption
  HFiles corrupted:                  2
    HFiles successfully quarantined: 2
      maprfs:/hbase/corrupt/hcrt/1090c602c005a4ca76fda4ec7bd2865c/f/97fcf7fee25c469a81e7a0aa567a4627
      maprfs:/hbase/corrupt/hcrt/1090c602c005a4ca76fda4ec7bd2865c/f/97fcf7fee25c469a81e7a0aa567a4628
    HFiles failed quarantine:        0
    HFiles moved while checking:     0
Summary: CORRUPTED => OK


The corrupted files are moved to '/hbase/corrupt' folder.

Monday, 17 July 2017

HBase Compaction - Part 2 


Following is a study of parameters that control minor and major compaction in HBase.

Minor compaction 

The default minor compaction algorithm depend on the following parameters:
[1] hbase.hstore.compaction.min – Minimum number of StoreFiles to be selected for a compaction to occur. Defaults to 3 Hbase 1.1.8.
StoreFiles – files produced by memstore flush
StoreFile contains many metadata information, some of the important ones are:
MAX_SEQ_ID_KEY – maximum sequence ID in FileInfo
MAJOR_COMPACTION_KEY – Major compaction flag info
EXCLUDE_FROM_MINOR_COMPACTION_KEY  – Major compaction flag info
HFILE_NAME_REGEX - HFiles are uuid ([0-9a-z]+). Bulk loaded hfiles has (_SeqId_[0-9]+_) has suffix.
[2] hbase.hstore.compaction.max – maximum number of storefiles to be compacted in each minor compaction. Defaults to 10.
[3] hbase.hstore.compaction.min.size – any file that is smaller that this will be a candidate for compaction.
[4] hbase.hstore.compaction.max.size – any file greater than this is automatically excluded from compaction - (by default it is LONG.MAX_VALUE)
[5] hbase.store.compaction.ratio – default is 1.2f


The simple formula for selection of a file for minor compaction is :
selects a file for compaction when the file size <= sum(smaller_files_size) * hbase.hstore.compaction.ratio.


Example from HBase official documentation:
The following StoreFiles exist: 100, 50, 23, 12, and 12 bytes apiece (oldest to newest). With the above parameters, the files that would be selected for minor compaction are 23, 12, and 12.
Why?
Remember the logic
selects a file for compaction when the file size <= sum(smaller_files_size) * hbase.hstore.compaction.ratio.
    100 --> No, because sum(50, 23, 12, 12) * 1.0 = 97.
    50 --> No, because sum(23, 12, 12) * 1.0 = 47.
    23 --> Yes, because sum(12, 12) * 1.0 = 24.
    12 --> Yes, because the previous file has been included, and because this does not exceed the the max-file limit of 5
    12 --> Yes, because the previous file had been included, and because this does not exceed the the max-file limit of 5.



Following log snippet shows HBase regionserver logs during minor compaction or shortcompactions:
It provide following information:
[1] Shows which table, which column family, which region is undergoing compaction.
[2] Number of files compacted.
[3] Total size of file for compaction. Sum of individual files undergoing compaction.
[4] Shows total file size after compaction is completed.
[5] Time taken for minor compaction.


Minor compaction logs: (Table name is 'hb' with column family 'c') 

2017-07-10 17:08:13,967 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] regionserver.HRegion: Starting compaction on c in region hb,,1499720284228.0f0486e029334542705e66f401fa698b.
2017-07-10 17:08:13,968 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] regionserver.HStore: Starting compaction of 3 file(s) in c of hb,,1499720284228.0f0486e029334542705e66f401fa698b. into tmpdir=maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/.tmp, totalSize=14.7 K
2017-07-10 17:08:13,980 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] hfile.CacheConfig: blockCache=LruBlockCache{blockCount=2, currentSize=1291688, freeSize=1249607128, maxSize=1250898816, heapSize=1291688, minSize=1188353920, minFactor=0.95, multiSize=594176960, multiFactor=0.5, singleSize=297088480, singleFactor=0.25}, cacheDataOnRead=true, cacheDataOnWrite=false, cacheIndexesOnWrite=false, cacheBloomsOnWrite=false, cacheEvictOnClose=false, cacheDataCompressed=false, prefetchOnOpen=false
2017-07-10 17:08:14,109 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] regionserver.HStore: Completed compaction of 3 (all) file(s) in c of hb,,1499720284228.0f0486e029334542705e66f401fa698b. into 4cc6a50eb38d4ef2844a3339bcdfe11d(size=5.0 K), total size for store is 5.0 K. This selection was in queue for 0sec, and took 0sec to execute.
2017-07-10 17:08:14,113 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499720893966] regionserver.CompactSplitThread: Completed compaction: Request = regionName=hb,,1499720284228.0f0486e029334542705e66f401fa698b., storeName=c, fileCount=3, fileSize=14.7 K, priority=1, time=4222857501044128; duration=0sec


Major Compaction:


Major compaction depend on the following parameters:
[1] hbase.hregion.majorcompaction - Default value is 604800000 (7 days)
The time interval between each major compaction. Setting this to 0 will disable time based major compaction. 
Sometimes, minor compactions can be promoted to major compaction.

[2] Off peak hour compactions:
Identifying peak hour of your cluster will help in notifying HBase not to do heavy minor compactions during the busy hours. 
For this from HBase 1.2+ onward, there are following parameters:
[a] hbase.hstore.compaction.max.size.offpeak – sets a value for the largest file that can be used for compaction
[b] hbase.offpeak.start.hour= 0..23 (specify start hour)
[c] hbase.offpeak.end.hour= 0..23 (specify end hour)
The hstore compaction ratio is by default 1.2 for peak hours. For offpeak hours, it is 5.
Both the values can be adjusted using the following parameters:
[a] hbase.hstore.compaction.ratio
[b] hbase.hstore.compaction.ratio.offpeak

[3] hbase.hregion.majorcompaction.jitter
Compactions are carried out by regionservers. Inorder to make sure that all regionserver does not do major compaction at the same time, we have this jitter parameter. 
By default the value is 0.5. 0.5 is the maximum value of outer bound. hbase.hregion.majorcompaction is multiplied by this some fraction that will be inside this jitter value and then added/subtracted to determine when to run the next major compaction.


Following log snippet shows HBase regionserver logs during major compaction or largecompactions:
It provide following information:
[1] Displays the table and region undergoing major compaction.
[2] If the major compaction is triggered manually, then minor compaction is called internally.
[3] Intermittently the store file will be stored inside the .tmp folder.
[4]Provides information about the number of files under compaction, total size of new file generated, time taken for the compaction.


Major compaction logs: (Table name is 'hb' with column family 'c') 

2017-07-10 21:10:55,158 INFO  [PriorityRpcServer.handler=1,queue=1,port=16020] regionserver.RSRpcServices: Compacting hb,,1499720284228.0f0486e029334542705e66f401fa698b.
2017-07-10 21:10:55,159 DEBUG [PriorityRpcServer.handler=1,queue=1,port=16020] compactions.RatioBasedCompactionPolicy: Selecting compaction from 2 store files, 0 compacting, 2 eligible, 10 blocking
2017-07-10 21:10:55,159 DEBUG [PriorityRpcServer.handler=1,queue=1,port=16020] regionserver.HStore: 0f0486e029334542705e66f401fa698b - c: Initiating major compaction (all files)
2017-07-10 21:10:55,159 DEBUG [PriorityRpcServer.handler=1,queue=1,port=16020] regionserver.CompactSplitThread: Small Compaction requested: org.apache.hadoop.hbase.regionserver.DefaultStoreEngine$DefaultCompactionContext@1ffa895a; Because: User-triggered major compaction; compaction_queue=(0:1), split_queue=0, merge_queue=        0
2017-07-10 21:10:55,159 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HRegion: Starting compaction on c in region hb,,1499720284228.0f0486e029334542705e66f401fa698b.
2017-07-10 21:10:55,160 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HStore: Starting compaction of 2 file(s) in c of hb,,1499720284228.0f0486e029334542705e66f401fa698b. into tmpdir=maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/.tmp, totalSize=10.9 K
2017-07-10 21:10:55,162 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] compactions.Compactor: Compacting maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/c/0daeafd75a4c4ba4a53172a73b9ca4b0, keycount=41, bloomtype=ROW, size=6.0 K, encoding=NONE, seqNum=174, earliestPutTs=1499720300277
2017-07-10 21:10:55,164 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] compactions.Compactor: Compacting maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/c/bfeea9e43bc347ec84863bdd4476e270, keycount=2, bloomtype=ROW, size=4.9 K, encoding=NONE, seqNum=182, earliestPutTs=1499735424849
2017-07-10 21:10:55,165 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] hfile.CacheConfig: blockCache=LruBlockCache{blockCount=3, currentSize=1308744, freeSize=1249590072, maxSize=1250898816, heapSize=1308744, minSize=1188353920, minFactor=0.95, multiSize=594176960, multiFactor=0.5, singleSize=297088480, singleFactor=0.25}, cacheDataOnRead=true, cacheDataOnWrite=false, cacheIndexesOnWrite=false, cacheBloomsOnWrite=false, cacheEvictOnClose=false, cacheDataCompressed=false, prefetchOnOpen=false
2017-07-10 21:10:55,191 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HRegionFileSystem: Committing store file maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/.tmp/da9d2b5454e640769a9b20c82124a010 as maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b        /c/da9d2b5454e640769a9b20c82124a010
2017-07-10 21:10:55,208 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HStore: Removing store files after compaction...
2017-07-10 21:10:55,219 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] backup.HFileArchiver: Archiving compacted store files.
2017-07-10 21:10:55,231 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] backup.HFileArchiver: Finished archiving from class org.apache.hadoop.hbase.backup.HFileArchiver$FileableStoreFile, file:maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/c/0daeafd75a4c4ba4a53172a73b9ca4b0, to maprfs:/hbase/archive/data/default/hb/0f0486e029334542705e66f401fa698b/c/0daeafd75a4c4ba4a53172a73b9ca4b0
2017-07-10 21:10:55,243 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] backup.HFileArchiver: Finished archiving from class org.apache.hadoop.hbase.backup.HFileArchiver$FileableStoreFile, file:maprfs:/hbase/data/default/hb/0f0486e029334542705e66f401fa698b/c/bfeea9e43bc347ec84863bdd4476e270, to maprfs:/hbase/archive/data/default/hb/0f0486e029334542705e66f401fa698b/c/bfeea9e43bc347ec84863bdd4476e270
2017-07-10 21:10:55,244 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.HStore: Completed major compaction of 2 (all) file(s) in c of hb,,1499720284228.0f0486e029334542705e66f401fa698b. into da9d2b5454e640769a9b20c82124a010(size=6.1 K), total size for store is 6.1 K. This selection was in queue for 0sec, and took 0sec to execute.
2017-07-10 21:10:55,246 INFO  [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.CompactSplitThread: Completed compaction: Request = regionName=hb,,1499720284228.0f0486e029334542705e66f401fa698b., storeName=c, fileCount=2, fileSize=10.9 K, priority=1, time=4237418695815609; duration=0sec
2017-07-10 21:10:55,246 DEBUG [regionserver/vm52/10.10.XX.XX:16020-shortCompactions-1499725825449] regionserver.CompactSplitThread: CompactSplitThread Status : compaction_queue=(0:0), split_queue=0, merge_queue=0




Thursday, 13 July 2017


HBase Compaction - Part 1 - Why we need them?


HBase has two types of compaction. Minor compaction and Major compaction.


Why we need minor compaction?


When you insert some data to HBase, in normal write path (the other being bulk load, which does not go through normal write path), the data goes to WAL - Write Ahead Log. 
Regionserver reads the data and puts to its memstore. When data in memstore reaches a threshold, it is flushed to disk generating storefiles.
The threshold for memstore is normally in MBs. Hence, memstore flush will generate large number of small storefiles.
This will in turn cause performance issues while read operation. Hence we have minor compactions. 
Ideally minor compaction deals with small sized files. Minor compaction kicks in automatically when certain conditions are met (Discussed in HBase Compaction - Part 2).
It is not recommended to turn off minor compaction.

StoreFile contains many metadata information, some of the important ones are:
[1] MAX_SEQ_ID_KEY – maximum sequence ID in FileInfo
[2] MAJOR_COMPACTION_KEY – Major compaction flag info
[3] EXCLUDE_FROM_MINOR_COMPACTION_KEY  – Major compaction flag info
[4] HFILE_NAME_REGEX - HFiles are uuid ([0-9a-z]+). Bulk loaded hfiles has (_SeqId_[0-9]+_) has suffix.


Why we need major compaction?


Minor compaction deals with small files, while major compaction deals with larger files.
Major compaction combines multiple storefiles and generate one big HFile.
Also, when we update or delete a row in HBase table, HBase just write a deletion marker and masks the data from being accessed in further operations.
During major compaction, HBase reads each file and removes all the data marked for deletion.
This is impoertant, since if it is not performed it will eat up the disk space and will reduce the performance.
Hence major compactions. As of HBase 1.1.8, major compaction automatically happens once in 7 days unless turned off manually.
In busy clusters it is recommended to turn off automatic major compaction and trigger it manually at off peak hours.

Major compaction will read the complete table data and writes into a new file.
As a result it requires at least one full read and one full write of table data.
The two main benefits of major compaction are:
[1] Saving disk space by removing the expired and deleted data
[2] Improve the read performance by increasing disk seek performance