1
0

[HUDI-758] Modify Integration test to include incremental queries for MOR tables

This commit is contained in:
Satish Kotha
2020-04-02 11:05:54 -07:00
committed by Bhavani Sudha Saktheeswaran
parent f7b55afb74
commit 1f6be820f3
6 changed files with 106 additions and 15 deletions

View File

@@ -0,0 +1,20 @@
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Prints the instant time of a .deltacommit file from the stock_ticks_mor
# Hudi timeline, for use as the start timestamp of an incremental query.
#
# Pipeline: list the timeline's .deltacommit files sorted by modification
# time (-t), take the first listing line, extract the 7th '/'-separated
# path component (the file name), then strip the ".deltacommit" suffix
# to leave the bare commit instant time.
#
# NOTE(review): `hdfs dfs -ls -t` sorts most-recently-modified FIRST, so
# `head -1` selects the newest entry, and `-ls` may also emit a
# "Found N items" header line ahead of the listings — confirm both
# against the integration-test expectation that this yields the *min*
# commit time.
MIN_COMMIT_TIME=$(hdfs dfs -ls -t /user/hive/warehouse/stock_ticks_mor/.hoodie/*.deltacommit | head -1 | awk -F'/' ' { print $7 } ' | awk -F'.' ' { print $1 } ')
# Quote the expansion so an unexpected value (spaces, globs) is emitted verbatim.
echo "$MIN_COMMIT_TIME"

View File

@@ -19,7 +19,7 @@ add jar ${hudi.hadoop.bundle};
set hoodie.stock_ticks_cow.consume.mode=INCREMENTAL;
set hoodie.stock_ticks_cow.consume.max.commits=3;
set hoodie.stock_ticks_cow.consume.start.timestamp=${min.commit.time};
set hoodie.stock_ticks_cow.consume.start.timestamp='${min.commit.time}';
select symbol, ts, volume, open, close from stock_ticks_cow where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}';

View File

@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Incremental Hive query against the MOR read-optimized (_ro) table.
# ${hudi.hadoop.bundle} and ${min.commit.time} are substituted by the
# integration test before this script is fed to beeline.
add jar ${hudi.hadoop.bundle};
# Read the stock_ticks_mor table in INCREMENTAL mode: only commits after
# the start timestamp (at most 3 of them) are visible to the query.
set hoodie.stock_ticks_mor.consume.mode=INCREMENTAL;
set hoodie.stock_ticks_mor.consume.max.commits=3;
set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}';
# The _hoodie_commit_time predicate restricts output to rows written
# strictly after the start commit; the test asserts on this output.
select symbol, ts, volume, open, close from stock_ticks_mor_ro where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}';
# Terminate the beeline session.
!quit

View File

@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Incremental Hive query against the MOR real-time (_rt) table, which
# merges base files with pending delta commits at read time.
# ${hudi.hadoop.bundle} and ${min.commit.time} are substituted by the
# integration test before this script is fed to beeline.
add jar ${hudi.hadoop.bundle};
# Read the stock_ticks_mor table in INCREMENTAL mode: only commits after
# the start timestamp (at most 3 of them) are visible to the query.
set hoodie.stock_ticks_mor.consume.mode=INCREMENTAL;
set hoodie.stock_ticks_mor.consume.max.commits=3;
set hoodie.stock_ticks_mor.consume.start.timestamp='${min.commit.time}';
# The _hoodie_commit_time predicate restricts output to rows written
# strictly after the start commit; the test asserts on this output.
select symbol, ts, volume, open, close from stock_ticks_mor_rt where symbol = 'GOOG' and `_hoodie_commit_time` > '${min.commit.time}';
# Terminate the beeline session.
!quit

View File

@@ -53,7 +53,8 @@ public class ITTestHoodieDemo extends ITTestBase {
private static final String MOR_TABLE_NAME = "stock_ticks_mor";
private static final String DEMO_CONTAINER_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/setup_demo_container.sh";
private static final String MIN_COMMIT_TIME_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time.sh";
private static final String MIN_COMMIT_TIME_COW_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time_cow.sh";
private static final String MIN_COMMIT_TIME_MOR_SCRIPT = HOODIE_WS_ROOT + "/docker/demo/get_min_commit_time_mor.sh";
private static final String HUDI_CLI_TOOL = HOODIE_WS_ROOT + "/hudi-cli/hudi-cli.sh";
private static final String COMPACTION_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/compaction.commands";
private static final String SPARKSQL_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/sparksql-batch1.commands";
@@ -62,7 +63,9 @@ public class ITTestHoodieDemo extends ITTestBase {
private static final String HIVE_TBLCHECK_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-table-check.commands";
private static final String HIVE_BATCH1_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch1.commands";
private static final String HIVE_BATCH2_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-batch2-after-compaction.commands";
private static final String HIVE_INCREMENTAL_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental.commands";
private static final String HIVE_INCREMENTAL_COW_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-cow.commands";
private static final String HIVE_INCREMENTAL_MOR_RO_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-ro.commands";
private static final String HIVE_INCREMENTAL_MOR_RT_COMMANDS = HOODIE_WS_ROOT + "/docker/demo/hive-incremental-mor-rt.commands";
private static String HIVE_SYNC_CMD_FMT =
" --enable-hive-sync --hoodie-conf hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000 "
@@ -87,7 +90,7 @@ public class ITTestHoodieDemo extends ITTestBase {
testHiveAfterSecondBatch();
testPrestoAfterSecondBatch();
testSparkSQLAfterSecondBatch();
testIncrementalHiveQuery();
testIncrementalHiveQueryBeforeCompaction();
testIncrementalSparkSQLQuery();
// compaction
@@ -267,23 +270,37 @@ public class ITTestHoodieDemo extends ITTestBase {
assertStdOutContains(stdOutErrPair, "|GOOG |2018-08-31 10:29:00|3391 |1230.1899|1230.085|");
}
private void testIncrementalHiveQuery() throws Exception {
private void testIncrementalHiveQuery(String minCommitTimeScript, String incrementalCommandsFile,
String expectedOutput, int expectedTimes) throws Exception {
String minCommitTime =
executeCommandStringInDocker(ADHOC_2_CONTAINER, MIN_COMMIT_TIME_SCRIPT, true).getStdout().toString();
executeCommandStringInDocker(ADHOC_2_CONTAINER, minCommitTimeScript, true).getStdout().toString();
Pair<String, String> stdOutErrPair =
executeHiveCommandFile(HIVE_INCREMENTAL_COMMANDS, "min.commit.time=" + minCommitTime + "`");
assertStdOutContains(stdOutErrPair, "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |");
executeHiveCommandFile(incrementalCommandsFile, "min.commit.time=" + minCommitTime + "`");
assertStdOutContains(stdOutErrPair, expectedOutput, expectedTimes);
}
private void testIncrementalHiveQueryBeforeCompaction() throws Exception {
String expectedOutputCOW = "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |";
// verify that 10:59 is present in COW table because there is no compaction process for COW
testIncrementalHiveQuery(MIN_COMMIT_TIME_COW_SCRIPT, HIVE_INCREMENTAL_COW_COMMANDS, expectedOutputCOW, 1);
// verify that 10:59 is NOT present in RO table because of pending compaction
testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RO_COMMANDS, expectedOutputCOW, 0);
// verify that 10:59 is present in RT table even with pending compaction
testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RT_COMMANDS, expectedOutputCOW, 1);
}
private void testIncrementalHiveQueryAfterCompaction() throws Exception {
String minCommitTime =
executeCommandStringInDocker(ADHOC_2_CONTAINER, MIN_COMMIT_TIME_SCRIPT, true).getStdout().toString();
Pair<String, String> stdOutErrPair =
executeHiveCommandFile(HIVE_INCREMENTAL_COMMANDS, "min.commit.time=" + minCommitTime + "`");
assertStdOutContains(stdOutErrPair,
"| symbol | ts | volume | open | close |\n"
String expectedOutput = "| symbol | ts | volume | open | close |\n"
+ "+---------+----------------------+---------+------------+-----------+\n"
+ "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |");
+ "| GOOG | 2018-08-31 10:59:00 | 9021 | 1227.1993 | 1227.215 |";
// verify that 10:59 is present for all views because compaction is complete
testIncrementalHiveQuery(MIN_COMMIT_TIME_COW_SCRIPT, HIVE_INCREMENTAL_COW_COMMANDS, expectedOutput, 1);
testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RO_COMMANDS, expectedOutput, 1);
testIncrementalHiveQuery(MIN_COMMIT_TIME_MOR_SCRIPT, HIVE_INCREMENTAL_MOR_RT_COMMANDS, expectedOutput, 1);
}
private void testIncrementalSparkSQLQuery() throws Exception {