From a818020f72d37c8fd7a88414b769be9b23ee0ba6 Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Fri, 8 Oct 2021 11:30:48 -0400 Subject: [PATCH] [HUDI-2530] Adding async compaction support to integ test suite framework (#3750) --- .../config/test-suite/compact-test.properties | 50 +++++++ .../config/test-suite/mor-async-compact.yaml | 126 ++++++++++++++++++ docker/demo/config/test-suite/test.properties | 4 - .../testsuite/HoodieTestSuiteWriter.java | 20 ++- .../testsuite/dag/nodes/CompactNode.java | 7 +- .../dag/nodes/ScheduleCompactNode.java | 1 - 6 files changed, 198 insertions(+), 10 deletions(-) create mode 100644 docker/demo/config/test-suite/compact-test.properties create mode 100644 docker/demo/config/test-suite/mor-async-compact.yaml diff --git a/docker/demo/config/test-suite/compact-test.properties b/docker/demo/config/test-suite/compact-test.properties new file mode 100644 index 000000000..2eca88de3 --- /dev/null +++ b/docker/demo/config/test-suite/compact-test.properties @@ -0,0 +1,50 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +hoodie.insert.shuffle.parallelism=100 +hoodie.upsert.shuffle.parallelism=100 +hoodie.bulkinsert.shuffle.parallelism=100 + +hoodie.deltastreamer.source.test.num_partitions=100 +hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false +hoodie.deltastreamer.source.test.max_unique_records=100000000 +hoodie.embed.timeline.server=false +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator +hoodie.datasource.write.partitionpath.field=timestamp + +hoodie.compact.inline.max.delta.commits=2 + +hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input +hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd + +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ +hoodie.datasource.hive_sync.mode=jdbc +hoodie.datasource.hive_sync.database=testdb +hoodie.datasource.hive_sync.table=table1 +hoodie.datasource.hive_sync.assume_date_partitioning=false +hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path +hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor + diff --git a/docker/demo/config/test-suite/mor-async-compact.yaml b/docker/demo/config/test-suite/mor-async-compact.yaml new file mode 100644 index 000000000..4ee9c535e --- /dev/null +++ b/docker/demo/config/test-suite/mor-async-compact.yaml @@ -0,0 +1,126 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Use compact-test.properties for this yaml file. +dag_name: mor-async-compact.yaml +dag_rounds: 4 +dag_intermittent_delay_mins: 0 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 10000 + type: InsertNode + deps: none + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 2000 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_insert + second_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 2000 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_upsert + third_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 2000 + num_partitions_upsert: 1 + type: UpsertNode + deps: second_upsert + first_validate: + config: + delete_input_data: false + type: ValidateDatasetNode + deps: third_upsert + first_schedule_compact: + config: + type: ScheduleCompactNode + deps: first_validate + fourth_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 2000 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_schedule_compact + fifth_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 2000 + num_partitions_upsert: 1 + type: UpsertNode + deps: fourth_upsert + second_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 10000 + type: InsertNode + deps: fifth_upsert + sixth_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 2000 + num_partitions_upsert: 1 + type: UpsertNode + deps: second_insert + third_validate: + config: + delete_input_data: false + type: ValidateDatasetNode + deps: sixth_upsert + first_compact: + config: + type: CompactNode + deps: third_validate + first_delete: + config: + num_partitions_delete: 1 + num_records_delete: 500 + type: DeleteNode + deps: first_compact + fifth_validate: + config: + delete_input_data: false + type: ValidateDatasetNode + deps: first_delete \ No newline at end of file diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties index b4f69d9cb..30cd1c1f0 100644 --- a/docker/demo/config/test-suite/test.properties +++ b/docker/demo/config/test-suite/test.properties @@ -25,10 +25,6 @@ hoodie.deltastreamer.source.test.max_unique_records=100000000 hoodie.embed.timeline.server=false hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector -hoodie.insert.shuffle.parallelism=100 -hoodie.upsert.shuffle.parallelism=100 -hoodie.bulkinsert.shuffle.parallelism=100 - hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector hoodie.datasource.hive_sync.skip_ro_suffix=true diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java index 9ac917acb..41ef3f4ab 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java @@ -18,6 +18,7 @@ package org.apache.hudi.integ.testsuite; +import java.io.IOException; import java.io.Serializable; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -236,7 +237,7 @@ public class HoodieTestSuiteWriter implements Serializable { public Option scheduleCompaction(Option> previousCommitExtraMetadata) throws Exception { - if (!cfg.useDeltaStreamer) { + if (cfg.useDeltaStreamer) { deltaStreamerWrapper.scheduleCompact(); return Option.empty(); } else { @@ -251,7 +252,7 @@ public class HoodieTestSuiteWriter implements Serializable { /** Store the checkpoint in the commit metadata just like * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/ extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get()); - if (generatedDataStats != null) { + if (generatedDataStats != null && generatedDataStats.count() > 1) { // Just stores the path where this batch of data is generated to extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0)); } @@ -259,6 +260,21 @@ public class HoodieTestSuiteWriter implements Serializable { } } + public void commitCompaction(JavaRDD records, JavaRDD generatedDataStats, + Option instantTime) throws IOException { + if (!cfg.useDeltaStreamer) { + Map extraMetadata = new HashMap<>(); + /** Store the checkpoint in the commit metadata just like + * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/ + extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get()); + if (generatedDataStats != null && generatedDataStats.count() > 1) { + // Just stores the path where this batch of data is generated to + extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0)); + } + writeClient.commitCompaction(instantTime.get(), records, Option.of(extraMetadata)); + } + } + public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws IllegalAccessException { if (cfg.useDeltaStreamer & !allowWriteClientAccess(dagNode)) { throw new IllegalAccessException("cannot access write client when testing in deltastreamer mode"); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java index 61306d12b..dd7d880f6 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/CompactNode.java @@ -24,6 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.util.Option; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.dag.ExecutionContext; + import org.apache.spark.api.java.JavaRDD; /** @@ -40,8 +41,8 @@ public class CompactNode extends DagNode> { * if it has one. * * @param executionContext Execution context to run this compaction - * @param curItrCount cur interation count. - * @throws Exception will be thrown if any error occurred. + * @param curItrCount cur interation count. + * @throws Exception will be thrown if any error occurred. */ @Override public void execute(ExecutionContext executionContext, int curItrCount) throws Exception { @@ -53,7 +54,7 @@ public class CompactNode extends DagNode> { if (lastInstant.isPresent()) { log.info("Compacting instant {}", lastInstant.get()); this.result = executionContext.getHoodieTestSuiteWriter().compact(Option.of(lastInstant.get().getTimestamp())); + executionContext.getHoodieTestSuiteWriter().commitCompaction(result, executionContext.getJsc().emptyRDD(), Option.of(lastInstant.get().getTimestamp())); } } - } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java index 62bf9b09a..0297bc703 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ScheduleCompactNode.java @@ -56,5 +56,4 @@ public class ScheduleCompactNode extends DagNode> { this.result = scheduledInstant; } } - }