Saturday, 7 April 2018


Use Case: Analyze Flight details using Spark - MapRDB JSON - Apache Drill leveraging the secondary index


Aim:


This post discusses a use case for analyzing flight travel details. 
This analytic use case includes the following:

  • Understand the flight travel statistics data
  • Extract the necessary data from the CSV files using Spark
  • Run queries using Spark SQL
  • Perform data cleaning and business logic using Spark
  • Convert the data to JSON and then to OJAI MapRDB RDD
  • Store the data in MapRDB JSON table
  • Create a secondary index on the MapRDB JSON table
  • Query the data using Apache DRILL
  • Use BI tool to see the report 


The following diagram explains the use case in brief.






The data and complete code is available in my GitHub repository -  https://github.com/jamealwi2/spark-maprdb


The Data


The data contains details of flight travels made in the CSV format.
You can download the data from here.

Extracting the necessary data


At this point, you should be having the sample data with you.
You can see that each row in the file contains 29 fields. Details about each field can be found here.

Let's design two specific use case to work with:
  1. Find the total number of canceled flights.
  2. Find the number of flights canceled to a particular destination.

Let's get started and get our hands dirty ;)

Read the data from the CSV file.

    // Reading the input data
    val csvfile = "/tmp/flightdata/2008.csv"
    var csvDF = spark.read.option("header", "true").csv(csvfile)


Since we are interested only in flight travels which were canceled, we need only the following fields:
Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,Cancelled,CancellationCode

Create a case class 'CancelledFlights' that defines necessary fields for the use case.

  // Case Class containing the Year, Month, DayofMonth, Scheduled departure time(CRSDepTime), 
  // FlightNumber, Origin, Destination, Cancellation status and Cancellation code.
  case class CancelledFlights(Year: String,
                              Month: String,
                              DayofMonth: String,
                              CRSDepTime: String,
                              FlightNum: String,
                              Origin: String,
                              Dest: String,
                              Cancelled: String,
                              CancellationCode: String)
extends Serializable


Now, create a temporary view to select the necessary fields.

// Creating a temporary view to query only necessary fields.
    csvDF.createOrReplaceTempView("flightdata")

Filtering the necessary fields:

// Filtering the necessary information.
    // We need only subset of fields from the available 29 fields.
    // Also, we need information only for the cancelled flights.
    // Cancelled flights will have 'Cancelled=1'.
    val cancelledFlightsDS: Dataset[CancelledFlights] = spark
      .sql(
        "select Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,Cancelled,CancellationCode from flightdata where Cancelled='1'")
      .as[CancelledFlights]

Perform data cleaning and business logic using Spark


Let's go ahead to clean the data a bit and give more value to it. We will convert the CancellationCode to meaningful CancellationReason. Define a UDF that matches the CancellationCode to CancellationReason.

// Converting the Cancellation code to more meaningful reason.
    val lookup = udf[String, String](cancellationCode =>
      cancellationCode match {
        case "A" => "carrier"
        case "B" => "weather"
        case "C" => "NAS"
        case "D" => "security"
        case _   => "unknown"
})

Apply the business logic to the Spark Dataset.

val cancelledFlightsWithReasonDS = cancelledFlightsDS.withColumn(
      "CancellationReason",
      lookup(cancelledFlightsDS("CancellationCode")))
    cancelledFlightsWithReasonDS.createOrReplaceTempView("cancelledflightdata")

Create a case class with CancellationReason.

// Case Class containing the Year, Month, DayofMonth, Scheduled departure time(CRSDepTime), 
  // FlightNumber, Origin, Destination and Cancellation reason.
  case class CancelledFlightsWithReason(Year: String,
                                        Month: String,
                                        DayofMonth: String,
                                        CRSDepTime: String,
                                        FlightNum: String,
                                        Origin: String,
                                        Dest: String,
                                        CancellationReason: String)
extends Serializable

Create a new dataset with CancellationReason and removing CancellationCode.

val cancelledFlightsWithReasonDS2: Dataset[CancelledFlightsWithReason] =
      spark
        .sql(
          "select Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,CancellationReason from cancelledflightdata")
        .as[CancelledFlightsWithReason]

Convert the data to JSON and then to OJAI MapRDB-JSON RDD


We will now create an OJAI MAPRDB-JSON RDD that will then be stored in MapRDB JSON table. Define a case class with necessary details.

// Case class defining the MapRDB-JSON document. 
  case class FlightJsonDocument(_id: String,
                                FlightNum: String,
                                Origin: String,
                                Dest: String,
                                CancellationReason: String)
extends Serializable

Define a function that will perform the logic to create the '_id' necessary for the MapRDB JSON table. We will combine Year, Month, DayOfMonth, FlightNum and CRSDepTime.

