Friday, 12 October 2018

System Design - 'Here Now' service


Aim: 


Design a location service that can look up a user’s current location and find all users in a given location. Insights into user location enable better recommendations and predictive analysis.

Requirements:


1. The user should be able to check in their location.
2. Given a location (latitude/longitude), we should be able to find all users in the location.
3. The system should have a real-time search experience with very low latency.
4. The service should support heavy search load.
5. The system should be durable and highly available.

Scaling Estimation:


1. 200 Million check-in places
2. 100 Million active users
3. 20 Million daily check-ins
4. 30% growth every year

Database Schema:


A. Details for every check-in point – Location Table (Stored in MySQL):
1. LUID – Location Unique ID
2. Name - Location Name
3. Longitude
4. Latitude
5. Category – Breakfast/Lunch/Dinner/Coffee & Tea/Nightlife/Things to do
6. Description - Small description of the place


Location Table
Column        Type           Key
LUID          Int            PK
Name          Varchar(32)
Longitude     Varchar(8)
Latitude      Varchar(8)
Category      Varchar(20)
Description   Varchar(300)

B. Details for every user – User Table (Stored in MySQL):
1. UUID – User Unique ID
2. Name – Name of the user
3. Email – email address of the user
4. DateOfBirth – Birth date of the user
5. CreationDate – Date when the user joined 
6. LastLogin – Last login information of the user

User Table
Column        Type           Key
UUID          Int            PK
Name          Varchar(20)
Email         Varchar(32)
DateOfBirth   datetime
CreationDate  datetime
LastLogin     datetime

System APIs:


1. Search users in a given location: 
search(api_dev_key, location, radius_filter, max_number_of_results_to_return)
api_dev_key: Mandatory. Developer API key, used for purposes such as throttling access based on the allowed quota.
location: Mandatory. Location where the search is performed.
radius_filter: Optional. The search radius. Defaults to 0 meters (only the exact location is searched).
max_number_of_results_to_return: Optional. The number of results to return. By default, all users are returned.


2. Check in to a location: 
checkin(user_id, location_uid, user_location_latitude, user_location_longitude)
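
The search parameters and their defaults can be captured in a small request type. This is only a sketch in Python: the class names `Location` and `SearchRequest`, the validation rules, and the helper `limit_results` are assumptions made for illustration, not part of the service definition.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class Location:
    latitude: float
    longitude: float


@dataclass(frozen=True)
class SearchRequest:
    api_dev_key: str                      # mandatory
    location: Location                    # mandatory
    radius_filter: float = 0.0            # optional, meters; 0 = exact location only
    max_number_of_results_to_return: Optional[int] = None  # optional; None = all users

    def __post_init__(self) -> None:
        # Validation rules below are illustrative assumptions.
        if not self.api_dev_key:
            raise ValueError("api_dev_key is mandatory")
        if not -90.0 <= self.location.latitude <= 90.0:
            raise ValueError("latitude must be within [-90, 90]")
        if not -180.0 <= self.location.longitude <= 180.0:
            raise ValueError("longitude must be within [-180, 180]")
        if self.radius_filter < 0:
            raise ValueError("radius_filter cannot be negative")
        limit = self.max_number_of_results_to_return
        if limit is not None and limit <= 0:
            raise ValueError("max_number_of_results_to_return must be positive")


def limit_results(users: List[str], request: SearchRequest) -> List[str]:
    """Apply the optional result cap; with no cap, every user is returned."""
    limit = request.max_number_of_results_to_return
    return users if limit is None else users[:limit]


request = SearchRequest("demo-key", Location(37.403, -121.970),
                        max_number_of_results_to_return=2)
print(limit_results(["u1", "u2", "u3"], request))
```

Keeping the defaults in one place means the search handler never has to guess what a missing radius or result limit means.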


System Design:



1. MySQL solution: 
The user-location mapping will be tracked in ‘user_location_table’ in MySQL. We insert a new row with UUID, LUID, and the current timestamp when the user checks in for the first time. If the user later checks in at a different location, we update the existing row.


user_location_table
Column           Type       Key
UUID             Int        PK
LUID             Int
RecordTimestamp  datetime

A. High level query to get user location for a particular user: 
select name from location_table where LUID in (select LUID from user_location_table where UUID = <UUID1> and RecordTimestamp > NOW() - INTERVAL 3 HOUR)

B. High-level query to get all users in a particular location: 
select name from user_table where UUID in (select UUID from user_location_table where LUID = <LUID1> and RecordTimestamp > NOW() - INTERVAL 3 HOUR)

NOTE: If the data is huge, this will not return results in near real time.


2. Solution based on HBase (Preferred solution):
We can use HBase to store the check-in location of a user. HBase's ‘Time To Live’ (TTL) feature can delete a check-in automatically 3 hours after it is written: we set the TTL to 3 hours (10800 seconds).

HBase table creation:
create 'user_check_in_info', {NAME => 'c', TTL => 10800}

Table structure:
Row Key – <UUID>
Column Qualifier – check_in_location (constant string)
Column Value – <LUID/Location name>

If the same user checks in to a different location within 3 hours of the previous check-in, then we will update the same row with the new location. E.g.:

put 'user_check_in_info','UUID1','c:check_in_location','LUID1/Location name 1'

Now the user is tagged to location ‘LUID1’. If the user checks in to a different location, the row will be updated.

put 'user_check_in_info','UUID1','c:check_in_location','LUID2/Location name 2'

A. To query the location of a particular user: 
This is a simple ‘get’ operation in HBase:
get 'user_check_in_info','UUID1'

B. To query all users in a given location: 
Since the HBase rowkey is UUID, to get all users specific to a given location, we will need to scan the entire HBase table and filter out all rows where check_in_location is the required location. This solution is not optimal. Query latency will increase as the data grows. This solution can be improved by caching the data in memory. The next section discusses one such solution.

Faster retrieval using GRIDs: 
Divide the whole map into small grids. Each grid is responsible for a group of locations that fall within a specified range of latitude and longitude. As a result, we can restrict our query to a few grids, which reduces the amount of data queried. Using the given location and the optional radius, we can also find the neighboring grids that are of interest. Hence we can use the system not only to get a list of users at a particular location but also at locations near it.

The grid will be used to track the location of each user.


Deciding what should be the size of each grid: 
We can pre-divide the map and specify a fixed range of latitude and longitude for each grid. But this solution has problems when certain areas are crowded (e.g. places like San Francisco or New York) while other areas are sparsely populated (e.g. somewhere in Antarctica!).

We can modify the solution to use dynamic grids. Here, the grid size is decided by the number of check-in places. For example, we can create a new grid when the number of check-in places reaches, say, 500. This way, each grid holds a more uniform amount of data. This can be implemented using the quadtree data structure.

Implementation: 
We will build a tree structure with four children - a QuadTree. Each grid or node will have details of users present in a particular location. Once the number of check-in places reaches the specified limit, (in our case, 500), the node will be divided to make four new children and the load is distributed. Hence, only the leaf nodes will have information about the users. To find the neighboring grids, we can connect all the leaf nodes using a doubly linked list.
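
The splitting behaviour described above can be sketched as a small in-memory quadtree. This is a minimal sketch under stated assumptions, not a definitive implementation: the names `Box` and `QuadTreeNode` are hypothetical, it stores user check-ins by coordinate and splits on the number of stored entries, it adds a depth cap so that many check-ins at one coordinate cannot cause endless splitting, and it finds nearby leaves with a bounding-box query from the root instead of the linked list of leaves mentioned above.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

Point = Tuple[float, float, str]  # (latitude, longitude, user_id)


@dataclass(frozen=True)
class Box:
    min_lat: float
    min_lon: float
    max_lat: float
    max_lon: float

    def contains(self, lat: float, lon: float) -> bool:
        return (self.min_lat <= lat <= self.max_lat
                and self.min_lon <= lon <= self.max_lon)

    def intersects(self, other: "Box") -> bool:
        return (self.min_lat <= other.max_lat and other.min_lat <= self.max_lat
                and self.min_lon <= other.max_lon and other.min_lon <= self.max_lon)


class QuadTreeNode:
    def __init__(self, box: Box, capacity: int = 500,
                 depth: int = 0, max_depth: int = 16) -> None:
        self.box = box
        self.capacity = capacity
        self.depth = depth
        self.max_depth = max_depth
        self.points: List[Point] = []  # only leaf nodes hold data
        self.children: Optional[List["QuadTreeNode"]] = None

    def insert(self, lat: float, lon: float, user_id: str) -> None:
        if self.children is not None:
            self._child_for(lat, lon).insert(lat, lon, user_id)
            return
        self.points.append((lat, lon, user_id))
        # The depth cap stops endless splitting when entries share one coordinate.
        if len(self.points) > self.capacity and self.depth < self.max_depth:
            self._split()

    def _mid(self) -> Tuple[float, float]:
        return ((self.box.min_lat + self.box.max_lat) / 2,
                (self.box.min_lon + self.box.max_lon) / 2)

    def _child_for(self, lat: float, lon: float) -> "QuadTreeNode":
        mid_lat, mid_lon = self._mid()
        return self.children[(2 if lat >= mid_lat else 0) + (1 if lon >= mid_lon else 0)]

    def _split(self) -> None:
        b, (mid_lat, mid_lon) = self.box, self._mid()
        quadrants = [Box(b.min_lat, b.min_lon, mid_lat, mid_lon),   # south-west
                     Box(b.min_lat, mid_lon, mid_lat, b.max_lon),   # south-east
                     Box(mid_lat, b.min_lon, b.max_lat, mid_lon),   # north-west
                     Box(mid_lat, mid_lon, b.max_lat, b.max_lon)]   # north-east
        self.children = [QuadTreeNode(q, self.capacity, self.depth + 1, self.max_depth)
                         for q in quadrants]
        pending, self.points = self.points, []
        for lat, lon, user_id in pending:
            self._child_for(lat, lon).insert(lat, lon, user_id)

    def query(self, area: Box) -> List[str]:
        """Return the user ids whose check-in falls inside `area`."""
        if not self.box.intersects(area):
            return []
        if self.children is None:
            return [uid for lat, lon, uid in self.points if area.contains(lat, lon)]
        return [uid for child in self.children for uid in child.query(area)]


world = QuadTreeNode(Box(-90.0, -180.0, 90.0, 180.0), capacity=2)
for lat, lon, uid in [(37.40, -121.97, "u1"), (37.41, -121.96, "u2"),
                      (40.71, -74.00, "u3"), (-75.00, 0.00, "u4")]:
    world.insert(lat, lon, uid)
print(sorted(world.query(Box(37.0, -122.5, 38.0, -121.5))))
```

The tiny capacity of 2 is used only to force splits in the example; the design above suggests a limit of around 500.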





How do we rebuild the QuadTree if the system crashes? 
We will scan through all the records in the HBase ‘user_check_in_info’ table. Based on the location obtained for each user, we decide which grid the user's information should be kept in. 

Memory requirement: We will store location ID (8 bytes), latitude (8 bytes), longitude (8 bytes), user ID (8 bytes), username (20 bytes) and check-in timestamp (8 bytes) in the grid. Since we assume 20 million check-ins every day, the total memory requirement will be:

(8+8+8+8+20+8) * 20M bytes = 60 * 20M bytes = 1200M bytes ≈ 1144 MB (about 1.2 GB)

We can add a few MBs of memory to hold the pointers (parent to children) required to build the QuadTree. In total, let’s assume that we need roughly 1.5 GB of memory to hold the entire tree. This can easily fit into the memory of a single machine. However, for better performance and scalability, we can shard the data across multiple servers based on a hash of the location ID. (Discussed in the data sharding section.)

What happens when a user creates a new check-in?

Whenever a new check-in is added by a user, we need to insert it into the HBase, as well as, in the QuadTree. Since the QuadTree is distributed among different servers, first we need to find the grid/server of the new check-in and then add it there.

What happens when a user checks in to location X and then checks in to location Y?

We should track only the latest check-in by the user. We have designed our HBase table in such a way that it will track only the latest check-in. However, we need to track the same in our QuadTree. Hence, when we update HBase, first we get the previous location of the user from HBase (in this case X) and update the new location (in this case Y). We can do a ‘get’ operation from HBase to retrieve the previous location of the user.

get 'user_check_in_info','UUID1'

With the help of previous location information obtained from the HBase table, we modify the QuadTree to delete the user from location X and add the user to location Y.
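
The read-then-move sequence can be sketched with in-memory stand-ins. In this illustration a dict plays the role of the HBase table and a dict of sets plays the role of the QuadTree leaves; the class name `CheckInTracker` and its attributes are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Optional, Set


class CheckInTracker:
    def __init__(self) -> None:
        # Stand-in for the HBase table: one row per user, latest location only.
        self.latest_by_user: Dict[str, str] = {}
        # Stand-in for the QuadTree leaves: users currently at each location.
        self.users_by_location: Dict[str, Set[str]] = defaultdict(set)

    def check_in(self, uuid: str, luid: str) -> Optional[str]:
        """Record a check-in and return the previous location, if any."""
        previous = self.latest_by_user.get(uuid)          # the 'get' step
        if previous is not None:
            self.users_by_location[previous].discard(uuid)  # remove from X
        self.latest_by_user[uuid] = luid                  # the 'put' step
        self.users_by_location[luid].add(uuid)            # add to Y
        return previous


tracker = CheckInTracker()
tracker.check_in("UUID1", "X")
moved_from = tracker.check_in("UUID1", "Y")
print(moved_from, sorted(tracker.users_by_location["Y"]))
```

Reading the previous location before overwriting it is what lets the in-memory index stay consistent with the single-row-per-user table.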

How do we filter out users whose check-in is more than 3 hours old?

Since we have set a TTL, data will be purged from the HBase table 3 hours after check-in. However, the data will still reside in the QuadTree data structure. Hence, when we retrieve the list of users for a particular location, we need to filter out users whose check-in is more than 3 hours old. This can be implemented using the check-in timestamp stored in the QuadTree and the current timestamp. (Rule: current timestamp - check-in timestamp < 3 hours)

Housekeeping: We can run a background chore that will periodically check for the records that have expired (>3 Hours) and then do a cleanup activity. This will be similar to compaction activity performed by HBase. This is required for disk space cleanup and better overall performance.
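
The read-time filter and the periodic cleanup can both be expressed with the same 3-hour rule. The sketch below assumes each leaf keeps a mapping of user ID to check-in time in epoch seconds; the function names are hypothetical.

```python
from typing import Dict, List

CHECK_IN_TTL_SECONDS = 3 * 60 * 60  # 10800, the same window as the HBase TTL


def is_active(check_in_ts: float, now: float) -> bool:
    """Rule from the text: current timestamp - check-in timestamp < 3 hours."""
    return now - check_in_ts < CHECK_IN_TTL_SECONDS


def active_users(check_ins: Dict[str, float], now: float) -> List[str]:
    """Read path: hide expired entries without modifying the structure."""
    return sorted(user for user, ts in check_ins.items() if is_active(ts, now))


def purge_expired(check_ins: Dict[str, float], now: float) -> int:
    """Housekeeping path: drop expired entries and report how many were removed."""
    expired = [user for user, ts in check_ins.items() if not is_active(ts, now)]
    for user in expired:
        del check_ins[user]
    return len(expired)


now = 1_000_000.0
leaf = {"u1": now - 60, "u2": now - 4 * 3600, "u3": now - 10799}
print(active_users(leaf, now))
print(purge_expired(leaf, now), sorted(leaf))
```

Filtering on read keeps results correct between cleanup runs, so the background chore only affects resource usage, not correctness.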

Data Sharding:


user_check_in_info HBase Table: We can shard data based on the hash of UUID. Ideally, UUID will be a monotonically increasing integer generated by MySQL, so we need to hash the UUID to get a uniform distribution of data across the servers. Based on the hash function, we can also create a pre-split HBase table, which helps performance and load balancing.

users_in_location QuadTree: One solution is to use a hash function to map each location ID to a server, where we store the location and the users present in it. However, this can cause hot spotting if some location is crowded with users. For example, if there is an event at Levi’s Stadium, Santa Clara, there might be a large number of check-ins to the stadium. Hence, partitioning based on the hash of UUID is a better solution. However, this comes with the overhead of extra computation when we need to fetch the users for a particular location. Since the data is now sharded by UUID and can reside on any of the available QuadTree servers, we need to send the query for users in a particular location to all the servers. Once each server responds with its set of users, an aggregation service combines the results from the individual servers and presents the complete output.
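
The UUID-based partitioning and the scatter-gather read can be sketched as follows. This is an illustration only: `shard_for`, `Shard` and `ShardedIndex` are hypothetical names, MD5 is just one convenient stable hash, and each shard is a plain dict rather than a real QuadTree server.

```python
import hashlib
from typing import Dict, List


def shard_for(uuid: int, num_shards: int) -> int:
    """Stable hash of the UUID, so sequential ids spread across shards."""
    digest = hashlib.md5(str(uuid).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


class Shard:
    def __init__(self) -> None:
        self.location_by_user: Dict[int, str] = {}

    def users_in(self, luid: str) -> List[int]:
        return [u for u, loc in self.location_by_user.items() if loc == luid]


class ShardedIndex:
    def __init__(self, num_shards: int) -> None:
        self.shards = [Shard() for _ in range(num_shards)]

    def check_in(self, uuid: int, luid: str) -> None:
        # Writes touch exactly one shard, chosen by the user's hash.
        shard = self.shards[shard_for(uuid, len(self.shards))]
        shard.location_by_user[uuid] = luid

    def users_in(self, luid: str) -> List[int]:
        # Reads fan out to every shard; this method plays the aggregation service.
        merged: List[int] = []
        for shard in self.shards:
            merged.extend(shard.users_in(luid))
        return sorted(merged)


index = ShardedIndex(num_shards=4)
for uuid in range(1, 1001):
    index.check_in(uuid, "stadium")   # one crowded location
index.check_in(2000, "cafe")
print([len(s.location_by_user) for s in index.shards])
print(len(index.users_in("stadium")))
```

Even though every user is at the same location, the writes are spread over all shards; the price is that each location query has to visit every shard.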

Fault Tolerance & Replication:

We would need replicas of these servers, so that if the primary dies the secondary can take control. We will store this data in a distributed file system like HDFS/MapRFS. By default, these filesystems will have data replication factor of 3. Also, we can make use of SSDs for faster IOs.

Disk space required to store data:


For user details: 
8 (UUID) + 20 (Name) + 32 (Email) + 8 (DateOfBirth) + 8 (CreationDate) + 8 (LastLogin) = 84 bytes per user 

For 100 million users: 
84*100M = ~8GB

For location details:
8 (LUID) + 32 (Name) + 8 (Latitude) + 8 (Longitude) + 20 (Category) + 300 (Description) = 376 bytes per location

For 200 million locations:
376*200M = ~70GB

For user-location tracking table: 
8 bytes (UUID) + 8 bytes (LUID) = 16 bytes, plus 4 bytes (HBase metadata) = 20 bytes per check-in

For 20 million check-ins:
20*20M = ~380MB

Hence, we need ~80GB of storage space for the first year. If we design the system for 10 years with year-over-year growth of 30%, we will need 80 * 1.3^10 ≈ 1100GB of storage space. With a data replication factor of 3, the total storage required will be about 1100*3GB = 3300GB.
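
The storage arithmetic can be reproduced in a few lines. This sketch takes the field sizes listed above, uses the rounded 80GB first-year figure for the projection, and assumes growth compounds once per year for 10 years; the constant and function names are illustrative.

```python
GIB = 2 ** 30

# Row sizes summed from the field sizes listed above.
USER_ROW_BYTES = 8 + 20 + 32 + 8 + 8 + 8          # 84
LOCATION_ROW_BYTES = 8 + 32 + 8 + 8 + 20 + 300    # 376
CHECK_IN_ROW_BYTES = 8 + 8 + 4                    # 20

first_year_bytes = (USER_ROW_BYTES * 100_000_000
                    + LOCATION_ROW_BYTES * 200_000_000
                    + CHECK_IN_ROW_BYTES * 20_000_000)
first_year_gib = first_year_bytes / GIB


def projected_gb(base_gb: float, years: int, growth: float = 0.30) -> float:
    """Size after `years` of compound yearly growth."""
    return base_gb * (1 + growth) ** years


ten_year_gb = projected_gb(80, 10)     # start from the rounded ~80GB figure
replicated_gb = ten_year_gb * 3        # replication factor of 3
print(f"first year: {first_year_gib:.1f} GiB")
print(f"10 years: {ten_year_gb:.0f} GB, replicated x3: {replicated_gb:.0f} GB")
```

The exact first-year total comes out slightly under 80GB, so rounding up to 80GB before projecting leaves a small safety margin.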

High Level System Design:








Thursday, 27 September 2018



Spark-Python Logging


Aim:


To understand how we can log while using PySpark from both driver and executor.

Details:


We can directly use the log4j logger for driver logging. However, executor logging is not as straightforward as driver logging. This is because Python workers are spawned by executor JVMs and these workers do not have a callback connection to Java. PySpark is built on top of Spark’s Java API. Data is processed in Python and cached/shuffled in the JVM.

[Figure: PySpark architecture diagram]



Py4J enables Python programs running in a Python interpreter to dynamically access Java objects in a JVM. If we look closely at the diagram, we can see that Py4J runs inside the Spark driver. We do not have Py4J inside each Spark worker. 

As a result, we will have to copy the python module file that has information about the logging configurations to distributed filesystem (HDFS/MapRFS, let’s say - /user/mapr/spark/mysparkexecutorlogger.py) and then access it as follows:

# Add the custom logger file to spark context
sc.addPyFile('maprfs:///user/mapr/spark/mysparkexecutorlogger.py')
import mysparkexecutorlogger

Contents of ‘/user/mapr/spark/mysparkexecutorlogger.py’:

import logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
handler = logging.FileHandler('/spark/jobs/executor.log')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s -' ' %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
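
Once the module is available to the Python workers, the logger is used from inside the function passed to an RDD transformation. The sketch below shows the idea without needing a cluster: the built-in `map` stands in for `rdd.map`, the log file goes to a temporary directory, and the names `get_executor_logger` and `square_and_log` are hypothetical. The guard against adding a second handler is an addition, since the same worker process may configure the logger more than once.

```python
import logging
import os
import tempfile


def get_executor_logger(log_path: str, name: str = "mysparkexecutorlogger") -> logging.Logger:
    """Return a file-backed logger, configuring it only once per process."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    if not logger.handlers:  # avoid duplicate log lines on repeated calls
        handler = logging.FileHandler(log_path)
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
        logger.addHandler(handler)
    return logger


# Assumed path for this example; on a cluster this is a path local to each executor node.
LOG_PATH = os.path.join(tempfile.gettempdir(), "executor_example.log")


def square_and_log(x: int) -> int:
    """Function that would run on the executors, e.g. rdd.map(square_and_log)."""
    logger = get_executor_logger(LOG_PATH)
    logger.info("processing record %s", x)
    return x * x


# Local stand-in for: sc.parallelize([1, 2, 3]).map(square_and_log).collect()
results = list(map(square_and_log, [1, 2, 3]))
print(results)
```

Because the handler writes to the local filesystem, each executor node ends up with its own log file at that path.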




Friday, 21 September 2018

Bulk load operation in MapRDB - Full and Incremental


Aim:

To get familiar with the full/fresh bulk load and incremental bulk load concepts in MapRDB tables. MapRDB is a high-performance NoSQL database. It has two main flavors: MapRDB Binary and MapRDB JSON. MapRDB Binary has the same capabilities as Apache HBase; in fact, it offers better performance and stability than Apache HBase.

Advantages of MapRDB bulk load over Apache HBase bulk load:

An HBase bulk load has these three steps:
[1] Generate HFiles using a mapreduce job. 
[2] Add the HFiles to the region servers. 
[3] Perform a full compaction to get good locality, since the previous step will not guarantee data locality.

A MapR-DB bulk load involves only one step: 
[1] Execute the mapreduce job. 

The MapRDB full bulk load writes directly to the segment files, bypassing the buckets. The incremental bulk load writes to the bucket files. The mapreduce job for a MapRDB full bulk load costs about the same as the one for HBase. However, once it completes, MapRDB needs no compaction to achieve data locality. The mapreduce job for a MapRDB incremental bulk load is more expensive than the mapreduce job for an HBase bulk load, since the MapRDB job actually updates the bucket files properly. Thus it is going to be slower than the HBase mapreduce job. But once it completes, MapRDB needs no compaction for optimal performance.


Bulkload to MapRDB

The bulk load operation is useful when you need to load a large amount of data in one go. It avoids the normal write path, skipping the write-ahead log (WAL), which makes the operation much faster. This is the right way to populate your MapRDB table (and, for that matter, an Apache HBase table) when you have a huge number of put operations.

Full bulk load: Full bulk load on the MapRDB table is performed when the table does not contain any data. For this to happen, you need to make sure that ‘BULKLOAD’ property is set to ‘true’. You can set this property either at table creation time or using ‘alter’ command if the table is already created.

 a. When you create a table:
     create '<path_to_maprdb_table>', '<column_family>', BULKLOAD => 'true'
     E.g: create '/user/user01/blt', 'c', BULKLOAD => 'true'

 b. When table is already present:
     alter '<path_to_maprdb_table>', '<column_family>', BULKLOAD => 'true'
     E.g: alter '/user/user01/blt', 'c', BULKLOAD => 'true'

Incremental bulk load: Once there is data present in the MapRDB and table is enabled for read-write operation, then we need to do the incremental bulk load. The incremental bulk load can be executed multiple times. To enable incremental bulk load to a MapRDB table, make sure the ‘BULKLOAD’ property is set to ‘false’.

    alter '<path_to_maprdb_table>', '<column_family>', BULKLOAD => 'false'
    E.g: alter '/user/user01/blt', 'c', BULKLOAD => 'false'

You can use the same code for both full bulk load and incremental bulk load. The only change you have to make is to change property ‘BULKLOAD’ from ‘true’ (for full bulk load) to ‘false’ (for incremental bulk load).

You can find sample code for MapRDB/HBase bulk load in the following GitHub:

References:


Saturday, 7 April 2018


Use Case: Analyze Flight details using Spark - MapRDB JSON - Apache Drill leveraging the secondary index


Aim:


This post discusses a use case for analyzing flight travel details. 
This analytic use case includes the following:

  • Understand the flight travel statistics data
  • Extract the necessary data from the CSV files using Spark
  • Run queries using Spark SQL
  • Perform data cleaning and business logic using Spark
  • Convert the data to JSON and then to OJAI MapRDB RDD
  • Store the data in MapRDB JSON table
  • Create a secondary index on the MapRDB JSON table
  • Query the data using Apache DRILL
  • Use BI tool to see the report 


The following diagram explains the use case in brief.






The data and complete code is available in my GitHub repository -  https://github.com/jamealwi2/spark-maprdb


The Data


The data contains details of flight travels made in the CSV format.
You can download the data from here.

Extracting the necessary data


At this point, you should have the sample data with you.
You can see that each row in the file contains 29 fields. Details about each field can be found here.

Let's design two specific use cases to work with:
  1. Find the total number of canceled flights.
  2. Find the number of flights canceled to a particular destination.

Let's get started and get our hands dirty ;)

Read the data from the CSV file.

    // Reading the input data
    val csvfile = "/tmp/flightdata/2008.csv"
    val csvDF = spark.read.option("header", "true").csv(csvfile)


Since we are interested only in flight travels which were canceled, we need only the following fields:
Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,Cancelled,CancellationCode

Create a case class 'CancelledFlights' that defines necessary fields for the use case.

    // Case class containing the Year, Month, DayofMonth, scheduled departure time (CRSDepTime),
    // FlightNum, Origin, Dest, cancellation status and cancellation code.
    case class CancelledFlights(Year: String,
                                Month: String,
                                DayofMonth: String,
                                CRSDepTime: String,
                                FlightNum: String,
                                Origin: String,
                                Dest: String,
                                Cancelled: String,
                                CancellationCode: String)
        extends Serializable


Now, create a temporary view to select the necessary fields.

    // Creating a temporary view to query only necessary fields.
    csvDF.createOrReplaceTempView("flightdata")

Filtering the necessary fields:

    import org.apache.spark.sql.Dataset
    import spark.implicits._ // provides the encoder needed by .as[CancelledFlights]

    // Filtering the necessary information.
    // We need only a subset of the available 29 fields.
    // Also, we need information only for the cancelled flights.
    // Cancelled flights will have 'Cancelled=1'.
    val cancelledFlightsDS: Dataset[CancelledFlights] = spark
      .sql(
        "select Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,Cancelled,CancellationCode from flightdata where Cancelled='1'")
      .as[CancelledFlights]

Perform data cleaning and business logic using Spark


Let's go ahead to clean the data a bit and give more value to it. We will convert the CancellationCode to meaningful CancellationReason. Define a UDF that matches the CancellationCode to CancellationReason.

    import org.apache.spark.sql.functions.udf

    // Converting the cancellation code to a more meaningful reason.
    // Any other value (including null) maps to "unknown".
    val lookup = udf[String, String](cancellationCode =>
      cancellationCode match {
        case "A" => "carrier"
        case "B" => "weather"
        case "C" => "NAS"
        case "D" => "security"
        case _   => "unknown"
      })

Apply the business logic to the Spark Dataset.

    val cancelledFlightsWithReasonDS = cancelledFlightsDS.withColumn(
      "CancellationReason",
      lookup(cancelledFlightsDS("CancellationCode")))
    cancelledFlightsWithReasonDS.createOrReplaceTempView("cancelledflightdata")

Create a case class with CancellationReason.

    // Case class containing the Year, Month, DayofMonth, scheduled departure time (CRSDepTime),
    // FlightNum, Origin, Dest and cancellation reason.
    case class CancelledFlightsWithReason(Year: String,
                                          Month: String,
                                          DayofMonth: String,
                                          CRSDepTime: String,
                                          FlightNum: String,
                                          Origin: String,
                                          Dest: String,
                                          CancellationReason: String)
        extends Serializable

Create a new dataset that includes CancellationReason and drops CancellationCode.

    val cancelledFlightsWithReasonDS2: Dataset[CancelledFlightsWithReason] =
      spark
        .sql(
          "select Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,CancellationReason from cancelledflightdata")
        .as[CancelledFlightsWithReason]

Convert the data to JSON and then to OJAI MapRDB-JSON RDD


We will now create an OJAI MAPRDB-JSON RDD that will then be stored in MapRDB JSON table. Define a case class with necessary details.

    // Case class defining the MapRDB-JSON document.
    case class FlightJsonDocument(_id: String,
                                  FlightNum: String,
                                  Origin: String,
                                  Dest: String,
                                  CancellationReason: String)
        extends Serializable

Define a function that will perform the logic to create the '_id' necessary for the MapRDB JSON table. We will combine Year, Month, DayOfMonth, FlightNum and CRSDepTime.

// Function creating the _id for MapRDB JSON
  def createJsonDocument(r: CancelledFlightsWithReason): FlightJsonDocument = {
    val _id = r.Year + "_" + r.Month + "_" + r.DayofMonth + "#" + r.FlightNum + "#" + r.CRSDepTime
    FlightJsonDocument(_id, r.FlightNum, r.Origin, r.Dest, r.CancellationReason)
  }

Enough of the preparation works, let's create the OJAI RDD!

    import com.mapr.db.spark._ // MapR-DB OJAI connector for Spark (MapRDBSpark, saveToMapRDB)

    // Creating OJAI MapRDB-JSON RDD
    val ojaiRDD = cancelledFlightsWithReasonDS2
      .map(createJsonDocument)
      .toJSON
      .rdd
      .map(MapRDBSpark.newDocument)


Store the data in MapRDB JSON table


To persist the data to MapRDB JSON, use 'saveToMapRDB' method available with OJAI MAPRDB-JSON RDD.

    // Storing the data to MapRDB-JSON table.
    ojaiRDD.saveToMapRDB("/tmp/flighttable",
                         createTable = true,
                         idFieldPath = "_id")

Querying the data using Apache Drill


Now let's query the data using Apache Drill. Execute the first use case, i.e., counting the total number of canceled flights. 

0: jdbc:drill:> select count(*) from dfs.tmp.flighttable;
+---------+
| EXPR$0  |
+---------+
| 137418  |
+---------+
1 row selected (4.897 seconds)
0: jdbc:drill:>

If you take a look at the physical plan for the query, you can see that the data is read from the '/tmp/flighttable' MapRDB-JSON table.

tableName=maprfs:///tmp/flighttable

The complete physical plan is given below:

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412256.1 rows, 2336119.1 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 225
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412256.0 rows, 2336119.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 224
00-02        StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412255.0 rows, 2336118.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 223
00-03          StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412254.0 rows, 2336106.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 222
00-04            Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 137418.0, cumulative cost = {274836.0 rows, 687090.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 221
00-05              Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=null], columns=[], maxwidth=1]]) : rowType = RecordType(): rowcount = 137418.0, cumulative cost = {137418.0 rows, 137418.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 220


Create a secondary index on the MapRDB JSON table


Let's move on to the second use case. It differs from the first in that the first dealt with the complete set of data, whereas here we are interested in the number of flights canceled to a particular destination. While querying MapRDB JSON, if a condition involves a field other than the '_id' field (equivalent to the RowKey concept in Apache HBase), it calls for a complete table scan. Well, that does not look performance friendly!

Example: Find out the total number of canceled flights to 'LAS'.

0: jdbc:drill:> select count(*) from dfs.tmp.flighttable where Dest='LAS';
+---------+
| EXPR$0  |
+---------+
| 1804    |
+---------+
1 row selected (1.319 seconds)

Looking at the physical plan:

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206128.1 rows, 1168054.1 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 459
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206128.0 rows, 1168054.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 458
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206127.0 rows, 1168053.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 457
00-03          Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 68709.0, cumulative cost = {137418.0 rows, 343545.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 456
00-04            Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=(Dest = "LAS")], columns=[`Dest`], maxwidth=1]]) : rowType = RecordType(ANY Dest): rowcount = 68709.0, cumulative cost = {68709.0 rows, 68709.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 455

You can see that the cumulative cost for the table scan operation is 68709 rows.
From the Drill query profile page, you will also get the total cost and the total duration of the query.




The secondary indexing feature available from MapR 6.0 comes to the rescue here. We will create a secondary index with the 'Dest' as indexed field.

maprcli table index add -path /tmp/flighttable -index destinationindex -indexedfields Dest -includedfields FlightNum

Re-run the query and observe the results.

0: jdbc:drill:> select count(*) from dfs.tmp.flighttable where Dest='LAS';
+---------+
| EXPR$0  |
+---------+
| 1804    |
+---------+
1 row selected (0.807 seconds)


00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {207.22700979050714 rows, 1169.1530554795404 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 751
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {207.12700979050715 rows, 1169.0530554795405 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 750
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206.12700979050715 rows, 1168.0530554795405 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 749
00-03          Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 68.70900326350238, cumulative cost = {137.41800652700476 rows, 343.5450163175119 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 748
00-04            Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=(Dest = "LAS"), indexName=destinationindex], columns=[`Dest`], maxwidth=1]]) : rowType = RecordType(ANY Dest): rowcount = 68.70900326350238, cumulative cost = {68.70900326350238 rows, 68.70900326350238 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 747

The indexed table is now in the picture: the table scan operation shows indexName=destinationindex. As you might have noticed, both the query time and the cumulative cost have dropped. (The difference may not look impressive here, since these queries already return within a second, but it will have a considerable performance impact on a larger dataset.)




Use BI tool to see the report


You can now go ahead and use a BI tool of your choice to connect to Drill and produce interesting reports. 

Happy Drilling!




Thursday, 5 April 2018

How to use 'mapr dbshell' to create, insert and query MapRDB-JSON table?


Aim:

This post discusses the basics of using 'mapr dbshell' to create, insert and query on MapRDB JSON tables.

Details:


A. Creating a table

[1] To log in to the MapRDB JSON shell, execute the following:
mapr dbshell

[mapr@vm60-155 root]$ mapr dbshell
====================================================
*                  MapR-DB Shell                   *
* NOTE: This is a shell for JSON table operations. *
====================================================
Version: 6.0.0-mapr

MapR-DB Shell
maprdb mapr:>

[2] You can use the 'create' command to create a MapRDB JSON table from dbshell. 
Let's create a table, say '/tmp/maprdbjsontest'.

maprdb mapr:> create /tmp/maprdbjsontest
Table /tmp/maprdbjsontest created.

B. Inserting data

Now that we have created the table, let's discuss how to insert data into it from dbshell.
Let's say I have the following JSON data to insert:

{"_id":"1","confidence":0.24,"label":"person"}
{"_id":"2","label":"person2"}
{"_id":"3","confidence":0.54,"label":"person3"}
{"_id":"4","confidence":null,"label":"person4"}
{"_id":"5","confidence":1.5,"label":"person5","topleft":{"extra":{"v":50},"x":62,"y":1}}
{"_id":"6","confidence":2.5,"label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}
{"_id":"8","confidence":"null","label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}

Use the 'insert' command to insert data into the table from dbshell. The generic syntax is as follows:
insert --table <path_to_table> --value '<JSON>'

insert --table /tmp/maprdbjsontest/ --value '{"_id": "1", "label": "person", "confidence": 0.24}'
insert --table /tmp/maprdbjsontest/ --value '{"_id": "2", "label": "person2"}'
insert --table /tmp/maprdbjsontest/ --value '{"_id": "3", "label": "person3", "confidence": 0.54}'
insert --table /tmp/maprdbjsontest/ --value '{"_id": "4", "label": "person4", "confidence": null}'
insert --table /tmp/maprdbjsontest/ --value '{"_id":"5","confidence":1.5,"label":"person5","topleft":{"extra":{"v":50},"x":62,"y":1}}'
insert --table /tmp/maprdbjsontest/ --value '{"_id":"6","confidence":2.5,"label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}'
insert --t /tmp/maprdbjsontest/ --v '{"_id":"8","confidence":"null","label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}'

As you might have noticed (while inserting the document with _id=8), --table and --value can be shortened to --t and --v respectively. 


C. Querying data

Now we will see how to query data.

[1] To scan/see data from a table, you can use 'find' command. 'findbyid' can be used to query a document with a specific id.

To see complete data:

maprdb mapr:> find /tmp/maprdbjsontest/
{"_id":"1","confidence":0.24,"label":"person"}
{"_id":"2","label":"person2"}
{"_id":"3","confidence":0.54,"label":"person3"}
{"_id":"4","confidence":null,"label":"person4"}
{"_id":"5","confidence":1.5,"label":"person5","topleft":{"extra":{"v":50},"x":62,"y":1}}
{"_id":"6","confidence":2.5,"label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}
{"_id":"8","confidence":"null","label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}
7 document(s) found.

To limit the number of records fetched, use '--limit'.

maprdb mapr:> find /tmp/maprdbjsontest/ --limit 4
{"_id":"1","confidence":0.24,"label":"person"}
{"_id":"2","label":"person2"}
{"_id":"3","confidence":0.54,"label":"person3"}
{"_id":"4","confidence":null,"label":"person4"}
4 document(s) found.

To query a specific document, use 'findbyid':

maprdb mapr:> findbyid --t /tmp/maprdbjsontest/ --id 1
{"_id":"1","confidence":0.24,"label":"person"}
1 document(s) found.

[2] To query data which satisfies some condition
We will discuss how to fetch records that match certain conditions.

Case 1: Fetch all the records where confidence value is 0.24.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$and":[{"$eq":{"confidence":0.24}}]} --fields confidence,label
{"confidence":0.24,"label":"person"}
1 document(s) found.

Case 2: Fetch all the records where confidence value is not equal to 0.24.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$and":[{"$ne":{"confidence":0.24}}]} --fields confidence,label
{"label":"person2"}
{"confidence":0.54,"label":"person3"}
{"confidence":null,"label":"person4"}
{"confidence":1.5,"label":"person5"}
{"confidence":2.5,"label":"person6"}
{"confidence":"null","label":"person6"}
6 document(s) found.

Case 3: Fetch all the records where confidence value is 0.24 or 1.5.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$or":[{"$eq":{"confidence":0.24}},{"$eq":{"confidence":1.5}}]} --fields confidence,label
{"confidence":0.24,"label":"person"}
{"confidence":1.5,"label":"person5"}
2 document(s) found.

Case 4: Fetch all the records where confidence value is greater than 0.24.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$and":[{"$gt":{"confidence":0.24}}]} --fields confidence,label
{"confidence":0.54,"label":"person3"}
{"confidence":1.5,"label":"person5"}
{"confidence":2.5,"label":"person6"}
3 document(s) found.

Case 5: Fetch all the records where confidence value is less than 0.26.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$and":[{"$lt":{"confidence":0.26}}]} --fields confidence,label
{"confidence":0.24,"label":"person"}
1 document(s) found.
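To make it easier to see why each case returns the documents it does, here is a small Python model of the five conditions above. It is only a sketch inferred from the outputs shown in this post, not how MapR-DB evaluates conditions internally. The names matches and find are hypothetical, the nested topleft field is left out of the sample data, and the model assumes that a missing, null or string value never satisfies $eq, $gt or $lt against a number.

```python
docs = [
    {"_id": "1", "confidence": 0.24, "label": "person"},
    {"_id": "2", "label": "person2"},
    {"_id": "3", "confidence": 0.54, "label": "person3"},
    {"_id": "4", "confidence": None, "label": "person4"},
    {"_id": "5", "confidence": 1.5, "label": "person5"},
    {"_id": "6", "confidence": 2.5, "label": "person6"},
    {"_id": "8", "confidence": "null", "label": "person6"},
]


def is_number(value):
    # bool is a subclass of int in Python, so exclude it explicitly.
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def matches(doc, cond):
    """Evaluate a single-operator condition dict against one document (toy model)."""
    (op, arg), = cond.items()
    if op == "$and":
        return all(matches(doc, c) for c in arg)
    if op == "$or":
        return any(matches(doc, c) for c in arg)
    (field, target), = arg.items()
    value = doc.get(field)
    if op == "$eq":
        return is_number(value) and value == target
    if op == "$ne":
        # Everything that is not equal matches, including missing and null.
        return not (is_number(value) and value == target)
    if op == "$gt":
        return is_number(value) and value > target
    if op == "$lt":
        return is_number(value) and value < target
    raise ValueError(f"unsupported operator: {op}")


def find(cond):
    """Return the _id of every sample document that satisfies cond."""
    return [d["_id"] for d in docs if matches(d, cond)]


print(find({"$and": [{"$ne": {"confidence": 0.24}}]}))
print(find({"$or": [{"$eq": {"confidence": 0.24}}, {"$eq": {"confidence": 1.5}}]}))
```

In this model, as in the dbshell output for Case 2, "$ne" also returns the documents where confidence is missing, null or the string "null", because none of them is equal to 0.24.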

[3] To query data which contains a field value as null

There are two aspects to null here.
First, a field may be missing from the document altogether. This is different from a field that is present with a null value.
To query for documents which do not have a field, use "$notexists".

maprdb mapr:> find /tmp/maprdbjsontest/ --c '{"$notexists":"confidence"}' --fields confidence,label
{"label":"person2"}
1 document(s) found.

Second, if a field is present but its value is null, use the "$typeOf" operator to retrieve such records.

maprdb mapr:> find /tmp/maprdbjsontest/ --c '{"$typeOf":{"confidence":"null"}}' --fields confidence,label
{"confidence":null,"label":"person4"}
1 document(s) found.

To query all documents whose confidence value is not null, use the "$notTypeOf" operator. Note that this also returns documents where the field is missing.

maprdb mapr:> find /tmp/maprdbjsontest/ --c '{"$notTypeOf":{"confidence":"null"}}' --fields confidence,label
{"confidence":0.24,"label":"person"}
{"label":"person2"}
{"confidence":0.54,"label":"person3"}
{"confidence":1.5,"label":"person5"}
{"confidence":2.5,"label":"person6"}
{"confidence":"null","label":"person6"}
6 document(s) found.

Please note that there are two records with the label 'person6'. For the last one in the output (the document with _id=8), the confidence value is the string "null" and not null.
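The difference between a missing field, a null value and the string "null" is easy to check outside dbshell as well. Below is a minimal Python sketch using three of the sample documents. The classify helper is hypothetical and only illustrates the three states. It does not call MapR-DB.

```python
import json

raw_lines = [
    '{"_id":"2","label":"person2"}',
    '{"_id":"4","confidence":null,"label":"person4"}',
    '{"_id":"8","confidence":"null","label":"person6"}',
]


def classify(doc, field):
    """Describe the state of a field in a parsed JSON document."""
    if field not in doc:
        return "missing"  # the case "$notexists" looks for
    if doc[field] is None:
        return "null"  # the case "$typeOf" with "null" looks for
    return type(doc[field]).__name__  # any other value, e.g. a string


states = {}
for line in raw_lines:
    doc = json.loads(line)
    states[doc["_id"]] = classify(doc, "confidence")

print(states)
```

The document with _id=8 is classified as a string (str), which is why it shows up in the "$notTypeOf" result above.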

Friday, 30 March 2018

DRILL Query on Hive transaction table getting wrong stats


Aim:

This post discusses an interesting DRILL behavior when querying Hive transaction tables.

Details:

From version 1.12 onwards, DRILL supports querying Hive transactional tables.
This was added as part of https://issues.apache.org/jira/browse/DRILL-5978. DRILL 1.12 comes with the Hive 2.1 package, which supports operations on Hive transactional tables.

Environment setup:
DRILL 1.12/ Hive 2.1 

1. Create a Hive transaction table in Hive. Let's call it txn_test2.

hive> show create table txn_test2;
OK
CREATE TABLE `txn_test2`(
  `a` int,
  `b` string)
PARTITIONED BY (
  `c` string)
CLUSTERED BY (
  a)
INTO 3 BUCKETS
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'maprfs:/user/hive/warehouse/txn_test'
TBLPROPERTIES (
  'transactional'='true',
  'transient_lastDdlTime'='1522439951')
Time taken: 0.151 seconds, Fetched: 19 row(s)

2. Insert some data into the table.

hive> select * from txn_test2;
OK
3       James   1
1       Alwin   1
4       James   1
2       Alwin   1
3       James   2
1       Alwin   2
4       James   2
2       Alwin   2
Time taken: 0.247 seconds, Fetched: 8 row(s)

3. Query the Hive table using DRILL. 

0: jdbc:drill:> use hive;
+-------+-----------------------------------+
|  ok   |              summary              |
+-------+-----------------------------------+
| true  | Default schema changed to [hive]  |
+-------+-----------------------------------+
1 row selected (0.111 seconds)
0: jdbc:drill:> select count(1) from txn_test2;
+---------+
| EXPR$0  |
+---------+
| 8       |
+---------+
1 row selected (1.258 seconds)
0: jdbc:drill:>


4. Now check the physical plan for the corresponding query. 

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.1 rows, 17.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 646
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0 rows, 17.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 645
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {2.0 rows, 16.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 644
00-03          Project($f0=[1]) : rowType = RecordType(INTEGER $f0): rowcount = 1.0, cumulative cost = {1.0 rows, 4.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 643
00-04            Scan(table=[[hive, txn_test2]], groupscan=[HiveScan [table=Table(dbName:default, tableName:txn_test2), columns=[], numPartitions=2, partitions= [Partition(values:[1]), Partition(values:[2])], inputDirectories=[maprfs:/user/hive/warehouse/txn_test/c=1, maprfs:/user/hive/warehouse/txn_test/c=2]]]) : rowType = RecordType(): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 642

Did you observe anything interesting? If not, take a closer look at the cumulative cost of the table scan operation (00-04). You can see that the cumulative cost shows the total number of rows to be 0. Now, that's weird! It should show 8, since the Hive table has 8 rows. (Well, if not exactly 8, something nearby at least!)

If you are familiar with DRILL, you will know that the number of minor fragments (~ number of threads) is decided by comparing the estimated cost against the planner.slice_target option. If the estimated cost is 0, DRILL will spin up only one minor fragment. This becomes a problem when the table actually holds millions of records, because the scan is not parallelized.
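To get a feel for why a zero estimate matters, here is a simplified Python model of the relationship between the row estimate and the number of minor fragments. It is a sketch and not DRILL's actual parallelizer code. The function name estimated_width and the max_width cap of 8 are hypothetical, and the slice_target default of 100000 is an assumption, so check the value on your own cluster.

```python
import math


def estimated_width(estimated_rows, slice_target=100_000, max_width=8):
    """Toy model: how many minor fragments a scan would get.

    Assumption: one fragment per slice_target estimated rows,
    never fewer than 1 and never more than max_width.
    """
    wanted = math.ceil(estimated_rows / slice_target)
    return max(1, min(wanted, max_width))


# With a row estimate of 0 (as in the plan above) only one fragment is used,
# no matter how large the table really is.
print(estimated_width(0))
print(estimated_width(300_000))
print(estimated_width(5_000_000))
```

In this model, a table with 5 million rows would be scanned by 8 fragments when the estimate is right, but by only 1 when the statistics report 0 rows.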

Here, the issue is with statistics returned from Hive. 

hive> describe formatted txn_test2 partition (c=1);
OK
# col_name              data_type               comment

a                       int
b                       string

# Partition Information
# col_name              data_type               comment

c                       string

# Detailed Partition Information
Partition Value:        [1]
Database:               default
Table:                  txn_test2
CreateTime:             Fri Mar 30 13:01:30 PDT 2018
LastAccessTime:         UNKNOWN
Location:               maprfs:/user/hive/warehouse/txn_test/c=1
Partition Parameters:
        numFiles                3
        numRows                 0
        rawDataSize             0
        totalSize               1940
        transient_lastDdlTime   1522440090

# Storage Information
SerDe Library:          org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat:            org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed:             No
Num Buckets:            3
Bucket Columns:         [a]
Sort Columns:           []
Storage Desc Params:
        serialization.format    1
Time taken: 0.278 seconds, Fetched: 34 row(s)

You can see that the number of rows (numRows) is 0. 


Solution:


Compute statistics in Hive using the 'analyze table' command. For details, see:
https://cwiki.apache.org/confluence/display/Hive/StatsDev

hive> analyze table txn_test2 partition(c) compute statistics;
Partition default.txn_test2{c=2} stats: [numFiles=3, numRows=4, totalSize=1940, rawDataSize=500]
Partition default.txn_test2{c=1} stats: [numFiles=3, numRows=4, totalSize=1940, rawDataSize=500]
OK
Time taken: 0.677 seconds
hive>

Now verify the statistics again:

hive> describe formatted txn_test2 partition (c=1);
OK
# col_name              data_type               comment

a                       int
b                       string

# Partition Information
# col_name              data_type               comment

c                       string

# Detailed Partition Information
Partition Value:        [1]
Database:               default
Table:                  txn_test2
CreateTime:             Fri Mar 30 13:01:30 PDT 2018
LastAccessTime:         UNKNOWN
Location:               maprfs:/user/hive/warehouse/txn_test/c=1
Partition Parameters:
        COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
        numFiles                3
        numRows                 4
        rawDataSize             500
        totalSize               1940
        transient_lastDdlTime   1522440837

# Storage Information
SerDe Library:          org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat:            org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed:             No
Num Buckets:            3
Bucket Columns:         [a]
Sort Columns:           []
Storage Desc Params:
        serialization.format    1
Time taken: 0.305 seconds, Fetched: 35 row(s)

You can see that numRows has changed to 4. (It is 4 and not 8 because we are checking the stats for partition c=1 only.)

Fire the query from DRILL again and check the physical plan.

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {25.1 rows, 289.1 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 808
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {25.0 rows, 289.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 807
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {24.0 rows, 288.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 806
00-03          Project($f0=[1]) : rowType = RecordType(INTEGER $f0): rowcount = 8.0, cumulative cost = {16.0 rows, 192.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 805
00-04            Scan(table=[[hive, txn_test2]], groupscan=[HiveScan [table=Table(dbName:default, tableName:txn_test2), columns=[], numPartitions=2, partitions= [Partition(values:[1]), Partition(values:[2])], inputDirectories=[maprfs:/user/hive/warehouse/txn_test/c=1, maprfs:/user/hive/warehouse/txn_test/c=2]]]) : rowType = RecordType(): rowcount = 8.0, cumulative cost = {8.0 rows, 160.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 804

Now you can see that the estimate is correct. This is the expected behavior.
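As a quick cross-check, the per-partition statistics printed by the analyze command can be added up and compared with the plan. The Python sketch below does that for the two lines of output shown earlier. The helper name sum_stat is hypothetical. In this example the totals line up with the rowcount (8.0) and the io cost (3880.0) of the scan in the new plan, although that is only an observation from this output and not a documented formula.

```python
import re

# The two stats lines printed by the analyze command in this post.
analyze_output = """\
Partition default.txn_test2{c=2} stats: [numFiles=3, numRows=4, totalSize=1940, rawDataSize=500]
Partition default.txn_test2{c=1} stats: [numFiles=3, numRows=4, totalSize=1940, rawDataSize=500]
"""


def sum_stat(text, name):
    """Add up one named statistic across all partition lines."""
    return sum(int(v) for v in re.findall(rf"\b{name}=(\d+)", text))


total_rows = sum_stat(analyze_output, "numRows")
total_size = sum_stat(analyze_output, "totalSize")
print(total_rows, total_size)
```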

Happy Drilling!