Use Case: Analyzing Flight Details Using Spark, MapRDB JSON, and Apache Drill, Leveraging a Secondary Index
Aim:
This post discusses a use case for analyzing flight travel details.
This analytic use case includes the following:
- Understand the flight travel statistics data
- Extract the necessary data from the CSV files using Spark
- Run queries using Spark SQL
- Perform data cleaning and business logic using Spark
- Convert the data to JSON and then to an OJAI MapRDB-JSON RDD
- Store the data in a MapRDB JSON table
- Create a secondary index on the MapRDB JSON table
- Query the data using Apache Drill
- Use a BI tool to see the report
The following diagram explains the use case in brief.
The data and complete code are available in my GitHub repository - https://github.com/jamealwi2/spark-maprdb
The Data
The data contains details of flight trips, in CSV format.
You can download the data from here.
Extracting the necessary data
At this point, you should have the sample data with you.
You can see that each row in the file contains 29 fields. Details about each field can be found here.
Let's design two specific use cases to work with:
- Find the total number of canceled flights.
- Find the number of flights canceled to a particular destination.
Let's get started and get our hands dirty ;)
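All Spark snippets below are written for spark-shell (where 'spark' and 'sc' are predefined). A minimal sketch of the imports they assume; the connector package name follows MapR's documentation, so adjust it to your connector version if needed:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.udf
// OJAI MapR-DB Spark connector: provides MapRDBSpark, saveToMapRDB and loadFromMapRDB.
import com.mapr.db.spark._
// Enables .as[CaseClass] and toJSON on Datasets (already in scope in spark-shell).
import spark.implicits._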
Read the data from the CSV file.
// Reading the input data
val csvfile = "/tmp/flightdata/2008.csv"
var csvDF = spark.read.option("header", "true").csv(csvfile)
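A quick optional check: with only the 'header' option set, Spark reads every column as a string (add the 'inferSchema' option if you want typed columns). This is just a sanity check and not required for the use case:

// Verify the 29 columns; all are loaded as strings by default.
csvDF.printSchema()
// Total number of rows in the 2008 data.
println(csvDF.count())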
Since we are interested only in flights that were canceled, we need only the following fields:
Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,Cancelled,CancellationCode
Create a case class 'CancelledFlights' that defines the necessary fields for the use case.
// Case class containing the Year, Month, DayofMonth, scheduled departure time (CRSDepTime),
// FlightNum, Origin, Dest, cancellation status and cancellation code.
case class CancelledFlights(
    Year: String, Month: String, DayofMonth: String, CRSDepTime: String,
    FlightNum: String, Origin: String, Dest: String,
    Cancelled: String, CancellationCode: String) extends Serializable
Now, create a temporary view to select the necessary fields.
// Creating a temporary view to query only the necessary fields.
csvDF.createOrReplaceTempView("flightdata")
Filtering the necessary fields:
// Filtering the necessary information.
// We need only a subset of the available 29 fields,
// and only the rows for cancelled flights ('Cancelled=1').
val cancelledFlightsDS: Dataset[CancelledFlights] = spark
  .sql(
    "select Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,Cancelled,CancellationCode from flightdata where Cancelled='1'")
  .as[CancelledFlights]
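Optionally, peek at the filtered Dataset to confirm that only canceled flights made it through:

// Show a few canceled flights and count them.
cancelledFlightsDS.show(5)
println(cancelledFlightsDS.count())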
Perform data cleaning and business logic using Spark
Let's clean the data a bit and add some value to it. We will convert the CancellationCode to a meaningful CancellationReason. Define a UDF that maps each CancellationCode to a CancellationReason.
// Converting the cancellation code to a more meaningful reason.
val lookup = udf[String, String](cancellationCode =>
  cancellationCode match {
    case "A" => "carrier"
    case "B" => "weather"
    case "C" => "NAS"
    case "D" => "security"
    case _   => "unknown"
  })
Apply the business logic to the Spark Dataset.
val cancelledFlightsWithReasonDS = cancelledFlightsDS.withColumn(
  "CancellationReason",
  lookup(cancelledFlightsDS("CancellationCode")))
cancelledFlightsWithReasonDS.createOrReplaceTempView("cancelledflightdata")
Create a case class with CancellationReason.
// Case class containing the Year, Month, DayofMonth, scheduled departure time (CRSDepTime),
// FlightNum, Origin, Dest and cancellation reason.
case class CancelledFlightsWithReason(
    Year: String, Month: String, DayofMonth: String, CRSDepTime: String,
    FlightNum: String, Origin: String, Dest: String,
    CancellationReason: String) extends Serializable
Create a new Dataset with CancellationReason, dropping CancellationCode.
val cancelledFlightsWithReasonDS2: Dataset[CancelledFlightsWithReason] = spark
  .sql(
    "select Year,Month,DayofMonth,CRSDepTime,FlightNum,Origin,Dest,CancellationReason from cancelledflightdata")
  .as[CancelledFlightsWithReason]
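As a side note, the same Dataset can be produced without a second SQL pass by dropping the code column directly; a minimal alternative sketch, assuming the same implicits are in scope:

// Equivalent: drop CancellationCode and re-type the Dataset.
val alternativeDS: Dataset[CancelledFlightsWithReason] =
  cancelledFlightsWithReasonDS.drop("CancellationCode").as[CancelledFlightsWithReason]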
Convert the data to JSON and then to OJAI MapRDB-JSON RDD
We will now create an OJAI MapRDB-JSON RDD that will then be stored in a MapRDB JSON table. Define a case class with the necessary fields.
// Case class defining the MapRDB-JSON document.
case class FlightJsonDocument(
    _id: String, FlightNum: String, Origin: String,
    Dest: String, CancellationReason: String) extends Serializable
Define a function that creates the '_id' required for the MapRDB JSON table. We will combine Year, Month, DayofMonth, FlightNum and CRSDepTime.
// Function creating the _id for the MapRDB-JSON document.
def createJsonDocument(r: CancelledFlightsWithReason): FlightJsonDocument = {
  val _id = r.Year + "_" + r.Month + "_" + r.DayofMonth + "#" + r.FlightNum + "#" + r.CRSDepTime
  FlightJsonDocument(_id, r.FlightNum, r.Origin, r.Dest, r.CancellationReason)
}
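For example (the values below are illustrative, not taken from the dataset), a flight with FlightNum 588 scheduled to depart at 22:11 on 2008-01-03 gets a composite key like this:

// Illustrative row; only the _id construction matters here.
val sample = CancelledFlightsWithReason("2008", "1", "3", "2211", "588", "ATL", "LAS", "weather")
createJsonDocument(sample)._id  // returns "2008_1_3#588#2211"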
Enough of the preparation work; let's create the OJAI RDD!
// Creating the OJAI MapRDB-JSON RDD.
val ojaiRDD = cancelledFlightsWithReasonDS2
  .map(createJsonDocument)
  .toJSON
  .rdd
  .map(MapRDBSpark.newDocument)
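If you want to eyeball one generated document before writing anything, pull a single element back to the driver (optional check):

// Print one OJAI document from the RDD.
ojaiRDD.take(1).foreach(println)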
Store the data in MapRDB JSON table
To persist the data to MapRDB JSON, use the 'saveToMapRDB' method available on the OJAI MapRDB-JSON RDD.
// Storing the data to the MapRDB-JSON table.
ojaiRDD.saveToMapRDB("/tmp/flighttable", createTable = true, idFieldPath = "_id")
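To verify the write, the connector can also load the table back as an RDD; a short sketch using loadFromMapRDB from the same connector:

// Read the table back and count the stored documents.
val readBackRDD = sc.loadFromMapRDB("/tmp/flighttable")
println(readBackRDD.count())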
Querying the data using Apache Drill
Now let's query the data using Apache Drill. Execute the first use case, i.e., counting the total number of canceled flights.
0: jdbc:drill:> select count(*) from dfs.tmp.flighttable;
+---------+
| EXPR$0  |
+---------+
| 137418  |
+---------+
1 row selected (4.897 seconds)
0: jdbc:drill:>
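To inspect how Drill executed the query, prefix it with EXPLAIN PLAN FOR (standard Drill syntax):

0: jdbc:drill:> explain plan for select count(*) from dfs.tmp.flighttable;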
If you take a look at the physical plan for the query, you can see that the data is read from the '/tmp/flighttable' MapRDB-JSON table.
tableName=maprfs:///tmp/flighttable
The complete physical plan is given below:
00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412256.1 rows, 2336119.1 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 225
00-01 Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412256.0 rows, 2336119.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 224
00-02 StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412255.0 rows, 2336118.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 223
00-03 StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {412254.0 rows, 2336106.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 222
00-04 Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 137418.0, cumulative cost = {274836.0 rows, 687090.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 221
00-05 Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=null], columns=[], maxwidth=1]]) : rowType = RecordType(): rowcount = 137418.0, cumulative cost = {137418.0 rows, 137418.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 220
Create a secondary index on the MapRDB JSON table
Let's move on to the second use case. It differs from the first in that the first dealt with the complete data set; here, we are interested in the number of flights canceled to a particular destination. While querying MapRDB JSON, any condition that involves a field other than the '_id' field (the equivalent of the RowKey concept in Apache HBase) calls for a complete table scan. Well, that doesn't look performance friendly!
Example: Find out the total number of canceled flights to 'LAS'.
0: jdbc:drill:> select count(*) from dfs.tmp.flighttable where Dest='LAS';
+---------+
| EXPR$0  |
+---------+
| 1804    |
+---------+
1 row selected (1.319 seconds)
Looking at the physical plan:
00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206128.1 rows, 1168054.1 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 459
00-01 Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206128.0 rows, 1168054.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 458
00-02 StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206127.0 rows, 1168053.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 457
00-03 Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 68709.0, cumulative cost = {137418.0 rows, 343545.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 456
00-04 Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=(Dest = "LAS")], columns=[`Dest`], maxwidth=1]]) : rowType = RecordType(ANY Dest): rowcount = 68709.0, cumulative cost = {68709.0 rows, 68709.0 cpu, 5.33987328E8 io, 0.0 network, 0.0 memory}, id = 455
You can see that the cumulative cost for the table scan operation is 68709 rows.
From the Drill query profile page, you will also get the total cost and the total duration of the query.
The secondary indexing feature available from MapR 6.0 comes to the rescue here. We will create a secondary index with 'Dest' as the indexed field.
maprcli table index add -path /tmp/flighttable -index destinationindex -indexedfields Dest -includedfields FlightNum
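You can confirm that the index exists (and check its state) with the corresponding maprcli list command:

maprcli table index list -path /tmp/flighttable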
Re-run the query and observe the results.
0: jdbc:drill:> select count(*) from dfs.tmp.flighttable where Dest='LAS';
+---------+
| EXPR$0  |
+---------+
| 1804    |
+---------+
1 row selected (0.807 seconds)
00-00 Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {207.22700979050714 rows, 1169.1530554795404 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 751
00-01 Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {207.12700979050715 rows, 1169.0530554795405 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 750
00-02 StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {206.12700979050715 rows, 1168.0530554795405 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 749
00-03 Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 68.70900326350238, cumulative cost = {137.41800652700476 rows, 343.5450163175119 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 748
00-04 Scan(table=[[dfs, tmp, flighttable]], groupscan=[JsonTableGroupScan [ScanSpec=JsonScanSpec [tableName=maprfs:///tmp/flighttable, condition=(Dest = "LAS"), indexName=destinationindex], columns=[`Dest`], maxwidth=1]]) : rowType = RecordType(ANY Dest): rowcount = 68.70900326350238, cumulative cost = {68.70900326350238 rows, 68.70900326350238 cpu, 262144.0 io, 0.0 network, 0.0 memory}, id = 747
Now we have the indexed table in the picture; you can see it in the table scan operation (indexName=destinationindex). As you might have noticed, the time for the query to return the result has dropped, as has the cumulative cost. (You may not fully appreciate the results here, since we are dealing with queries that return in about a second, but on a larger dataset this makes a considerable performance impact.)
Use BI tool to see the report
You can now go ahead and use a BI tool of your choice to connect to Drill and produce interesting reports.
Happy Drilling!