DRILL query on a Hive transactional table gets wrong stats
Aim:
This post discusses an interesting DRILL behavior when querying Hive transactional tables.
Details:
Starting with version 1.12, DRILL supports querying Hive transactional tables. This capability was added as part of DRILL-5978 (https://issues.apache.org/jira/browse/DRILL-5978): DRILL 1.12 ships with the Hive 2.1 client libraries, which support operations on Hive transactional tables.
Environment setup:
DRILL 1.12/ Hive 2.1
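DRILL reaches Hive through its hive storage plugin, which must be enabled and pointed at the Hive metastore. As a rough sketch, a minimal plugin configuration (editable under Storage in the DRILL Web UI) looks like this; the metastore URI below is illustrative and depends on your cluster:

```json
{
  "type": "hive",
  "enabled": true,
  "configProps": {
    "hive.metastore.uris": "thrift://localhost:9083",
    "hive.metastore.sasl.enabled": "false"
  }
}
```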
1. Create a Hive transaction table in Hive. Let's call it txn_test2.
hive> show create table txn_test2;
OK
CREATE TABLE `txn_test2`(
  `a` int,
  `b` string)
PARTITIONED BY (
  `c` string)
CLUSTERED BY (
  a)
INTO 3 BUCKETS
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'maprfs:/user/hive/warehouse/txn_test'
TBLPROPERTIES (
  'transactional'='true',
  'transient_lastDdlTime'='1522439951')
Time taken: 0.151 seconds, Fetched: 19 row(s)
2. Insert some data into the table.
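The exact load statements are not shown here; rows like the ones below can be produced with something along these lines (values are illustrative, and the session settings are the standard ones Hive needs for ACID inserts):

```sql
-- ACID tables require transaction support on the Hive session
SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO txn_test2 PARTITION (c) VALUES
  (1, 'Alwin', '1'), (2, 'Alwin', '1'), (3, 'James', '1'), (4, 'James', '1'),
  (1, 'Alwin', '2'), (2, 'Alwin', '2'), (3, 'James', '2'), (4, 'James', '2');
```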
hive> select * from txn_test2;
OK
3	James	1
1	Alwin	1
4	James	1
2	Alwin	1
3	James	2
1	Alwin	2
4	James	2
2	Alwin	2
Time taken: 0.247 seconds, Fetched: 8 row(s)
3. Query the Hive table using DRILL.
0: jdbc:drill:> use hive;
+-------+-----------------------------------+
|  ok   |              summary              |
+-------+-----------------------------------+
| true  | Default schema changed to [hive]  |
+-------+-----------------------------------+
1 row selected (0.111 seconds)
0: jdbc:drill:> select count(1) from txn_test2;
+---------+
| EXPR$0  |
+---------+
| 8       |
+---------+
1 row selected (1.258 seconds)
0: jdbc:drill:>
4. Now check the physical plan for the corresponding query.
00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.1 rows, 17.1 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 646
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {3.0 rows, 17.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 645
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {2.0 rows, 16.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 644
00-03          Project($f0=[1]) : rowType = RecordType(INTEGER $f0): rowcount = 1.0, cumulative cost = {1.0 rows, 4.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 643
00-04            Scan(table=[[hive, txn_test2]], groupscan=[HiveScan [table=Table(dbName:default, tableName:txn_test2), columns=[], numPartitions=2, partitions= [Partition(values:[1]), Partition(values:[2])], inputDirectories=[maprfs:/user/hive/warehouse/txn_test/c=1, maprfs:/user/hive/warehouse/txn_test/c=2]]]) : rowType = RecordType(): rowcount = 1.0, cumulative cost = {0.0 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 642
Did you observe anything interesting? If not, take a closer look at the cumulative cost of the table scan operation (00-04): it estimates the total number of rows as 0. Now, that's weird! It should be 8, since the Hive table has 8 rows. (Well, if not exactly 8, at least something close!)
If you are familiar with DRILL, you will know that the number of minor fragments (roughly, the number of threads) depends on the slice_target option, which is compared against the row count produced by the cost calculation. If the estimated cost is 0, DRILL spins up only one minor fragment. This becomes a real problem when you are dealing with millions of records.
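The option in question is planner.slice_target (100,000 rows by default); you can inspect or adjust it from DRILL itself. Note, though, that with a scan estimate of 0 rows no threshold will help, since the estimate never exceeds it:

```sql
-- check the current threshold
SELECT name, num_val FROM sys.options WHERE name = 'planner.slice_target';

-- lower it for the current session to encourage more minor fragments
ALTER SESSION SET `planner.slice_target` = 10000;
```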
Here, the issue is with statistics returned from Hive.
hive> describe formatted txn_test2 partition (c=1);
OK
# col_name            	data_type           	comment

a                   	int
b                   	string

# Partition Information
# col_name            	data_type           	comment

c                   	string

# Detailed Partition Information
Partition Value:    	[1]
Database:           	default
Table:              	txn_test2
CreateTime:         	Fri Mar 30 13:01:30 PDT 2018
LastAccessTime:     	UNKNOWN
Location:           	maprfs:/user/hive/warehouse/txn_test/c=1
Partition Parameters:
	numFiles            	3
	numRows             	0
	rawDataSize         	0
	totalSize           	1940
	transient_lastDdlTime	1522440090

# Storage Information
SerDe Library:      	org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat:        	org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat:       	org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed:         	No
Num Buckets:        	3
Bucket Columns:     	[a]
Sort Columns:       	[]
Storage Desc Params:
	serialization.format	1
Time taken: 0.278 seconds, Fetched: 34 row(s)
You can see that the number of rows (numRows) in the partition parameters is 0, even though the partition clearly contains data. This is the statistic DRILL picks up for its row-count estimate.
Solution:
Compute statistics in Hive using the analyze command (see the Hive StatsDev wiki for details: https://cwiki.apache.org/confluence/display/Hive/StatsDev):
hive> analyze table txn_test2 partition(c) compute statistics;
Partition default.txn_test2{c=2} stats: [numFiles=3, numRows=4, totalSize=1940, rawDataSize=500]
Partition default.txn_test2{c=1} stats: [numFiles=3, numRows=4, totalSize=1940, rawDataSize=500]
OK
Time taken: 0.677 seconds
hive>
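If you also want per-column statistics (useful for more advanced planning), or want Hive to gather basic stats automatically on future inserts, the following standard Hive command and setting may help:

```sql
-- column-level statistics for all partitions of the table
ANALYZE TABLE txn_test2 PARTITION (c) COMPUTE STATISTICS FOR COLUMNS;

-- gather basic stats automatically during subsequent inserts
SET hive.stats.autogather=true;
```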
Now verify the statistics again:
hive> describe formatted txn_test2 partition (c=1);
OK
# col_name            	data_type           	comment

a                   	int
b                   	string

# Partition Information
# col_name            	data_type           	comment

c                   	string

# Detailed Partition Information
Partition Value:    	[1]
Database:           	default
Table:              	txn_test2
CreateTime:         	Fri Mar 30 13:01:30 PDT 2018
LastAccessTime:     	UNKNOWN
Location:           	maprfs:/user/hive/warehouse/txn_test/c=1
Partition Parameters:
	COLUMN_STATS_ACCURATE	{"BASIC_STATS":"true"}
	numFiles            	3
	numRows             	4
	rawDataSize         	500
	totalSize           	1940
	transient_lastDdlTime	1522440837

# Storage Information
SerDe Library:      	org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat:        	org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat:       	org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed:         	No
Num Buckets:        	3
Bucket Columns:     	[a]
Sort Columns:       	[]
Storage Desc Params:
	serialization.format	1
Time taken: 0.305 seconds, Fetched: 35 row(s)
You can see that numRows has changed to 4. (We are looking at the stats for partition c=1 alone, which holds 4 of the table's 8 rows.)
Fire the query from DRILL again and check the physical plan.
00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {25.1 rows, 289.1 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 808
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {25.0 rows, 289.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 807
00-02        StreamAgg(group=[{}], EXPR$0=[COUNT($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {24.0 rows, 288.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 806
00-03          Project($f0=[1]) : rowType = RecordType(INTEGER $f0): rowcount = 8.0, cumulative cost = {16.0 rows, 192.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 805
00-04            Scan(table=[[hive, txn_test2]], groupscan=[HiveScan [table=Table(dbName:default, tableName:txn_test2), columns=[], numPartitions=2, partitions= [Partition(values:[1]), Partition(values:[2])], inputDirectories=[maprfs:/user/hive/warehouse/txn_test/c=1, maprfs:/user/hive/warehouse/txn_test/c=2]]]) : rowType = RecordType(): rowcount = 8.0, cumulative cost = {8.0 rows, 160.0 cpu, 3880.0 io, 0.0 network, 0.0 memory}, id = 804
Now you can see that the scan's row-count estimate is 8, matching the actual row count. This is the expected behavior.
Happy Drilling!