[HUDI-2530] Adding async compaction support to integ test suite framework (#3750)

Author: Sivabalan Narayanan
Date: 2021-10-08 11:30:48 -04:00
Committed by: GitHub
Parent: 10e3a9a3fb
Commit: a818020f72
6 changed files with 198 additions and 10 deletions

View File

@@ -0,0 +1,50 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
hoodie.insert.shuffle.parallelism=100
hoodie.upsert.shuffle.parallelism=100
hoodie.bulkinsert.shuffle.parallelism=100
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
hoodie.datasource.hive_sync.skip_ro_suffix=true
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
hoodie.compact.inline.max.delta.commits=2
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.mode=jdbc
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
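
A note on the key generator settings above: TimestampBasedKeyGenerator reads the `timestamp` field as a UNIX_TIMESTAMP (seconds) and renders the partition path with the configured yyyy/MM/dd output format, which is what the SlashEncodedDayPartitionValueExtractor on the hive-sync side expects. A minimal sketch of that mapping, as illustration only (not Hudi's implementation; GMT is assumed as the keygen timezone):

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class PartitionPathSketch {
  public static void main(String[] args) {
    long unixSeconds = 1633707048L; // example value of a record's `timestamp` field
    // Mirrors hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
    SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd");
    fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
    // UNIX_TIMESTAMP values are seconds, so scale to millis before formatting.
    System.out.println(fmt.format(new Date(unixSeconds * 1000L))); // prints 2021/10/08
  }
}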

View File

@@ -0,0 +1,126 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Use compact-test.properties with this yaml file.
dag_name: mor-async-compact.yaml
dag_rounds: 4
dag_intermittent_delay_mins: 0
dag_content:
  first_insert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      repeat_count: 1
      num_records_insert: 10000
    type: InsertNode
    deps: none
  first_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 2000
      num_partitions_upsert: 1
    type: UpsertNode
    deps: first_insert
  second_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 2000
      num_partitions_upsert: 1
    type: UpsertNode
    deps: first_upsert
  third_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 2000
      num_partitions_upsert: 1
    type: UpsertNode
    deps: second_upsert
  first_validate:
    config:
      delete_input_data: false
    type: ValidateDatasetNode
    deps: third_upsert
  first_schedule_compact:
    config:
    type: ScheduleCompactNode
    deps: first_validate
  fourth_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 2000
      num_partitions_upsert: 1
    type: UpsertNode
    deps: first_schedule_compact
  fifth_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 2000
      num_partitions_upsert: 1
    type: UpsertNode
    deps: fourth_upsert
  second_insert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      repeat_count: 1
      num_records_insert: 10000
    type: InsertNode
    deps: fifth_upsert
  sixth_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 2000
      num_partitions_upsert: 1
    type: UpsertNode
    deps: second_insert
  third_validate:
    config:
      delete_input_data: false
    type: ValidateDatasetNode
    deps: sixth_upsert
  first_compact:
    config:
    type: CompactNode
    deps: third_validate
  first_delete:
    config:
      num_partitions_delete: 1
      num_records_delete: 500
    type: DeleteNode
    deps: first_compact
  fifth_validate:
    config:
      delete_input_data: false
    type: ValidateDatasetNode
    deps: first_delete
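
This DAG exercises the async-compaction path end to end: first_schedule_compact only puts a compaction plan on the timeline, the upserts and insert that depend on it keep adding delta commits while that plan stays pending, and first_compact then executes and commits the scheduled plan. Each `type` maps to a node class and each `deps` entry adds an edge. A minimal sketch of that node contract, mirroring the execute() signature visible in the CompactNode diff below (the `dag.nodes` package path is an assumption):

import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;

// Hypothetical no-op node: the shape a `type:` entry resolves to.
public class NoOpNode extends DagNode<Boolean> {
  @Override
  public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
    // Real nodes (InsertNode, UpsertNode, CompactNode, ...) do their work here
    // and stash their output in `result` for downstream nodes to consume.
    this.result = true;
  }
}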

View File

@@ -25,10 +25,6 @@ hoodie.deltastreamer.source.test.max_unique_records=100000000
 hoodie.embed.timeline.server=false
 hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
-hoodie.insert.shuffle.parallelism=100
-hoodie.upsert.shuffle.parallelism=100
-hoodie.bulkinsert.shuffle.parallelism=100
 hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
 hoodie.datasource.hive_sync.skip_ro_suffix=true

View File

@@ -18,6 +18,7 @@
 package org.apache.hudi.integ.testsuite;

+import java.io.IOException;
 import java.io.Serializable;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
@@ -236,7 +237,7 @@ public class HoodieTestSuiteWriter implements Serializable {
   public Option<String> scheduleCompaction(Option<Map<String, String>> previousCommitExtraMetadata) throws
       Exception {
-    if (!cfg.useDeltaStreamer) {
+    if (cfg.useDeltaStreamer) {
       deltaStreamerWrapper.scheduleCompact();
       return Option.empty();
     } else {
@@ -251,7 +252,7 @@ public class HoodieTestSuiteWriter implements Serializable {
       /** Store the checkpoint in the commit metadata just like
        * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
       extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
-      if (generatedDataStats != null) {
+      if (generatedDataStats != null && generatedDataStats.count() > 1) {
         // Just stores the path where this batch of data is generated to
         extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
       }
@@ -259,6 +260,21 @@ public class HoodieTestSuiteWriter implements Serializable {
     }
   }
+
+  public void commitCompaction(JavaRDD<WriteStatus> records, JavaRDD<DeltaWriteStats> generatedDataStats,
+                               Option<String> instantTime) throws IOException {
+    if (!cfg.useDeltaStreamer) {
+      Map<String, String> extraMetadata = new HashMap<>();
+      /** Store the checkpoint in the commit metadata just like
+       * {@link HoodieDeltaStreamer#commit(SparkRDDWriteClient, JavaRDD, Option)} **/
+      extraMetadata.put(HoodieDeltaStreamerWrapper.CHECKPOINT_KEY, lastCheckpoint.get());
+      if (generatedDataStats != null && generatedDataStats.count() > 1) {
+        // Just stores the path where this batch of data is generated to
+        extraMetadata.put(GENERATED_DATA_PATH, generatedDataStats.map(s -> s.getFilePath()).collect().get(0));
+      }
+      writeClient.commitCompaction(instantTime.get(), records, Option.of(extraMetadata));
+    }
+  }

   public SparkRDDWriteClient getWriteClient(DagNode dagNode) throws IllegalAccessException {
     if (cfg.useDeltaStreamer & !allowWriteClientAccess(dagNode)) {
       throw new IllegalAccessException("cannot access write client when testing in deltastreamer mode");
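
Taken together, scheduleCompaction, compact and commitCompaction now cover the whole async-compaction lifecycle in non-deltastreamer mode. A hedged sketch of the sequence a caller drives, using only methods visible in this diff (the surrounding wiring, i.e. how the writer and Spark context are obtained, is assumed):

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

class AsyncCompactionFlowSketch {
  static void run(HoodieTestSuiteWriter writer, JavaSparkContext jsc) throws Exception {
    // 1. Schedule: puts a pending compaction instant on the timeline.
    Option<String> instant = writer.scheduleCompaction(Option.empty());
    // 2. ...other writes land delta commits while the plan stays pending...
    // 3. Execute the scheduled plan, then finalize the instant on the timeline.
    if (instant.isPresent()) {
      JavaRDD<WriteStatus> statuses = writer.compact(instant);
      writer.commitCompaction(statuses, jsc.emptyRDD(), instant);
    }
  }
}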

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
 import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
 import org.apache.spark.api.java.JavaRDD;

 /**
@@ -40,8 +41,8 @@ public class CompactNode extends DagNode<JavaRDD<WriteStatus>> {
    * if it has one.
    *
    * @param executionContext Execution context to run this compaction
    * @param curItrCount      current iteration count.
    * @throws Exception will be thrown if any error occurred.
    */
   @Override
   public void execute(ExecutionContext executionContext, int curItrCount) throws Exception {
@@ -53,7 +54,7 @@ public class CompactNode extends DagNode<JavaRDD<WriteStatus>> {
     if (lastInstant.isPresent()) {
       log.info("Compacting instant {}", lastInstant.get());
       this.result = executionContext.getHoodieTestSuiteWriter().compact(Option.of(lastInstant.get().getTimestamp()));
+      executionContext.getHoodieTestSuiteWriter().commitCompaction(result, executionContext.getJsc().emptyRDD(), Option.of(lastInstant.get().getTimestamp()));
     }
   }
 }
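
Compaction in Hudi is two-phase: compact() executes the scheduled plan and returns the write statuses, but the compaction instant is only finalized once commitCompaction() completes it on the timeline. Calling commitCompaction here lets CompactNode finish what it just ran instead of leaving the instant inflight for some other process to complete.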

View File

@@ -56,5 +56,4 @@ public class ScheduleCompactNode extends DagNode<Option<String>> {
       this.result = scheduledInstant;
     }
   }
 }