From 1f6be820f327503b460f9e8c372c07d6b0918fca Mon Sep 17 00:00:00 2001 From: Satish Kotha Date: Thu, 2 Apr 2020 11:05:54 -0700 Subject: [PATCH] [HUDI-758] Modify Integration test to include incremental queries for MOR tables --- ...mit_time.sh => get_min_commit_time_cow.sh} | 0 docker/demo/get_min_commit_time_mor.sh | 20 +++++++++ ...commands => hive-incremental-cow.commands} | 2 +- docker/demo/hive-incremental-mor-ro.commands | 27 +++++++++++ docker/demo/hive-incremental-mor-rt.commands | 27 +++++++++++ .../apache/hudi/integ/ITTestHoodieDemo.java | 45 +++++++++++++------ 6 files changed, 106 insertions(+), 15 deletions(-) rename docker/demo/{get_min_commit_time.sh => get_min_commit_time_cow.sh} (100%) create mode 100755 docker/demo/get_min_commit_time_mor.sh rename docker/demo/{hive-incremental.commands => hive-incremental-cow.commands} (93%) create mode 100644 docker/demo/hive-incremental-mor-ro.commands create mode 100644 docker/demo/hive-incremental-mor-rt.commands diff --git a/docker/demo/get_min_commit_time.sh b/docker/demo/get_min_commit_time_cow.sh similarity index 100% rename from docker/demo/get_min_commit_time.sh rename to docker/demo/get_min_commit_time_cow.sh diff --git a/docker/demo/get_min_commit_time_mor.sh b/docker/demo/get_min_commit_time_mor.sh new file mode 100755 index 000000000..190ed97bd --- /dev/null +++ b/docker/demo/get_min_commit_time_mor.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +MIN_COMMIT_TIME=`hdfs dfs -ls -t /user/hive/warehouse/stock_ticks_mor/.hoodie/*.deltacommit | head -1 | awk -F'/' ' { print $7 } ' | awk -F'.' ' { print $1 } '` +echo $MIN_COMMIT_TIME; diff --git a/docker/demo/hive-incremental.commands b/docker/demo/hive-incremental-cow.commands similarity index 93% rename from docker/demo/hive-incremental.commands rename to docker/demo/hive-incremental-cow.commands index 9b52c3d2e..7f4354807 100644 --- a/docker/demo/hive-incremental.commands +++ b/docker/demo/hive-incremental-cow.commands @@ -19,7 +19,7 @@ add jar ${hudi.hadoop.bundle}; set hoodie.stock_ticks_cow.consume.mode=INCREMENTAL; set hoodie.stock_ticks_cow.consume.max.commits=3; -set hoodie.stock_ticks_cow.consume.start.timestamp=${min.commit.time}; +set hoodie.stock_ticks_cow.consume.start.timestamp='${min.commit.time}'; select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}'; diff --git a/docker/demo/hive-incremental-mor-ro.commands b/docker/demo/hive-incremental-mor-ro.commands new file mode 100644 index 000000000..8b97c0aac --- /dev/null +++ b/docker/demo/hive-incremental-mor-ro.commands @@ -0,0 +1,27 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add jar ${hudi.hadoop.bundle}; + +set hoodie.stock_ticks_mor.consume.mode=INCREMENTAL; +set hoodie.stock_ticks_mor.consume.max.commits=3; +set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}'; + +select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}'; + +!quit + diff --git a/docker/demo/hive-incremental-mor-rt.commands b/docker/demo/hive-incremental-mor-rt.commands new file mode 100644 index 000000000..a81fb77e0 --- /dev/null +++ b/docker/demo/hive-incremental-mor-rt.commands @@ -0,0 +1,27 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +add jar ${hudi.hadoop.bundle}; + +set hoodie.stock_ticks_mor.consume.mode=INCREMENTAL; +set hoodie.stock_ticks_mor.consume.max.commits=3; +set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}'; + +select symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}'; + +!quit + diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java index 01eecd0b9..d89003114 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java @@ -53,7 +53,8 @@ public class ITTestHoodieDemo extends ITTestBase { private static final String MOR_TABLE_NAME = "stock_ticks_mor"; private static final String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh"; - private static final String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh"; + private static final String MIN_COMMIT_TIME_COW_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time_cow.sh"; + private static final String MIN_COMMIT_TIME_MOR_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time_mor.sh"; private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh"; private static final String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction.commands"; private static final String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands"; @@ -62,7 +63,9 @@ public class ITTestHoodieDemo extends ITTestBase { private static final String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands"; private static final String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands"; private static final String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands"; - private static final String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands"; + private static final String HIVE_INCREMENTAL_COW_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-cow.commands"; + private static final String HIVE_INCREMENTAL_MOR_RO_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-ro.commands"; + private static final String HIVE_INCREMENTAL_MOR_RT_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-rt.commands"; private static String HIVE_SYNC_CMD_FMT = " --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 " @@ -87,7 +90,7 @@ public class ITTestHoodieDemo extends ITTestBase { testHiveAfterSecondBatch(); testPrestoAfterSecondBatch(); testSparkSQLAfterSecondBatch(); - testIncrementalHiveQuery(); + testIncrementalHiveQueryBeforeCompaction(); testIncrementalSparkSQLQuery(); // compaction @@ -267,23 +270,37 @@ public class ITTestHoodieDemo extends ITTestBase { assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|"); } - private void testIncrementalHiveQuery() throws Exception { + private void testIncrementalHiveQuery(String minCommitTimeScript, String incrementalCommandsFile, + String expectedOutput, int expectedTimes) throws Exception { String minCommitTime = - executeCommandStringInDocker(ADHOC_2_CONTAINER, MIN_COMMIT_TIME_SCRIPT, true).getStdout().toString(); + executeCommandStringInDocker(ADHOC_2_CONTAINER, minCommitTimeScript, true).getStdout().toString(); Pair stdOutErrPair = - executeHiveCommandFile(HIVE_INCREMENTAL_COMMANDS, "min.commit.time=" + minCommitTime + "`"); - assertStdOutContains(stdOutErrPair, "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |"); + executeHiveCommandFile(incrementalCommandsFile, "min.commit.time=" + minCommitTime + "`"); + assertStdOutContains(stdOutErrPair, expectedOutput, expectedTimes); + } + + private void testIncrementalHiveQueryBeforeCompaction() throws Exception { + String expectedOutputCOW = "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |"; + + // verify that 10:59 is present in COW table because there is no compaction process for COW + testIncrementalHiveQuery(MIN_COMMIT_TIME_COW_SCRIPT, HIVE_INCREMENTAL_COW_COMMANDS, expectedOutputCOW, 1); + + // verify that 10:59 is NOT present in RO table because of pending compaction + testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RO_COMMANDS, expectedOutputCOW, 0); + + // verify that 10:59 is present in RT table even with pending compaction + testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RT_COMMANDS, expectedOutputCOW, 1); } private void testIncrementalHiveQueryAfterCompaction() throws Exception { - String minCommitTime = - executeCommandStringInDocker(ADHOC_2_CONTAINER, MIN_COMMIT_TIME_SCRIPT, true).getStdout().toString(); - Pair stdOutErrPair = - executeHiveCommandFile(HIVE_INCREMENTAL_COMMANDS, "min.commit.time=" + minCommitTime + "`"); - assertStdOutContains(stdOutErrPair, - "| symbol | ts | volume | open | close |\n" + String expectedOutput = "| symbol | ts | volume | open | close |\n" + "+---------+----------------------+---------+------------+-----------+\n" - + "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |"); + + "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |"; + + // verify that 10:59 is present for all views because compaction is complete + testIncrementalHiveQuery(MIN_COMMIT_TIME_COW_SCRIPT, HIVE_INCREMENTAL_COW_COMMANDS, expectedOutput, 1); + testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RO_COMMANDS, expectedOutput, 1); + testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RT_COMMANDS, expectedOutput, 1); } private void testIncrementalSparkSQLQuery() throws Exception {