From d5f202821b11659693a2e934d4b8f4c99f19755e Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 12 Feb 2021 12:29:21 -0500 Subject: [PATCH] Adding fixes to test suite framework. Adding clustering node and validate async operations node. (#2400) --- .../config/test-suite/complex-dag-cow.yaml | 30 +++-- .../config/test-suite/complex-dag-mor.yaml | 91 ++++--------- .../test-suite/cow-clustering-example.yaml | 76 +++++++++++ .../test-suite/cow-long-running-example.yaml | 39 ++++-- .../cow-long-running-multi-partitions.yaml | 89 +++++++++++++ docker/demo/config/test-suite/test.properties | 4 + .../client/AbstractHoodieWriteClient.java | 3 +- hudi-integ-test/pom.xml | 11 ++ .../integ/testsuite/HoodieTestSuiteJob.java | 1 + .../testsuite/HoodieTestSuiteWriter.java | 39 +++++- .../testsuite/configuration/DeltaConfig.java | 20 +++ .../integ/testsuite/dag/ExecutionContext.java | 2 +- .../integ/testsuite/dag/nodes/CleanNode.java | 2 +- .../testsuite/dag/nodes/ClusteringNode.java | 48 +++++++ .../testsuite/dag/nodes/CompactNode.java | 3 +- .../integ/testsuite/dag/nodes/DagNode.java | 3 +- .../integ/testsuite/dag/nodes/DelayNode.java | 2 +- .../testsuite/dag/nodes/HiveQueryNode.java | 2 +- .../testsuite/dag/nodes/HiveSyncNode.java | 2 +- .../integ/testsuite/dag/nodes/InsertNode.java | 2 +- .../testsuite/dag/nodes/RollbackNode.java | 4 +- .../dag/nodes/ScheduleCompactNode.java | 4 +- .../dag/nodes/SparkSQLQueryNode.java | 3 +- .../dag/nodes/ValidateAsyncOperations.java | 124 ++++++++++++++++++ .../dag/nodes/ValidateDatasetNode.java | 26 ++-- .../testsuite/dag/nodes/ValidateNode.java | 3 +- .../testsuite/dag/scheduler/DagScheduler.java | 9 +- ...lexibleSchemaRecordGenerationIterator.java | 2 +- .../GenericRecordFullPayloadGenerator.java | 2 +- .../utilities/deltastreamer/DeltaSync.java | 7 +- packaging/hudi-integ-test-bundle/pom.xml | 1 + 31 files changed, 531 insertions(+), 123 deletions(-) create mode 100644 docker/demo/config/test-suite/cow-clustering-example.yaml 
create mode 100644 docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/complex-dag-cow.yaml index 5fa859683..acbe287ac 100644 --- a/docker/demo/config/test-suite/complex-dag-cow.yaml +++ b/docker/demo/config/test-suite/complex-dag-cow.yaml @@ -13,13 +13,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -dag_name: cow-long-running-example.yaml -dag_rounds: 2 +dag_name: complex-dag-cow.yaml +dag_rounds: 1 dag_intermittent_delay_mins: 1 dag_content: first_insert: config: - record_size: 100 + record_size: 1000 num_partitions_insert: 1 repeat_count: 1 num_records_insert: 1000 @@ -27,7 +27,7 @@ dag_content: deps: none second_insert: config: - record_size: 100 + record_size: 1000 num_partitions_insert: 1 repeat_count: 1 num_records_insert: 10000 @@ -35,19 +35,26 @@ dag_content: type: InsertNode third_insert: config: - record_size: 100 + record_size: 1000 num_partitions_insert: 1 repeat_count: 1 num_records_insert: 300 deps: second_insert type: InsertNode + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: third_insert first_validate: config: + validate_hive: true type: ValidateDatasetNode - deps: third_insert + deps: first_hive_sync first_upsert: config: - record_size: 100 + record_size: 1000 num_partitions_insert: 1 num_records_insert: 300 repeat_count: 1 @@ -61,8 +68,15 @@ dag_content: num_records_delete: 2000 type: DeleteNode deps: first_upsert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete 
second_validate: config: + validate_hive: true delete_input_data: true type: ValidateDatasetNode - deps: first_delete \ No newline at end of file + deps: second_hive_sync diff --git a/docker/demo/config/test-suite/complex-dag-mor.yaml b/docker/demo/config/test-suite/complex-dag-mor.yaml index 505e5e294..24f3a9c3b 100644 --- a/docker/demo/config/test-suite/complex-dag-mor.yaml +++ b/docker/demo/config/test-suite/complex-dag-mor.yaml @@ -15,105 +15,70 @@ # limitations under the License. dag_name: complex-dag-mor.yaml dag_rounds: 1 -dag_intermittent_delay_mins: 10 +dag_intermittent_delay_mins: 1 dag_content: first_insert: config: - record_size: 70000 + record_size: 1000 num_partitions_insert: 1 - repeat_count: 5 + repeat_count: 1 num_records_insert: 100 type: InsertNode deps: none second_insert: config: - record_size: 70000 + record_size: 1000 num_partitions_insert: 1 - repeat_count: 5 - num_records_insert: 100 + repeat_count: 1 + num_records_insert: 1000 deps: first_insert type: InsertNode third_insert: config: - record_size: 70000 + record_size: 1000 num_partitions_insert: 1 - repeat_count: 2 + repeat_count: 1 num_records_insert: 300 deps: second_insert type: InsertNode - first_rollback: - config: - deps: third_insert - type: RollbackNode - first_upsert: - config: - record_size: 70000 - num_partitions_insert: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_partitions_upsert: 10 - type: UpsertNode - deps: first_rollback first_hive_sync: config: queue_name: "adhoc" engine: "mr" type: HiveSyncNode - deps: first_upsert - first_hive_query: + deps: third_insert + first_validate: config: - queue_name: "adhoc" - engine: "mr" - type: HiveQueryNode + type: ValidateDatasetNode deps: first_hive_sync - second_upsert: + first_upsert: config: - record_size: 70000 + record_size: 1000 num_partitions_insert: 1 num_records_insert: 300 repeat_count: 1 num_records_upsert: 100 - num_partitions_upsert: 10 + num_partitions_upsert: 1 type: UpsertNode - deps: 
first_hive_query - second_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 1100 - type: HiveQueryNode - deps: second_upsert + deps: first_validate first_schedule_compact: config: type: ScheduleCompactNode - deps: second_hive_query - third_upsert: + deps: first_upsert + first_delete: config: - record_size: 70000 - num_partitions_insert: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_partitions_upsert: 10 - type: UpsertNode + num_partitions_delete: 1 + num_records_delete: 500 + type: DeleteNode deps: first_schedule_compact - first_compact: - config: - type: CompactNode - deps: first_schedule_compact - third_hive_query: + second_hive_sync: config: queue_name: "adhoc" engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 1400 - type: HiveQueryNode - deps: first_compact + type: HiveSyncNode + deps: first_delete + second_validate: + config: + delete_input_data: true + type: ValidateDatasetNode + deps: second_hive_sync diff --git a/docker/demo/config/test-suite/cow-clustering-example.yaml b/docker/demo/config/test-suite/cow-clustering-example.yaml new file mode 100644 index 000000000..939e16f55 --- /dev/null +++ b/docker/demo/config/test-suite/cow-clustering-example.yaml @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: cow-clustering-example.yaml +dag_rounds: 3 +dag_intermittent_delay_mins: 0 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_delete: + config: + num_partitions_delete: 1 + num_records_delete: 9000 + type: DeleteNode + deps: third_insert + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete + first_validate: + config: + validate_hive: true + type: ValidateDatasetNode + deps: first_hive_sync + first_cluster: + config: + execute_itr_count: 2 + type: ClusteringNode + deps: first_validate + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_cluster + second_validate: + config: + validate_hive: true + type: ValidateDatasetNode + deps: second_hive_sync diff --git a/docker/demo/config/test-suite/cow-long-running-example.yaml b/docker/demo/config/test-suite/cow-long-running-example.yaml index b7026f2dd..71a34f813 100644 --- a/docker/demo/config/test-suite/cow-long-running-example.yaml +++ b/docker/demo/config/test-suite/cow-long-running-example.yaml @@ -14,12 +14,12 @@ # See the License for the specific language governing 
permissions and # limitations under the License. dag_name: cow-long-running-example.yaml -dag_rounds: 20 -dag_intermittent_delay_mins: 10 +dag_rounds: 50 +dag_intermittent_delay_mins: 1 dag_content: first_insert: config: - record_size: 100 + record_size: 1000 num_partitions_insert: 1 repeat_count: 1 num_records_insert: 1000 @@ -27,7 +27,7 @@ dag_content: deps: none second_insert: config: - record_size: 100 + record_size: 1000 num_partitions_insert: 1 repeat_count: 1 num_records_insert: 10000 @@ -35,19 +35,26 @@ dag_content: type: InsertNode third_insert: config: - record_size: 100 + record_size: 1000 num_partitions_insert: 1 repeat_count: 1 num_records_insert: 300 deps: second_insert type: InsertNode + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: third_insert first_validate: config: + validate_hive: true type: ValidateDatasetNode - deps: third_insert + deps: first_hive_sync first_upsert: config: - record_size: 100 + record_size: 1000 num_partitions_insert: 1 num_records_insert: 300 repeat_count: 1 @@ -58,11 +65,25 @@ dag_content: first_delete: config: num_partitions_delete: 1 - num_records_delete: 2000 + num_records_delete: 8000 type: DeleteNode deps: first_upsert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete second_validate: config: + validate_hive: true delete_input_data: true type: ValidateDatasetNode - deps: first_delete \ No newline at end of file + deps: second_hive_sync + last_validate: + config: + execute_itr_count: 50 + validate_clean: true + validate_archival: true + type: ValidateAsyncOperations + deps: second_validate diff --git a/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml b/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml new file mode 100644 index 000000000..b071c4667 --- /dev/null +++ b/docker/demo/config/test-suite/cow-long-running-multi-partitions.yaml @@ -0,0 +1,89 @@ +# Licensed to the Apache 
Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: cow-long-running-multi-partitions.yaml +dag_rounds: 50 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 5 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 1000 + num_partitions_insert: 50 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 1000 + num_partitions_insert: 2 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: third_insert + first_validate: + config: + validate_hive: true + type: ValidateDatasetNode + deps: first_hive_sync + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 2 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_validate + first_delete: + config: + num_partitions_delete: 50 + num_records_delete: 8000 + type: DeleteNode + deps: first_upsert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete 
+ second_validate: + config: + validate_hive: true + delete_input_data: true + type: ValidateDatasetNode + deps: second_hive_sync + last_validate: + config: + execute_itr_count: 50 + validate_clean: true + validate_archival: true + type: ValidateAsyncOperations + deps: second_validate diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties index 0aa0f45c0..9dfb465da 100644 --- a/docker/demo/config/test-suite/test.properties +++ b/docker/demo/config/test-suite/test.properties @@ -20,6 +20,10 @@ hoodie.datasource.write.recordkey.field=_row_key hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator hoodie.datasource.write.partitionpath.field=timestamp +hoodie.clustering.plan.strategy.sort.columns=_row_key +hoodie.clustering.plan.strategy.daybased.lookback.partitions=0 +hoodie.clustering.inline.max.commits=1 + hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 63d0bffbe..25ce03939 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -371,7 +371,6 @@ public abstract class AbstractHoodieWriteClient table, String compactionCommitTime); - + /** * Rollback failed compactions. 
Inflight rollbacks for compactions revert the .inflight file to the .requested file * diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml index b48dd8135..d93c663e8 100644 --- a/hudi-integ-test/pom.xml +++ b/hudi-integ-test/pom.xml @@ -90,6 +90,11 @@ hudi-spark_${scala.binary.version} ${project.version} + + org.apache.hudi + hudi-spark-common + ${project.version} + org.apache.hudi hudi-utilities_${scala.binary.version} @@ -201,6 +206,12 @@ ${project.version} test-jar + + org.apache.hudi + hudi-spark-common + ${project.version} + test-jar + diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index b5037e995..aeb8748d0 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -82,6 +82,7 @@ public class HoodieTestSuiteJob { private BuiltinKeyGenerator keyGenerator; public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException { + log.warn("Running spark job w/ app id " + jsc.sc().applicationId()); this.cfg = cfg; this.jsc = jsc; cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString(); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java index a06c281ef..3c6129144 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java @@ -18,8 +18,6 @@ package org.apache.hudi.integ.testsuite; -import org.apache.hadoop.conf.Configuration; - import org.apache.hudi.avro.model.HoodieCompactionPlan; import org.apache.hudi.client.HoodieReadClient; import 
org.apache.hudi.client.SparkRDDWriteClient; @@ -30,6 +28,7 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodiePayloadConfig; @@ -41,9 +40,14 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; import org.apache.hudi.integ.testsuite.dag.nodes.RollbackNode; import org.apache.hudi.integ.testsuite.dag.nodes.ScheduleCompactNode; import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.utilities.schema.SchemaProvider; + +import org.apache.hadoop.conf.Configuration; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.HashMap; @@ -53,13 +57,15 @@ import java.util.Properties; import java.util.Set; /** - * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform - * write operations into the target hudi dataset. Current supported writers are {@link HoodieDeltaStreamerWrapper} - * and {@link SparkRDDWriteClient}. + * A writer abstraction for the Hudi test suite. This class wraps different implementations of writers used to perform write operations into the target hudi dataset. Current supported writers are + * {@link HoodieDeltaStreamerWrapper} and {@link SparkRDDWriteClient}. 
*/ public class HoodieTestSuiteWriter { + private static Logger log = LoggerFactory.getLogger(HoodieTestSuiteWriter.class); + private HoodieDeltaStreamerWrapper deltaStreamerWrapper; + private HoodieWriteConfig writeConfig; private SparkRDDWriteClient writeClient; protected HoodieTestSuiteConfig cfg; private Option lastCheckpoint; @@ -85,8 +91,9 @@ public class HoodieTestSuiteWriter { HoodieSparkEngineContext context = new HoodieSparkEngineContext(jsc); this.deltaStreamerWrapper = new HoodieDeltaStreamerWrapper(cfg, jsc); this.hoodieReadClient = new HoodieReadClient(context, cfg.targetBasePath); + this.writeConfig = getHoodieClientConfig(cfg, props, schema); if (!cfg.useDeltaStreamer) { - this.writeClient = new SparkRDDWriteClient(context, getHoodieClientConfig(cfg, props, schema), rollbackInflight); + this.writeClient = new SparkRDDWriteClient(context, writeConfig, rollbackInflight); } this.cfg = cfg; this.configuration = jsc.hadoopConfiguration(); @@ -95,6 +102,10 @@ public class HoodieTestSuiteWriter { this.schema = schema; } + public HoodieWriteConfig getWriteConfig() { + return this.writeConfig; + } + private HoodieWriteConfig getHoodieClientConfig(HoodieTestSuiteConfig cfg, Properties props, String schema) { HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().combineInput(true, true).withPath(cfg.targetBasePath) @@ -178,6 +189,20 @@ public class HoodieTestSuiteWriter { } } + public void inlineClustering() { + if (!cfg.useDeltaStreamer) { + Option clusteringInstantOpt = writeClient.scheduleClustering(Option.empty()); + clusteringInstantOpt.ifPresent(clusteringInstant -> { + // inline cluster should auto commit as the user is never given control + log.warn("Clustering instant :: " + clusteringInstant); + writeClient.cluster(clusteringInstant, true); + }); + } else { + // TODO: fix clustering to be done async https://issues.apache.org/jira/browse/HUDI-1590 + throw new IllegalArgumentException("Clustering cannot be triggered with deltastreamer"); + 
} + } + public Option scheduleCompaction(Option> previousCommitExtraMetadata) throws Exception { if (!cfg.useDeltaStreamer) { @@ -189,7 +214,7 @@ public class HoodieTestSuiteWriter { } public void commit(JavaRDD records, JavaRDD generatedDataStats, - Option instantTime) { + Option instantTime) { if (!cfg.useDeltaStreamer) { Map extraMetadata = new HashMap<>(); /** Store the checkpoint in the commit metadata just like diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index 329ef16bd..193bf2ca1 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -88,6 +88,10 @@ public class DeltaConfig implements Serializable { private static String REINIT_CONTEXT = "reinitialize_context"; private static String START_PARTITION = "start_partition"; private static String DELETE_INPUT_DATA = "delete_input_data"; + private static String VALIDATE_HIVE = "validate_hive"; + private static String EXECUTE_ITR_COUNT = "execute_itr_count"; + private static String VALIDATE_ARCHIVAL = "validate_archival"; + private static String VALIDATE_CLEAN = "validate_clean"; private Map configsMap; @@ -159,6 +163,22 @@ public class DeltaConfig implements Serializable { return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString()); } + public boolean isValidateHive() { + return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString()); + } + + public int getIterationCountToExecute() { + return Integer.valueOf(configsMap.getOrDefault(EXECUTE_ITR_COUNT, -1).toString()); + } + + public boolean validateArchival() { + return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_ARCHIVAL, false).toString()); + } + + public boolean validateClean() { + return 
Boolean.valueOf(configsMap.getOrDefault(VALIDATE_CLEAN, false).toString()); + } + public Map getOtherConfigs() { if (configsMap == null) { return new HashMap<>(); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java index 17148f538..e4cf84a1f 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/ExecutionContext.java @@ -26,7 +26,7 @@ import org.apache.spark.api.java.JavaSparkContext; /** * This wraps the context needed for an execution of - * a {@link DagNode#execute(ExecutionContext)}. + * a {@link DagNode#execute(ExecutionContext, int)}. */ public class ExecutionContext implements Serializable { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java index 83a8d5e10..0f449a832 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CleanNode.java @@ -32,7 +32,7 @@ public class CleanNode extends DagNode { } @Override - public void execute(ExecutionContext executionContext) throws Exception { + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { log.info("Executing clean node {}", this.getName()); executionContext.getHoodieTestSuiteWriter().getWriteClient(this).clean(); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java new file mode 100644 index 000000000..9ee5ca270 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ClusteringNode.java 
@@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.common.util.Option; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; +import org.apache.hudi.integ.testsuite.dag.ExecutionContext; + +/** + * Triggers inline clustering. Works only with writeClient. Also, add this as last node and end with validation if possible. As of now, after clustering, further inserts/upserts may not work since we + * call deltaStreamer. 
+ */ +public class ClusteringNode extends DagNode> { + + public ClusteringNode(Config config) { + this.config = config; + } + + @Override + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { + if (config.getIterationCountToExecute() == curItrCount) { + try { + log.warn("Executing ClusteringNode node {}", this.getName()); + executionContext.getHoodieTestSuiteWriter().inlineClustering(); + } catch (Exception e) { + log.warn("Exception thrown in ClusteringNode Node :: " + e.getCause() + ", msg :: " + e.getMessage()); + throw e; + } + } + } + +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java index 4c3ad6155..7c9090d2b 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java @@ -40,10 +40,11 @@ public class CompactNode extends DagNode> { * if it has one. * * @param executionContext Execution context to run this compaction + * @param curItrCount current iteration count. * @throws Exception will be thrown if any error occurred. 
*/ @Override - public void execute(ExecutionContext executionContext) throws Exception { + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { HoodieTableMetaClient metaClient = new HoodieTableMetaClient(executionContext.getHoodieTestSuiteWriter().getConfiguration(), executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath); Option lastInstant = metaClient.getActiveTimeline() diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java index 05ac242a5..863024335 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java @@ -91,9 +91,10 @@ public abstract class DagNode implements Comparable> { * Execute the {@link DagNode}. * * @param context The context needed for an execution of a node. + * @param curItrCount iteration count for executing the node. * @throws Exception Thrown if the execution failed. 
*/ - public abstract void execute(ExecutionContext context) throws Exception; + public abstract void execute(ExecutionContext context, int curItrCount) throws Exception; public boolean isCompleted() { return isCompleted; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java index c0671e8ab..01b8d4c38 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java @@ -36,7 +36,7 @@ public class DelayNode extends DagNode { } @Override - public void execute(ExecutionContext context) throws Exception { + public void execute(ExecutionContext context, int curItrCount) throws Exception { log.warn("Waiting for "+ delayMins+" mins before going for next test run"); Thread.sleep(delayMins * 60 * 1000); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java index f36b7d451..bdde58adb 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveQueryNode.java @@ -43,7 +43,7 @@ public class HiveQueryNode extends DagNode { } @Override - public void execute(ExecutionContext executionContext) throws Exception { + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { log.info("Executing hive query node {}", this.getName()); this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration()); HiveSyncConfig hiveSyncConfig = DataSourceUtils diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java index a2b4ee5ee..97a1bee07 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/HiveSyncNode.java @@ -35,7 +35,7 @@ public class HiveSyncNode extends DagNode { } @Override - public void execute(ExecutionContext executionContext) throws Exception { + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { log.info("Executing hive sync node"); this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration()); this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter()); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java index 1571349f8..5ca98ccf6 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java @@ -39,7 +39,7 @@ public class InsertNode extends DagNode> { } @Override - public void execute(ExecutionContext executionContext) throws Exception { + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { // if the insert node has schema override set, reinitialize the table with new schema. 
if (this.config.getReinitContext()) { log.info(String.format("Reinitializing table with %s", this.config.getOtherConfigs().toString())); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java index 12588ac03..1824cb8d6 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/RollbackNode.java @@ -23,7 +23,6 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.exception.HoodieNotSupportedException; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector; @@ -42,10 +41,11 @@ public class RollbackNode extends DagNode> { * Method helps to rollback the last commit instant in the timeline, if it has one. * * @param executionContext Execution context to perform this rollback + * @param curItrCount current iteration count. 
* @throws Exception will be thrown if any error occurred */ @Override - public void execute(ExecutionContext executionContext) throws Exception { + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { log.info("Executing rollback node {}", this.getName()); // Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer // testing for now diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java index 0aa67f417..c54b25a3c 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java @@ -35,7 +35,7 @@ public class ScheduleCompactNode extends DagNode> { } @Override - public void execute(ExecutionContext executionContext) throws Exception { + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { log.info("Executing schedule compact node {}", this.getName()); // Can only be done with an instantiation of a new WriteClient hence cannot be done during DeltaStreamer // testing for now @@ -48,7 +48,7 @@ public class ScheduleCompactNode extends DagNode> { HoodieCommitMetadata metadata = org.apache.hudi.common.model.HoodieCommitMetadata.fromBytes(metaClient .getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class); Option scheduledInstant = executionContext.getHoodieTestSuiteWriter().scheduleCompaction(Option.of(metadata - .getExtraMetadata())); + .getExtraMetadata())); if (scheduledInstant.isPresent()) { log.info("Scheduling compaction instant {}", scheduledInstant.get()); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java index e06d6defe..8efd96c11 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/SparkSQLQueryNode.java @@ -42,10 +42,11 @@ public class SparkSQLQueryNode extends DagNode { * Method helps to execute a sparkSql query from a hive table. * * @param executionContext Execution context to perform this query. + * @param curItrCount current iteration count. * @throws Exception will be thrown if ant error occurred */ @Override - public void execute(ExecutionContext executionContext) throws Exception { + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { log.info("Executing spark sql query node"); this.hiveServiceProvider.startLocalHiveServiceIfNeeded(executionContext.getHoodieTestSuiteWriter().getConfiguration()); this.hiveServiceProvider.syncToLocalHiveIfNeeded(executionContext.getHoodieTestSuiteWriter()); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java new file mode 100644 index 000000000..de8855f0a --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateAsyncOperations.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; +import org.apache.hudi.integ.testsuite.dag.ExecutionContext; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * Node to validate data set sanity like total file versions retained, has cleaning happened, has archival happened, etc. 
+ */ +public class ValidateAsyncOperations extends DagNode<Option<String>> { + + private static final Logger log = LoggerFactory.getLogger(ValidateAsyncOperations.class); + + public ValidateAsyncOperations(Config config) { + this.config = config; + } + + @Override + public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { + if (config.getIterationCountToExecute() == curItrCount) { + try { + log.warn("Executing ValidateHoodieAsyncOperations node {} with target base path {} ", this.getName(), + executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath); + String basePath = executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath; + + int maxCommitsRetained = executionContext.getHoodieTestSuiteWriter().getWriteConfig().getCleanerCommitsRetained() + 1; + FileSystem fs = FSUtils.getFs(basePath, executionContext.getHoodieTestSuiteWriter().getConfiguration()); + Map<String, Integer> fileIdCount = new HashMap<>(); + + AtomicInteger maxVal = new AtomicInteger(); + List<String> partitionPaths = FSUtils.getAllPartitionFoldersThreeLevelsDown(fs, basePath); + for (String partitionPath : partitionPaths) { + List<FileStatus> fileStatuses = Arrays.stream(FSUtils.getAllDataFilesInPartition(fs, new Path(basePath + "/" + partitionPath))).collect(Collectors.toList()); + fileStatuses.forEach(entry -> { + String fileId = FSUtils.getFileId(entry.getPath().getName()); + // count one more data file for this file id and remember the highest count seen so far + int versionCount = fileIdCount.merge(fileId, 1, Integer::sum); + maxVal.set(Math.max(maxVal.get(), versionCount)); + }); + } + if (maxVal.get() > maxCommitsRetained) { + throw new AssertionError("Total commits (" + maxVal + ") retained exceeds max value of " + maxCommitsRetained); + } + + if (config.validateArchival() || config.validateClean()) { + // file names under the .hoodie folder whose presence shows that archival has happened + Pattern ARCHIVE_FILE_PATTERN = Pattern.compile("\\.commits_\\.archive\\..*"); + // file names under the .hoodie folder whose presence shows that cleaning has happened + Pattern CLEAN_FILE_PATTERN = Pattern.compile(".*\\.clean\\..*"); + + String metadataPath =
executionContext.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/.hoodie"; + FileStatus[] metaFileStatuses = fs.listStatus(new Path(metadataPath)); + boolean archFound = false; + boolean cleanFound = false; + for (FileStatus fileStatus : metaFileStatuses) { + Matcher archFileMatcher = ARCHIVE_FILE_PATTERN.matcher(fileStatus.getPath().getName()); + if (archFileMatcher.matches()) { + archFound = true; + if (config.validateArchival() && !config.validateClean()) { + break; + } + } + Matcher cleanFileMatcher = CLEAN_FILE_PATTERN.matcher(fileStatus.getPath().getName()); + if (cleanFileMatcher.matches()) { + cleanFound = true; + if (!config.validateArchival() && config.validateClean()) { + break; + } + } + if (config.validateClean() && config.validateArchival()) { + if (archFound && cleanFound) { + break; + } + } + } + if (config.validateArchival() && !archFound) { + throw new AssertionError("Archival NotFound in " + metadataPath); + } + + if (config.validateClean() && !cleanFound) { + throw new AssertionError("Clean commits NotFound in " + metadataPath); + } + } + } catch (Exception e) { + log.warn("Exception thrown in ValidateHoodieAsyncOperations Node :: " + e.getCause() + ", msg :: " + e.getMessage(), e); + throw e; + } + } + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java index 12fc52529..22fea9220 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java @@ -62,7 +62,7 @@ public class ValidateDatasetNode extends DagNode { } @Override - public void execute(ExecutionContext context) throws Exception { + public void execute(ExecutionContext context, int curItrCount) throws Exception { SparkSession session =
SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate(); @@ -111,17 +111,19 @@ public class ValidateDatasetNode extends DagNode { throw new AssertionError("Hudi contents does not match contents input data. "); } - String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY()); - String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY()); - log.warn("Validating hive table with db : " + database + " and table : " + tableName); - Dataset cowDf = session.sql("SELECT * FROM " + database + "." + tableName); - Dataset trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD) - .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD); - intersectionDf = inputSnapshotDf.intersect(trimmedDf); - // the intersected df should be same as inputDf. if not, there is some mismatch. - if (inputSnapshotDf.except(intersectionDf).count() != 0) { - log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count()); - throw new AssertionError("Hudi hive table contents does not match contents input data. "); + if (config.isValidateHive()) { + String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY()); + String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY()); + log.warn("Validating hive table with db : " + database + " and table : " + tableName); + Dataset cowDf = session.sql("SELECT * FROM " + database + "." 
+ tableName); + Dataset trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD) + .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD); + intersectionDf = inputSnapshotDf.intersect(trimmedCowDf); + // compare against the rows read back through hive: the intersected df should be same as inputDf. if not, there is some mismatch. + if (inputSnapshotDf.except(intersectionDf).count() != 0) { + log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count()); + throw new AssertionError("Hudi hive table contents does not match contents input data. "); + } } // if delete input data is enabled, erase input data. diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java index 37244c0a8..e4c4adb1f 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateNode.java @@ -40,9 +40,10 @@ public class ValidateNode extends DagNode { * was set to true or default, but the parent nodes have not completed yet. * * @param executionContext Context to execute this node + * @param curItrCount current iteration count.
*/ @Override - public void execute(ExecutionContext executionContext) { + public void execute(ExecutionContext executionContext, int curItrCount) { if (this.getParentNodes().size() > 0 && (Boolean) this.config.getOtherConfigs().getOrDefault("WAIT_FOR_PARENTS", true)) { for (DagNode node : (List) this.getParentNodes()) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java index d4074bccc..34cb9bc0a 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java @@ -101,7 +101,8 @@ public class DagScheduler { while (queue.size() > 0) { DagNode nodeToExecute = queue.poll(); log.warn("Executing node \"" + nodeToExecute.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" :: " + nodeToExecute.getConfig()); - futures.add(service.submit(() -> executeNode(nodeToExecute))); + int finalCurRound = curRound; + futures.add(service.submit(() -> executeNode(nodeToExecute, finalCurRound))); if (nodeToExecute.getChildNodes().size() > 0) { childNodes.addAll(nodeToExecute.getChildNodes()); } @@ -114,7 +115,7 @@ public class DagScheduler { } while (queue.size() > 0); log.info("Finished workloads for round num " + curRound); if (curRound < workflowDag.getRounds()) { - new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext); + new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext, curRound); } // After each level, report and flush the metrics @@ -128,14 +129,14 @@ public class DagScheduler { * * @param node The node to be executed */ - private void executeNode(DagNode node) { + private void executeNode(DagNode node, int curRound) { if (node.isCompleted()) { throw new RuntimeException("DagNode already completed! 
Cannot re-execute"); } try { int repeatCount = node.getConfig().getRepeatCount(); while (repeatCount > 0) { - node.execute(executionContext); + node.execute(executionContext, curRound); log.info("Finished executing {}", node.getName()); repeatCount--; } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java index 787ec844e..00928f3c1 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java @@ -70,7 +70,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iteratororg.apache.hudi:hudi-common org.apache.hudi:hudi-client-common org.apache.hudi:hudi-spark-client + org.apache.hudi:hudi-spark-common org.apache.hudi:hudi-utilities_${scala.binary.version} org.apache.hudi:hudi-spark_${scala.binary.version} org.apache.hudi:hudi-spark2_${scala.binary.version}