Friday, 30 March 2018

DRILL Query on Hive transaction table getting wrong stats


Aim:

This post discusses an interesting DRILL behavior when querying Hive transaction tables.

Details:

Starting with DRILL 1.12, Drill supports querying Hive transaction tables.
This support was added as part of https://issues.apache.org/jira/browse/DRILL-5978. DRILL 1.12 ships with the Hive 2.1 client libraries, which support operations on Hive transaction tables.

Environment setup:
DRILL 1.12/ Hive 2.1 

1. Create a Hive transaction table in Hive. Let's call it txn_test2.

hive> show create table txn_test2;
OK
CREATE TABLE `txn_test2`(
  `a` int,
  `b` string)
PARTITIONED BY (
  `c` string)
CLUSTERED BY (
  a)
INTO 3 BUCKETS
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'maprfs:/user/hive/warehouse/txn_test'
TBLPROPERTIES (
  'transactional'='true',
  'transient_lastDdlTime'='1522439951')
Time taken: 0.151 seconds, Fetched: 19 row(s)
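
For reference, a table like the one above can be created with DDL along these lines. This is a sketch, not the exact statement used for this post; Hive ACID tables also typically require the session settings shown (ORC storage and bucketing are mandatory for transactional tables in Hive 2.1):

```sql
-- Session settings typically required for Hive ACID tables (an assumption
-- about this environment; they may already be set in hive-site.xml)
SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

-- Transactional tables must be bucketed and stored as ORC
CREATE TABLE txn_test2 (
  a INT,
  b STRING)
PARTITIONED BY (c STRING)
CLUSTERED BY (a) INTO 3 BUCKETS
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
```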
2. Insert some data into the table.
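
Statements like the following would produce the rows shown in the select output below (a sketch; the actual inserts used for this post are not shown in the transcript):

```sql
-- Four rows into each of the two partitions, matching the output below
INSERT INTO txn_test2 PARTITION (c='1') VALUES
  (1, 'Alwin'), (2, 'Alwin'), (3, 'James'), (4, 'James');
INSERT INTO txn_test2 PARTITION (c='2') VALUES
  (1, 'Alwin'), (2, 'Alwin'), (3, 'James'), (4, 'James');
```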

hive> select * from txn_test2;
OK
3       James   1
1       Alwin   1
4       James   1
2       Alwin   1
3       James   2
1       Alwin   2
4       James   2
2       Alwin   2
Time taken: 0.247 seconds, Fetched: 8 row(s)

3. Query the Hive table using DRILL. 

0: jdbc:drill:> use hive;
+-------+-----------------------------------+
|  ok   |              summary              |
+-------+-----------------------------------+
| true  | Default schema changed to [hive]  |
+-------+-----------------------------------+
1 row selected (0.111 seconds)
0: jdbc:drill:> select count(1) from txn_test2;
+---------+
| EXPR$0  |
+---------+
| 8       |
+---------+
1 row selected (1.258 seconds)
0: jdbc:drill:>


4. Now check the physical plan for the corresponding query. 

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.1 rows, 17.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 646
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0 rows, 17.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 645
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {2.0 rows, 16.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 644
00-03          Project($f0=[1]) : rowType = RecordType(INTEGER $f0): rowcount = 1.0, cumulative cost = {1.0 rows, 4.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 643
00-04            Scan(table=[[hive, txn_test2]], groupscan=[HiveScan [table=Table(dbName:default, tableName:txn_test2), columns=[], numPartitions=2, partitions= [Partition(values:[1]), Partition(values:[2])], inputDirectories=[maprfs:/user/hive/warehouse/txn_test/c=1, maprfs:/user/hive/warehouse/txn_test/c=2]]]) : rowType = RecordType(): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 642

Did you observe anything interesting? If not, take a closer look at the cumulative cost of the table scan operation: it reports the total number of rows as 0. Now, that's weird! It should be 8, since the Hive table has 8 rows. (Well, if not exactly 8, at least something close!)

If you are familiar with DRILL, you will know that the number of minor fragments (roughly, the number of threads) a query gets depends on the `planner.slice_target` option: the planner compares the estimated row count against slice_target to decide how much to parallelize. If the estimated cost is 0, DRILL spins up only one minor fragment. This becomes a real problem when you are dealing with millions of records.
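
The effect can be illustrated with a simplified model. This is a sketch of the idea, not Drill's actual planner code; the function name, the width cap, and the exact rounding are assumptions for illustration:

```python
def estimate_minor_fragments(estimated_rows, slice_target=100_000, max_width=8):
    """Toy model of Drill-style parallelization: roughly one minor
    fragment per slice_target estimated rows, capped by cluster width."""
    if estimated_rows <= 0:
        return 1  # no cost information -> planner falls back to a single fragment
    # ceiling division: one fragment per slice_target rows, at least 1, at most max_width
    return max(1, min(max_width, -(-estimated_rows // slice_target)))

print(estimate_minor_fragments(0))          # missing stats -> 1 (single-threaded)
print(estimate_minor_fragments(150_000))    # -> 2
print(estimate_minor_fragments(5_000_000))  # -> 8 (capped at max_width)
```

With a zero row estimate, even a scan of millions of actual rows runs in a single fragment, which is exactly the degradation described above.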

Here, the issue is with statistics returned from Hive. 

hive> describe formatted txn_test2 partition (c=1);
OK
# col_name              data_type               comment

a                       int
b                       string

# Partition Information
# col_name              data_type               comment

c                       string

# Detailed Partition Information
Partition Value:        [1]
Database:               default
Table:                  txn_test2
CreateTime:             Fri Mar 30 13:01:30 PDT 2018
LastAccessTime:         UNKNOWN
Location:               maprfs:/user/hive/warehouse/txn_test/c=1
Partition Parameters:
        numFiles                3
        numRows                 0
        rawDataSize             0
        totalSize               1940
        transient_lastDdlTime   1522440090

# Storage Information
SerDe Library:          org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat:            org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed:             No
Num Buckets:            3
Bucket Columns:         [a]
Sort Columns:           []
Storage Desc Params:
        serialization.format    1
Time taken: 0.278 seconds, Fetched: 34 row(s)

You can see that the number of rows (numRows) is 0. 


Solution:


Compute statistics in Hive using the ANALYZE TABLE command (see the Hive statistics documentation: https://cwiki.apache.org/confluence/display/Hive/StatsDev):

hive> analyze table txn_test2 partition(c) compute statistics;
Partition default.txn_test2{c=2} stats: [numFiles=3, numRows=4, totalSize=1940, rawDataSize=500]
Partition default.txn_test2{c=1} stats: [numFiles=3, numRows=4, totalSize=1940, rawDataSize=500]
OK
Time taken: 0.677 seconds
hive>

Now verify the statistics again:

hive> describe formatted txn_test2 partition (c=1);
OK
# col_name              data_type               comment

a                       int
b                       string

# Partition Information
# col_name              data_type               comment

c                       string

# Detailed Partition Information
Partition Value:        [1]
Database:               default
Table:                  txn_test2
CreateTime:             Fri Mar 30 13:01:30 PDT 2018
LastAccessTime:         UNKNOWN
Location:               maprfs:/user/hive/warehouse/txn_test/c=1
Partition Parameters:
        COLUMN_STATS_ACCURATE   {\"BASIC_STATS\":\"true\"}
        numFiles                3
        numRows                 4
        rawDataSize             500
        totalSize               1940
        transient_lastDdlTime   1522440837

# Storage Information
SerDe Library:          org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat:            org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat:           org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed:             No
Num Buckets:            3
Bucket Columns:         [a]
Sort Columns:           []
Storage Desc Params:
        serialization.format    1
Time taken: 0.305 seconds, Fetched: 35 row(s)

You can see that numRows has changed to 4. (It is 4 rather than 8 because we are checking stats for partition c=1, which holds half of the table's 8 rows.)

Fire the query from DRILL again and check the physical plan.

00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {25.1 rows, 289.1 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 808
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {25.0 rows, 289.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 807
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {24.0 rows, 288.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 806
00-03          Project($f0=[1]) : rowType = RecordType(INTEGER $f0): rowcount = 8.0, cumulative cost = {16.0 rows, 192.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 805
00-04            Scan(table=[[hive, txn_test2]], groupscan=[HiveScan [table=Table(dbName:default, tableName:txn_test2), columns=[], numPartitions=2, partitions= [Partition(values:[1]), Partition(values:[2])], inputDirectories=[maprfs:/user/hive/warehouse/txn_test/c=1, maprfs:/user/hive/warehouse/txn_test/c=2]]]) : rowType = RecordType(): rowcount = 8.0, cumulative cost = {8.0 rows, 160.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 804

Now the scan's rowcount is 8.0 and the cumulative cost reflects the actual data size. This is the expected behavior.

Happy Drilling!
