Saturday 7 April 2018


Use Case: Analyzing flight details using Spark, MapRDB JSON, and Apache Drill, leveraging a secondary index


Aim:


This post discusses a use case for analyzing flight travel details. 
This analytic use case includes the following:

  • Understand the flight travel statistics data
  • Extract the necessary data from the CSV files using Spark
  • Run queries using Spark SQL
  • Perform data cleaning and business logic using Spark
  • Convert the data to JSON and then to an OJAI MapRDB RDD
  • Store the data in a MapRDB JSON table
  • Create a secondary index on the MapRDB JSON table
  • Query the data using Apache Drill
  • Use a BI tool to view the report


The following diagram explains the use case in brief.






The data and complete code are available in my GitHub repository: https://github.com/jamealwi2/spark-maprdb


The Data


The data contains details of flight travel, in CSV format.
You can download the data from here.

Extracting the necessary data


At this point, you should have the sample data with you.
Each row in the file contains 29 fields; details about each field can be found here.

Let's define two specific use cases to work with:
  1. Find the total number of canceled flights.
  2. Find the number of flights canceled to a particular destination.

Let's get started and get our hands dirty ;)

Read the data from the CSV file.

    // Reading the input data
    val csvfile = "/tmp/flightdata/2008.csv"
    var csvDF = spark.read.option("header", "true").csv(csvfile)
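
Note: the snippets in this post assume a SparkSession named 'spark'. A minimal sketch of the setup and imports they rely on (the application name is arbitrary):

    // Assumed setup for the snippets in this post (sketch).
    import org.apache.spark.sql.{Dataset, SparkSession}
    import org.apache.spark.sql.functions.udf

    val spark = SparkSession.builder()
      .appName("FlightCancellationAnalysis") // hypothetical name
      .getOrCreate()

    // Needed for the .as[CaseClass] conversions used below.
    import spark.implicits._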


Since we are interested only in flights that were canceled, we need only the following fields:
Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,Cancelled,CancellationCode

Create a case class 'CancelledFlights' that defines the necessary fields for the use case.

  // Case Class containing the Year, Month, DayofMonth, Scheduled departure time(CRSDepTime), 
  // FlightNumber, Origin, Destination, Cancellation status and Cancellation code.
  case class CancelledFlights(Year: String,
                              Month: String,
                              DayofMonth: String,
                              CRSDepTime: String,
                              FlightNum: String,
                              Origin: String,
                              Dest: String,
                              Cancelled: String,
                              CancellationCode: String)
      extends Serializable


Now, create a temporary view to select the necessary fields.

    // Creating a temporary view to query only necessary fields.
    csvDF.createOrReplaceTempView("flightdata")

Filtering the necessary fields:

    // Filtering the necessary information.
    // We need only a subset of the available 29 fields.
    // Also, we need information only for the cancelled flights.
    // Cancelled flights will have 'Cancelled=1'.
    val cancelledFlightsDS: Dataset[CancelledFlights] = spark
      .sql(
        "select Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,Cancelled,CancellationCode from flightdata where Cancelled='1'")
      .as[CancelledFlights]

Perform data cleaning and business logic using Spark


Let's clean the data a bit and add more value to it. We will convert the CancellationCode to a meaningful CancellationReason. Define a UDF that maps each CancellationCode to a CancellationReason.

    // Converting the cancellation code to a more meaningful reason.
    val lookup = udf[String, String](cancellationCode =>
      cancellationCode match {
        case "A" => "carrier"
        case "B" => "weather"
        case "C" => "NAS"
        case "D" => "security"
        case _   => "unknown"
      })

Apply the business logic to the Spark Dataset.

    val cancelledFlightsWithReasonDS = cancelledFlightsDS.withColumn(
      "CancellationReason",
      lookup(cancelledFlightsDS("CancellationCode")))
    cancelledFlightsWithReasonDS.createOrReplaceTempView("cancelledflightdata")
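
(Note that withColumn on a Dataset returns a DataFrame; that is fine here, since we query it through the temporary view.) As a quick, optional sanity check, you can peek at a few of the enriched rows:

    // Optional: inspect a few rows with the derived reason (illustrative).
    cancelledFlightsWithReasonDS
      .select("FlightNum", "Dest", "CancellationReason")
      .show(5, truncate = false)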

Create a case class with CancellationReason.

  // Case Class containing the Year, Month, DayofMonth, Scheduled departure time (CRSDepTime),
  // FlightNumber, Origin, Destination and Cancellation reason.
  case class CancelledFlightsWithReason(Year: String,
                                        Month: String,
                                        DayofMonth: String,
                                        CRSDepTime: String,
                                        FlightNum: String,
                                        Origin: String,
                                        Dest: String,
                                        CancellationReason: String)
      extends Serializable

Create a new Dataset with CancellationReason, dropping CancellationCode.

    val cancelledFlightsWithReasonDS2: Dataset[CancelledFlightsWithReason] =
      spark
        .sql(
          "select Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,CancellationReason from cancelledflightdata")
        .as[CancelledFlightsWithReason]

Convert the data to JSON and then to OJAI MapRDB-JSON RDD


We will now create an OJAI MapRDB-JSON RDD that will then be stored in a MapRDB JSON table. Define a case class with the necessary details.

  // Case class defining the MapRDB-JSON document.
  case class FlightJsonDocument(_id: String,
                                FlightNum: String,
                                Origin: String,
                                Dest: String,
                                CancellationReason: String)
      extends Serializable

Define a function that builds the '_id' required for the MapRDB JSON table. We will combine Year, Month, DayofMonth, FlightNum and CRSDepTime; for example, a flight 335 scheduled to depart at 2003 on 2008-01-03 would get the _id '2008_1_3#335#2003'.

  // Function creating the _id for the MapRDB JSON document
  def createJsonDocument(r: CancelledFlightsWithReason): FlightJsonDocument = {
    val _id = r.Year + "_" + r.Month + "_" + r.DayofMonth + "#" + r.FlightNum + "#" + r.CRSDepTime
    FlightJsonDocument(_id, r.FlightNum, r.Origin, r.Dest, r.CancellationReason)
  }

Enough of the preparation work; let's create the OJAI RDD!

    // Creating the OJAI MapRDB-JSON RDD
    val ojaiRDD = cancelledFlightsWithReasonDS2
      .map(createJsonDocument)
      .toJSON
      .rdd
      .map(MapRDBSpark.newDocument)
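
MapRDBSpark (and the saveToMapRDB method used next) come from the MapR-DB OJAI Connector for Spark; the import below is assumed to be in scope:

    // Assumed import for the OJAI MapR-DB connector (MapRDBSpark, saveToMapRDB).
    import com.mapr.db.spark._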


Store the data in MapRDB JSON table


To persist the data to MapRDB JSON, use the 'saveToMapRDB' method available on the OJAI MapRDB-JSON RDD.

// Storing the data to MapRDB-JSON table.
    ojaiRDD.saveToMapRDB("/tmp/flighttable",
                         createTable = true,
                         idFieldPath = "_id")
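
To verify the write, you can load the table back into a DataFrame; a small sketch, assuming the connector's Spark SQL support:

    // Read the table back for verification (sketch).
    import com.mapr.db.spark.sql._
    val flightDF = spark.loadFromMapRDB("/tmp/flighttable")
    flightDF.show(5)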

Querying the data using Apache Drill


Now let's query the data using Apache Drill. Execute the first use case, i.e., counting the total number of canceled flights. 

0: jdbc:drill:> select count(*) from dfs.tmp.flighttable;
+---------+
| EXPR$0  |
+---------+
| 137418  |
+---------+
1 row selected (4.897 seconds)
0: jdbc:drill:>

If you take a look at the physical plan for the query, you can see that the data is read from the '/tmp/flighttable' MapRDB-JSON table.
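
In Drill, one way to view the plan is the standard 'explain plan for' prefix:

0: jdbc:drill:> explain plan for select count(*) from dfs.tmp.flighttable;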

tableName=maprfs:///tmp/flighttable

The complete physical plan is given below:

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412256.1 rows, 2336119.1 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 225
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412256.0 rows, 2336119.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 224
00-02        StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412255.0 rows, 2336118.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 223
00-03          StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412254.0 rows, 2336106.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 222
00-04            Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 137418.0, cumulative cost = {274836.0 rows, 687090.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 221
00-05              Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=null], columns=[], maxwidth=1]]) : rowType = RecordType(): rowcount = 137418.0, cumulative cost = {137418.0 rows, 137418.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 220


Create a secondary index on the MapRDB JSON table


Let's move on to the second use case. It differs from the first in that the first dealt with the complete set of data; this time, we are interested in the number of flights canceled to a particular destination. While querying MapRDB JSON, if a condition involves any field other than '_id' (the equivalent of the RowKey concept in Apache HBase), it calls for a complete table scan. Well, that doesn't sound performance friendly, does it?

Example: Find out the total number of canceled flights to 'LAS'.

0: jdbc:drill:> select count(*) from dfs.tmp.flighttable where Dest='LAS';
+---------+
| EXPR$0  |
+---------+
| 1804    |
+---------+
1 row selected (1.319 seconds)

Looking at the physical plan:

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206128.1 rows, 1168054.1 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 459
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206128.0 rows, 1168054.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 458
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206127.0 rows, 1168053.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 457
00-03          Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 68709.0, cumulative cost = {137418.0 rows, 343545.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 456
00-04            Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=(Dest = "LAS")], columns=[`Dest`], maxwidth=1]]) : rowType = RecordType(ANY Dest): rowcount = 68709.0, cumulative cost = {68709.0 rows, 68709.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 455

You can see that the cumulative cost for the table scan operation is 68709 rows.
From the Drill query profile page, you will also get the total cost and the total duration of the query.




The secondary indexing feature available from MapR 6.0 comes to the rescue here. We will create a secondary index with 'Dest' as the indexed field.

maprcli table index add -path /tmp/flighttable -index destinationindex -indexedfields Dest -includedfields FlightNum
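
To confirm that the index was created, you can list the indexes on the table:

maprcli table index list -path /tmp/flighttable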

Re-run the query and observe the results.

0: jdbc:drill:> select count(*) from dfs.tmp.flighttable where Dest='LAS';
+---------+
| EXPR$0  |
+---------+
| 1804    |
+---------+
1 row selected (0.807 seconds)


00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {207.22700979050714 rows, 1169.1530554795404 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 751
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {207.12700979050715 rows, 1169.0530554795405 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 750
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206.12700979050715 rows, 1168.0530554795405 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 749
00-03          Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 68.70900326350238, cumulative cost = {137.41800652700476 rows, 343.5450163175119 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 748
00-04            Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=(Dest = "LAS"), indexName=destinationindex], columns=[`Dest`], maxwidth=1]]) : rowType = RecordType(ANY Dest): rowcount = 68.70900326350238, cumulative cost = {68.70900326350238 rows, 68.70900326350238 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 747

Now we have the indexed table in the picture; you can see that in the table scan operation (indexName=destinationindex). As you might have noticed, the time for the query to return results has dropped, and so has the cumulative cost. (You may not fully appreciate the results here, since we are dealing with queries that return within a second, but it will make a considerable performance impact on a larger dataset.)




Use a BI tool to view the report


You can now go ahead and use the BI tool of your choice to connect to Drill and produce interesting reports.

Happy Drilling!




Thursday 5 April 2018

How to use 'mapr dbshell' to create, insert into, and query a MapRDB-JSON table?


Aim:

This post discusses the basics of using 'mapr dbshell' to create, insert into, and query MapRDB JSON tables.

Details:


A. Creating a table

[1] To log in to the MapRDB JSON shell, execute the following:
mapr dbshell

[mapr@vm60-155 root]$ mapr dbshell
====================================================
*                  MapR-DB Shell                   *
* NOTE: This is a shell for JSON table operations. *
====================================================
Version: 6.0.0-mapr

MapR-DB Shell
maprdb mapr:>

[2] You can use the 'create' command to create a MapRDB JSON table from dbshell.
Let's create a table, say '/tmp/maprdbjsontest'.

maprdb mapr:> create /tmp/maprdbjsontest
Table /tmp/maprdbjsontest created.
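
Outside dbshell, the equivalent maprcli command should also work:

maprcli table create -path /tmp/maprdbjsontest -tabletype json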

B. Inserting data

Now that we have the table created, let's discuss how to insert data into it from dbshell.
Let's say I have the following JSON data to insert:

{"_id":"1","confidence":0.24,"label":"person"}
{"_id":"2","label":"person2"}
{"_id":"3","confidence":0.54,"label":"person3"}
{"_id":"4","confidence":null,"label":"person4"}
{"_id":"5","confidence":1.5,"label":"person5","topleft":{"extra":{"v":50},"x":62,"y":1}}
{"_id":"6","confidence":2.5,"label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}
{"_id":"8","confidence":"null","label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}

Use the 'insert' command to insert data into the table from dbshell. The generic syntax is as follows:
insert --table <path_to_table> --value '<JSON>'

insert --table /tmp/maprdbjsontest/ --value '{"_id": "1", "label": "person", "confidence": 0.24}'
insert --table /tmp/maprdbjsontest/ --value '{"_id": "2", "label": "person2"}'
insert --table /tmp/maprdbjsontest/ --value '{"_id": "3", "label": "person3", "confidence": 0.54}'
insert --table /tmp/maprdbjsontest/ --value '{"_id": "4", "label": "person4", "confidence": null}'
insert --table /tmp/maprdbjsontest/ --value '{"_id":"5","confidence":1.5,"label":"person5","topleft":{"extra":{"v":50},"x":62,"y":1}}'
insert --table /tmp/maprdbjsontest/ --value '{"_id":"6","confidence":2.5,"label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}'
insert --t /tmp/maprdbjsontest/ --v '{"_id":"8","confidence":"null","label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}'

As you might have noticed (while inserting the document with _id=8), we can use --t and --v as short forms of --table and --value, respectively.


C. Querying data

Now we will see how to query the data.

[1] To scan data in a table, you can use the 'find' command; 'findbyid' can be used to fetch a document with a specific id.

To see the complete data:

maprdb mapr:> find /tmp/maprdbjsontest/
{"_id":"1","confidence":0.24,"label":"person"}
{"_id":"2","label":"person2"}
{"_id":"3","confidence":0.54,"label":"person3"}
{"_id":"4","confidence":null,"label":"person4"}
{"_id":"5","confidence":1.5,"label":"person5","topleft":{"extra":{"v":50},"x":62,"y":1}}
{"_id":"6","confidence":2.5,"label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}
{"_id":"8","confidence":"null","label":"person6","topleft":{"extra":{"v":null},"x":62,"y":1}}
7 document(s) found.

To limit the number of records fetched, use '--limit'.

maprdb mapr:> find /tmp/maprdbjsontest/ --limit 4
{"_id":"1","confidence":0.24,"label":"person"}
{"_id":"2","label":"person2"}
{"_id":"3","confidence":0.54,"label":"person3"}
{"_id":"4","confidence":null,"label":"person4"}
4 document(s) found.

To query a specific document, use 'findbyid':

maprdb mapr:> findbyid --t /tmp/maprdbjsontest/ --id 1
{"_id":"1","confidence":0.24,"label":"person"}
1 document(s) found.

[2] To query data that satisfies a condition

We will now fetch records that match certain conditions.

Case 1: Fetch all the records where the confidence value is 0.24.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$and":[{"$eq":{"confidence":0.24}}]} --fields confidence,label
{"confidence":0.24,"label":"person"}
1 document(s) found.

Case 2: Fetch all the records where the confidence value is not equal to 0.24.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$and":[{"$ne":{"confidence":0.24}}]} --fields confidence,label
{"label":"person2"}
{"confidence":0.54,"label":"person3"}
{"confidence":null,"label":"person4"}
{"confidence":1.5,"label":"person5"}
{"confidence":2.5,"label":"person6"}
{"confidence":"null","label":"person6"}
6 document(s) found.

Case 3: Fetch all the records where the confidence value is 0.24 or 1.5.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$or":[{"$eq":{"confidence":0.24}},{"$eq":{"confidence":1.5}}]} --fields confidence,label
{"confidence":0.24,"label":"person"}
{"confidence":1.5,"label":"person5"}
2 document(s) found.

Case 4: Fetch all the records where the confidence value is greater than 0.24.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$and":[{"$gt":{"confidence":0.24}}]} --fields confidence,label
{"confidence":0.54,"label":"person3"}
{"confidence":1.5,"label":"person5"}
{"confidence":2.5,"label":"person6"}
3 document(s) found.

Case 5: Fetch all the records where the confidence value is less than 0.26.
maprdb mapr:> find /tmp/maprdbjsontest/ --c {"$and":[{"$lt":{"confidence":0.26}}]} --fields confidence,label
{"confidence":0.24,"label":"person"}
1 document(s) found.

[3] To query data where a field value is null

There are two aspects to null here.
As per JSON standards, null means that the field itself does not exist.
So, to query for documents that do not have a field, we use "$notexists".

maprdb mapr:> find /tmp/maprdbjsontest/ --c '{"$notexists":"confidence"}' --fields confidence,label
{"label":"person2"}
1 document(s) found.

Now, if you have a field whose value is null, then to retrieve such records, use the "$typeOf" operator.

maprdb mapr:> find /tmp/maprdbjsontest/ --c '{"$typeOf":{"confidence":"null"}}' --fields confidence,label
{"confidence":null,"label":"person4"}
1 document(s) found.

To query all documents where the field value is not null, use the "$notTypeOf" operator.

maprdb mapr:> find /tmp/maprdbjsontest/ --c '{"$notTypeOf":{"confidence":"null"}}' --fields confidence,label
{"confidence":0.24,"label":"person"}
{"label":"person2"}
{"confidence":0.54,"label":"person3"}
{"confidence":1.5,"label":"person5"}
{"confidence":2.5,"label":"person6"}
{"confidence":"null","label":"person6"}
6 document(s) found.

Please note that for the record with 'person6' (_id 8), the confidence value is the string "null", not null.
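
To fetch that string-valued record specifically, an equality match on the string should work (a sketch using the same find syntax as above):

maprdb mapr:> find /tmp/maprdbjsontest/ --c '{"$eq":{"confidence":"null"}}' --fields confidence,label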