Wednesday, 26 July 2017

Example Scala code for creating a MapRDB table and inserting data from the Spark shell


import org.apache.hadoop.hbase.client.{HBaseAdmin, Put}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes.toBytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog

val spark = SparkSession.builder().appName("MapRDBConnectorExample").enableHiveSupport().getOrCreate()
val config = HBaseConfiguration.create()
val hBaseAdmin = new HBaseAdmin(config)
val hbaseContext = new HBaseContext(spark.sparkContext, config)

import spark.sql

sql("CREATE TABLE IF NOT EXISTS HiveTable (rowKeyData INT, Column1 STRING, Column2 STRING)")
sql("INSERT INTO TABLE HiveTable VALUES(1, 'Col1', 'Col2')")


Once the above statements are executed, we will see the following in Hive:
hive> select * from hivetable;
OK
1       Col1    Col2
Time taken: 0.14 seconds, Fetched: 1 row(s)
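
The same check can be done without leaving the Spark shell: because the session was built with enableHiveSupport(), the Hive table is visible to Spark SQL directly (standard Spark 2.x API, no extra assumptions):

```scala
// Query the Hive table from the Spark shell itself;
// enableHiveSupport() above makes Hive tables visible to Spark SQL.
sql("SELECT * FROM HiveTable").show()
```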

val df = sql("SELECT * FROM HiveTable")   // sql() already returns a DataFrame

val MapRDBTableName = "/tmp/SparkMapRDBTable"
val columnFamilyName = "cf"
val cata = s"""{"table":{"namespace":"default", "name":"$MapRDBTableName"},
              |"rowkey":"key",
              |"columns":{
              |"rowKeyData":{"cf":"rowkey", "col":"key", "type":"int"},
              |"Column1":{"cf":"$columnFamilyName", "col":"Column1", "type":"string"},
              |"Column2":{"cf":"$columnFamilyName", "col":"Column2", "type":"string"}
              |}}""".stripMargin

val hTableDescriptor = new HTableDescriptor(TableName.valueOf(MapRDBTableName))
val hColumnDescriptor = new HColumnDescriptor(toBytes(columnFamilyName))
hTableDescriptor.addFamily(hColumnDescriptor)
hBaseAdmin.createTable(hTableDescriptor)
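
Note that createTable throws an exception if the table already exists. A small guard using HBaseAdmin's tableExists (the same admin object created above) makes the snippet safe to re-run from the shell:

```scala
// Create the table only if it does not already exist,
// so the snippet can be pasted into the shell more than once.
if (!hBaseAdmin.tableExists(MapRDBTableName)) {
  hBaseAdmin.createTable(hTableDescriptor)
}
```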


At this point, the MapRDB table will be created.


df.write.options(Map(HBaseTableCatalog.tableCatalog -> cata, HBaseTableCatalog.newTable -> "1")).format("org.apache.hadoop.hbase.spark").save()

At this point, the data will be inserted into the MapRDB table.


val df2 = spark.read.options(Map(HBaseTableCatalog.tableCatalog -> cata)).format("org.apache.hadoop.hbase.spark").load()
df2.show()

+-------+----------+-------+
|Column2|rowKeyData|Column1|
+-------+----------+-------+
|   Col2|         1|   Col1|
+-------+----------+-------+
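
Once read back, df2 is an ordinary DataFrame, so it can also be queried with Spark SQL by registering it as a temporary view (standard Spark 2.x API; the view name is arbitrary):

```scala
// Register the MapRDB-backed DataFrame as a temporary view
// and query it with plain SQL.
df2.createOrReplaceTempView("MapRDBView")
sql("SELECT Column1, Column2 FROM MapRDBView WHERE rowKeyData = 1").show()
```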


The example was verified for MapRDB tables with the following package versions:

mapr-spark-2.1.0.201706271156-1.noarch
mapr-hive-2.1.201706231053-1.noarch
mapr-hbase-1.1.8.201704031229-1.noarch
