From 0ab1a8ec80fcf3432846f12c40eff9b66992d87f Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Tue, 8 Feb 2022 00:40:36 -0500 Subject: [PATCH] [HUDI-3312] Fixing spark yaml and adding hive validation to integ test suite (#4731) --- .../test-suite/cow-spark-long-running.yaml | 16 ++-- .../config/test-suite/cow-spark-simple.yaml | 6 +- ...er-long-running-multi-partitions-hive.yaml | 89 +++++++++++++++++++ ...reamer-long-running-multi-partitions.yaml} | 2 +- ...> detlastreamer-long-running-example.yaml} | 2 +- .../test-suite/simple-clustering-hive.yaml | 76 ++++++++++++++++ ...ng-example.yaml => simple-clustering.yaml} | 6 +- .../test-suite/simple-deltastreamer-hive.yaml | 82 +++++++++++++++++ ...dag-cow.yaml => simple-deltastreamer.yaml} | 2 +- hudi-integ-test/README.md | 28 +++--- .../testsuite/configuration/DeltaConfig.java | 5 ++ .../dag/nodes/BaseValidateDatasetNode.java | 13 +-- .../dag/nodes/ValidateDatasetNode.java | 4 +- .../testsuite/dag/nodes/SparkInsertNode.scala | 2 +- 14 files changed, 295 insertions(+), 38 deletions(-) create mode 100644 docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml rename docker/demo/config/test-suite/{cow-long-running-multi-partitions.yaml => deltastreamer-long-running-multi-partitions.yaml} (97%) rename docker/demo/config/test-suite/{cow-long-running-example.yaml => detlastreamer-long-running-example.yaml} (97%) create mode 100644 docker/demo/config/test-suite/simple-clustering-hive.yaml rename docker/demo/config/test-suite/{cow-clustering-example.yaml => simple-clustering.yaml} (96%) create mode 100644 docker/demo/config/test-suite/simple-deltastreamer-hive.yaml rename docker/demo/config/test-suite/{complex-dag-cow.yaml => simple-deltastreamer.yaml} (98%) diff --git a/docker/demo/config/test-suite/cow-spark-long-running.yaml b/docker/demo/config/test-suite/cow-spark-long-running.yaml index 493ad7a55..8a1e58f84 100644 --- a/docker/demo/config/test-suite/cow-spark-long-running.yaml +++ b/docker/demo/config/test-suite/cow-spark-long-running.yaml @@ -13,13 +13,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -dag_name: cow-spark-long-running-multi-partitions.yaml -dag_rounds: 50 -dag_intermittent_delay_mins: 1 +dag_name: cow-spark-deltastreamer-long-running-multi-partitions.yaml +dag_rounds: 30 +dag_intermittent_delay_mins: 0 dag_content: first_insert: config: - record_size: 1000 + record_size: 200 num_partitions_insert: 50 repeat_count: 1 num_records_insert: 10000 @@ -33,12 +33,12 @@ dag_content: deps: first_insert first_validate: config: - validate_hive: true + validate_hive: false type: ValidateDatasetNode deps: first_hive_sync first_upsert: config: - record_size: 1000 + record_size: 200 num_partitions_insert: 50 num_records_insert: 300 repeat_count: 1 @@ -60,13 +60,13 @@ dag_content: deps: first_delete second_validate: config: - validate_hive: true + validate_hive: false delete_input_data: true type: ValidateDatasetNode deps: second_hive_sync last_validate: config: - execute_itr_count: 50 + execute_itr_count: 30 validate_clean: true validate_archival: true type: ValidateAsyncOperations diff --git a/docker/demo/config/test-suite/cow-spark-simple.yaml b/docker/demo/config/test-suite/cow-spark-simple.yaml index 21e7e6bbe..0859c6320 100644 --- a/docker/demo/config/test-suite/cow-spark-simple.yaml +++ b/docker/demo/config/test-suite/cow-spark-simple.yaml @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. dag_name: cow-spark-simple.yaml -dag_rounds: 2 +dag_rounds: 1 dag_intermittent_delay_mins: 1 dag_content: first_insert: @@ -33,7 +33,7 @@ dag_content: deps: first_insert first_validate: config: - validate_hive: true + validate_hive: false type: ValidateDatasetNode deps: first_hive_sync first_upsert: @@ -60,7 +60,7 @@ dag_content: deps: first_delete second_validate: config: - validate_hive: true + validate_hive: false delete_input_data: false type: ValidateDatasetNode deps: second_hive_sync \ No newline at end of file diff --git a/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml new file mode 100644 index 000000000..324a4b4a6 --- /dev/null +++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions-hive.yaml @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: deltastreamer-long-running-multi-partitions.yaml +dag_rounds: 50 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 5 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 1000 + num_partitions_insert: 50 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 1000 + num_partitions_insert: 2 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: third_insert + first_validate: + config: + validate_hive: false + type: ValidateDatasetNode + deps: first_hive_sync + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 2 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_validate + first_delete: + config: + num_partitions_delete: 50 + num_records_delete: 8000 + type: DeleteNode + deps: first_upsert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete + second_validate: + config: + validate_hive: true + delete_input_data: true + type: ValidateDatasetNode + deps: second_hive_sync + last_validate: + config: + execute_itr_count: 50 + validate_clean: true + validate_archival: true + type: ValidateAsyncOperations + deps: second_validate diff --git a/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml similarity index 97% rename from docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml rename to docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml index 0ce529805..834670a0c 100644 --- a/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml +++ b/docker/demo/config/test-suite/deltastreamer-long-running-multi-partitions.yaml @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -dag_name: cow-long-running-multi-partitions.yaml +dag_name: deltastreamer-long-running-multi-partitions.yaml dag_rounds: 50 dag_intermittent_delay_mins: 1 dag_content: diff --git a/docker/demo/config/test-suite/cow-long-running-example.yaml b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml similarity index 97% rename from docker/demo/config/test-suite/cow-long-running-example.yaml rename to docker/demo/config/test-suite/detlastreamer-long-running-example.yaml index 29b6858bf..28578eb9b 100644 --- a/docker/demo/config/test-suite/cow-long-running-example.yaml +++ b/docker/demo/config/test-suite/detlastreamer-long-running-example.yaml @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -dag_name: cow-long-running-example.yaml +dag_name: detlastreamer-long-running-example.yaml dag_rounds: 50 dag_intermittent_delay_mins: 1 dag_content: diff --git a/docker/demo/config/test-suite/simple-clustering-hive.yaml b/docker/demo/config/test-suite/simple-clustering-hive.yaml new file mode 100644 index 000000000..e1f79bfe9 --- /dev/null +++ b/docker/demo/config/test-suite/simple-clustering-hive.yaml @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: simple-clustering-hive.yaml +dag_rounds: 30 +dag_intermittent_delay_mins: 0 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_delete: + config: + num_partitions_delete: 1 + num_records_delete: 9000 + type: DeleteNode + deps: third_insert + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete + first_validate: + config: + validate_hive: false + type: ValidateDatasetNode + deps: first_hive_sync + first_cluster: + config: + execute_itr_count: 20 + type: ClusteringNode + deps: first_validate + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_cluster + second_validate: + config: + validate_hive: true + type: ValidateDatasetNode + deps: second_hive_sync diff --git a/docker/demo/config/test-suite/cow-clustering-example.yaml b/docker/demo/config/test-suite/simple-clustering.yaml similarity index 96% rename from docker/demo/config/test-suite/cow-clustering-example.yaml rename to docker/demo/config/test-suite/simple-clustering.yaml index 95932317c..7389ee3eb 100644 --- a/docker/demo/config/test-suite/cow-clustering-example.yaml +++ b/docker/demo/config/test-suite/simple-clustering.yaml @@ -13,8 +13,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -dag_name: cow-clustering-example.yaml -dag_rounds: 3 +dag_name: simple-clustering.yaml +dag_rounds: 30 dag_intermittent_delay_mins: 0 dag_content: first_insert: @@ -60,7 +60,7 @@ dag_content: deps: first_hive_sync first_cluster: config: - execute_itr_count: 2 + execute_itr_count: 25 type: ClusteringNode deps: first_validate second_hive_sync: diff --git a/docker/demo/config/test-suite/simple-deltastreamer-hive.yaml b/docker/demo/config/test-suite/simple-deltastreamer-hive.yaml new file mode 100644 index 000000000..e6738b694 --- /dev/null +++ b/docker/demo/config/test-suite/simple-deltastreamer-hive.yaml @@ -0,0 +1,82 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: simple-deltastreamer.yaml +dag_rounds: 1 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: third_insert + first_validate: + config: + validate_hive: false + type: ValidateDatasetNode + deps: first_hive_sync + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_validate + first_delete: + config: + num_partitions_delete: 1 + num_records_delete: 2000 + type: DeleteNode + deps: first_upsert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete + second_validate: + config: + validate_hive: true + delete_input_data: true + type: ValidateDatasetNode + deps: second_hive_sync diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/simple-deltastreamer.yaml similarity index 98% rename from docker/demo/config/test-suite/complex-dag-cow.yaml rename to docker/demo/config/test-suite/simple-deltastreamer.yaml index 3a84b0a0a..f49a41baf 100644 --- a/docker/demo/config/test-suite/complex-dag-cow.yaml +++ b/docker/demo/config/test-suite/simple-deltastreamer.yaml @@ -13,7 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -dag_name: complex-dag-cow.yaml +dag_name: simple-deltastreamer.yaml dag_rounds: 1 dag_intermittent_delay_mins: 1 dag_content: diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md index ffdedf849..7ee4598ba 100644 --- a/hudi-integ-test/README.md +++ b/hudi-integ-test/README.md @@ -82,8 +82,8 @@ spark-submit 2.YAML file -Choose to write up the entire DAG of operations in YAML, take a look at `complex-dag-cow.yaml` or -`complex-dag-mor.yaml`. +Choose to write up the entire DAG of operations in YAML, take a look at `simple-deltastreamer.yaml` or +`simple-deltastreamer.yaml`. Once you're ready with the DAG you want to execute, simply pass the yaml file path as follows: ``` @@ -177,7 +177,7 @@ cd /opt Copy the integration tests jar into the docker container ``` -docker cp packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.10.0-SNAPSHOT.jar adhoc-2:/opt +docker cp packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar adhoc-2:/opt ``` ``` @@ -217,7 +217,7 @@ spark-submit \ --conf spark.driver.extraClassPath=/var/demo/jars/* \ --conf spark.executor.extraClassPath=/var/demo/jars/* \ --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \ -/opt/hudi-integ-test-bundle-0.10.0-SNAPSHOT.jar \ +/opt/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar \ --source-ordering-field test_suite_source_ordering_field \ --use-deltastreamer \ --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ @@ -227,7 +227,7 @@ spark-submit \ --schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \ --source-class org.apache.hudi.utilities.sources.AvroDFSSource \ --input-file-size 125829120 \ ---workload-yaml-path file:/var/hoodie/ws/docker/demo/config/test-suite/complex-dag-cow.yaml \ +--workload-yaml-path file:/var/hoodie/ws/docker/demo/config/test-suite/simple-deltastreamer.yaml \ --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \ --table-type COPY_ON_WRITE \ --compact-scheduling-minshare 1 \ @@ -264,7 +264,7 @@ spark-submit \ --conf spark.driver.extraClassPath=/var/demo/jars/* \ --conf spark.executor.extraClassPath=/var/demo/jars/* \ --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \ -/opt/hudi-integ-test-bundle-0.10.0-SNAPSHOT.jar \ +/opt/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar \ --source-ordering-field test_suite_source_ordering_field \ --use-deltastreamer \ --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ @@ -274,7 +274,7 @@ spark-submit \ --schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \ --source-class org.apache.hudi.utilities.sources.AvroDFSSource \ --input-file-size 125829120 \ ---workload-yaml-path file:/var/hoodie/ws/docker/demo/config/test-suite/complex-dag-mor.yaml \ +--workload-yaml-path file:/var/hoodie/ws/docker/demo/config/test-suite/simple-deltastreamer.yaml \ --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \ --table-type MERGE_ON_READ \ --compact-scheduling-minshare 1 \ @@ -308,16 +308,16 @@ contents both via spark datasource and hive table via spark sql engine. Hive val If you have "ValidateDatasetNode" in your dag, do not replace hive jars as instructed above. Spark sql engine does not go well w/ hive2* jars. So, after running docker setup, follow the below steps. ``` -docker cp packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.10.0-SNAPSHOT.jar adhoc-2:/opt/ +docker cp packaging/hudi-integ-test-bundle/target/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar adhoc-2:/opt/ docker cp docker/demo/config/test-suite/test.properties adhoc-2:/opt/ ``` Also copy your dag of interest to adhoc-2:/opt/ ``` -docker cp docker/demo/config/test-suite/complex-dag-cow.yaml adhoc-2:/opt/ +docker cp docker/demo/config/test-suite/simple-deltastreamer.yaml adhoc-2:/opt/ ``` For repeated runs, two additional configs need to be set. "dag_rounds" and "dag_intermittent_delay_mins". -This means that your dag will be repeated for N times w/ a delay of Y mins between each round. Note: complex-dag-cow.yaml +This means that your dag will be repeated for N times w/ a delay of Y mins between each round. Note: simple-deltastreamer.yaml already has all these configs set. So no changes required just to try it out. Also, ValidateDatasetNode can be configured in two ways. Either with "delete_input_data" set to true or without @@ -457,7 +457,7 @@ spark-submit \ --conf spark.driver.extraClassPath=/var/demo/jars/* \ --conf spark.executor.extraClassPath=/var/demo/jars/* \ --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \ -/opt/hudi-integ-test-bundle-0.10.0-SNAPSHOT.jar \ +/opt/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar \ --source-ordering-field test_suite_source_ordering_field \ --use-deltastreamer \ --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ @@ -467,7 +467,7 @@ spark-submit \ --schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \ --source-class org.apache.hudi.utilities.sources.AvroDFSSource \ --input-file-size 125829120 \ ---workload-yaml-path file:/opt/complex-dag-cow.yaml \ +--workload-yaml-path file:/opt/simple-deltastreamer.yaml \ --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \ --table-type COPY_ON_WRITE \ --compact-scheduling-minshare 1 \ @@ -486,8 +486,8 @@ If you wish to enable metrics add below properties as well Few ready to use dags are available under docker/demo/config/test-suite/ that could give you an idea for long running dags. ``` -complex-dag-cow.yaml: simple 1 round dag for COW table. -complex-dag-mor.yaml: simple 1 round dag for MOR table. +simple-deltastreamer.yaml: simple 1 round dag for COW table. +simple-deltastreamer.yaml: simple 1 round dag for MOR table. cow-clustering-example.yaml : dag with 3 rounds, in which inline clustering will trigger during 2nd iteration. cow-long-running-example.yaml : long running dag with 50 iterations. only 1 partition is used. cow-long-running-multi-partitions.yaml: long running dag wit 50 iterations with multiple partitions. diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index b0ae06b60..56aa390ff 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -95,6 +95,7 @@ public class DeltaConfig implements Serializable { private static String SCHEMA_VERSION = "schema_version"; private static String NUM_ROLLBACKS = "num_rollbacks"; private static String ENABLE_ROW_WRITING = "enable_row_writing"; + private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate"; // Spark SQL Create Table private static String TABLE_TYPE = "table_type"; @@ -149,6 +150,10 @@ public class DeltaConfig implements Serializable { return Integer.valueOf(configsMap.getOrDefault(RECORD_SIZE, 1024).toString()); } + public boolean isEnableMetadataValidate() { + return Boolean.valueOf(configsMap.getOrDefault(ENABLE_METADATA_VALIDATE, false).toString()); + } + public int getNumInsertPartitions() { return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_INSERT, 1).toString()); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java index 1ae6d948f..986e97328 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/BaseValidateDatasetNode.java @@ -111,14 +111,17 @@ public abstract class BaseValidateDatasetNode extends DagNode { String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key()); String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key()); log.warn("Validating hive table with db : " + database + " and table : " + tableName); - Dataset cowDf = session.sql("SELECT * FROM " + database + "." + tableName); - Dataset trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD) - .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD); - intersectionDf = inputSnapshotDf.intersect(trimmedCowDf); + session.sql("REFRESH TABLE " + database + "." + tableName); + Dataset cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " + + "test_suite_source_ordering_field FROM " + database + "." + tableName); + Dataset reorderedInputDf = inputSnapshotDf.select("_row_key","rider","driver","begin_lat","begin_lon","end_lat","end_lon","fare", + "_hoodie_is_deleted","test_suite_source_ordering_field"); + + Dataset intersectedHiveDf = reorderedInputDf.intersect(cowDf); outputCount = trimmedHudiDf.count(); log.warn("Input count: " + inputCount + "; output count: " + outputCount); // the intersected df should be same as inputDf. if not, there is some mismatch. - if (outputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) { + if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) { log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount); throw new AssertionError("Hudi hive table contents does not match contents input data. "); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java index 03b37a9fc..cc293ea47 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java @@ -18,6 +18,7 @@ package org.apache.hudi.integ.testsuite.dag.nodes; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; @@ -49,7 +50,8 @@ public class ValidateDatasetNode extends BaseValidateDatasetNode { StructType inputSchema) { String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*"; log.info("Validate data in target hudi path " + hudiPath); - Dataset hudiDf = session.read().format("hudi").load(hudiPath); + Dataset hudiDf = session.read().option(HoodieMetadataConfig.ENABLE.key(), String.valueOf(config.isEnableMetadataValidate())) + .format("hudi").load(hudiPath); return hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD) .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD); } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala index 1b69cf8fa..db17a6ee3 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala @@ -60,7 +60,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key") .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")) .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) - .mode(SaveMode.Overwrite) + .mode(SaveMode.Append) .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath) } }