// Function creating the _id for MapRDB JSON
  def createJsonDocument(r: CancelledFlightsWithReason): FlightJsonDocument = {
    val _id = r.Year + "_" + r.Month + "_" + r.DayofMonth + "#" + r.FlightNum + "#" + r.CRSDepTime
    FlightJsonDocument(_id, r.FlightNum, r.Origin, r.Dest, r.CancellationReason)
  }

Enough of the preparation works, let's create the OJAI RDD!

// Creating OJAI MapRDB-JSON RDD
    val ojaiRDD = cancelledFlightsWithReasonDS2
      .map(createJsonDocument)
      .toJSON
      .rdd
.map(MapRDBSpark.newDocument)


Store the data in MapRDB JSON table


To persist the data to MapRDB JSON, use 'saveToMapRDB' method available with OJAI MAPRDB-JSON RDD.

// Storing the data to MapRDB-JSON table.
    ojaiRDD.saveToMapRDB("/tmp/flighttable",
                         createTable = true,
                         idFieldPath = "_id")

Querying the data using Apache Drill


Now let's query the data using Apache Drill. Execute the first use case, i.e., counting the total number of canceled flights. 

0: jdbc:drill:> select count(*) from dfs.tmp.flighttable;
+---------+
| EXPR$0  |
+---------+
| 137418  |
+---------+
1 row selected (4.897 seconds)
0: jdbc:drill:>

If you take look into the physical plan for the query, you can see that the data is read from the '/tmp/flighttable' MapRDB-JSON table.

tableName=maprfs:///tmp/flighttable

The complete physical plan is given below:

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412256.1 rows, 2336119.1 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 225
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412256.0 rows, 2336119.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 224
00-02        StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412255.0 rows, 2336118.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 223
00-03          StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412254.0 rows, 2336106.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 222
00-04            Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 137418.0, cumulative cost = {274836.0 rows, 687090.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 221
00-05              Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=null], columns=[], maxwidth=1]]) : rowType = RecordType(): rowcount = 137418.0, cumulative cost = {137418.0 rows, 137418.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 220


Create a secondary index on the MapRDB JSON table


Let's move on to the second use case. It is different from the first use case in the aspect that first use was dealing with the complete set of data. In this use case, we are interested in the number of flights canceled to a particular destination. While querying MapRDB JSON, if there is a condition that involves field other than the '_id' field (equivalent to RowKey concept in Apache HBase), then it calls for a complete table scan. Well, that does look like performance friendly!

Example: Find out the total number of canceled flights to 'LAS'.

0: jdbc:drill:> select count(*) from dfs.tmp.flighttable where Dest='LAS';
+---------+
| EXPR$0  |
+---------+
| 1804    |
+---------+
1 row selected (1.319 seconds)

Looking at the physical plan:

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206128.1 rows, 1168054.1 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 459
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206128.0 rows, 1168054.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 458
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206127.0 rows, 1168053.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 457
00-03          Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 68709.0, cumulative cost = {137418.0 rows, 343545.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 456
00-04            Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=(Dest = "LAS")], columns=[`Dest`], maxwidth=1]]) : rowType = RecordType(ANY Dest): rowcount = 68709.0, cumulative cost = {68709.0 rows, 68709.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 455

You can see that the cumulative cost for the table scan operation is 68709 rows.
From the Drill query profile page, you will also get the total cost and the total duration of the query.




The secondary indexing feature available from MapR 6.0 comes to the rescue here. We will create a secondary index with the 'Dest' as indexed field.

maprcli table index add -path /tmp/flighttable -index destinationindex -indexedfields Dest -includedfields FlightNum

Re-run the query and observe the results.

0: jdbc:drill:> select count(*) from dfs.tmp.flighttable where Dest='LAS';
+---------+
| EXPR$0  |
+---------+
| 1804    |
+---------+
1 row selected (0.807 seconds)


00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {207.22700979050714 rows, 1169.1530554795404 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 751
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {207.12700979050715 rows, 1169.0530554795405 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 750
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206.12700979050715 rows, 1168.0530554795405 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 749
00-03          Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 68.70900326350238, cumulative cost = {137.41800652700476 rows, 343.5450163175119 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 748
00-04            Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=(Dest = "LAS"), indexName=destinationindex], columns=[`Dest`], maxwidth=1]]) : rowType = RecordType(ANY Dest): rowcount = 68.70900326350238, cumulative cost = {68.70900326350238 rows, 68.70900326350238 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 747

Now we have indexed table in the picture. You can see that in the table scan operation (indexName=destinationindex). As you might have noticed, the time for the query to return the result has reduced as is the cumulative cost. (Wait, you may not appreciate the results here since we are dealing with queries that return results in a second,  but it will make considerable performance impact on a larger dataset.)




Use BI tool to see the report


You can now go ahead to use BI tool of your choice to connect to Drill and produce interesting reports. 

Happy Drilling!




No comments:

Post a Comment