1
0

[HUDI-2566] Adding multi-writer test support to integ test (#5065)

This commit is contained in:
Sivabalan Narayanan
2022-03-28 14:05:00 -07:00
committed by GitHub
parent 6ccbae4d2a
commit d074089c62
25 changed files with 741 additions and 87 deletions

View File

@@ -0,0 +1,65 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: simple-deltastreamer.yaml
dag_rounds: 3
dag_intermittent_delay_mins: 0
dag_content:
first_insert:
config:
record_size: 5000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 1000
type: InsertNode
deps: none
second_insert:
config:
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 100000
deps: first_insert
type: InsertNode
third_insert:
config:
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 30000
deps: second_insert
type: InsertNode
first_upsert:
config:
record_size: 1000
num_partitions_insert: 1
num_records_insert: 5000
repeat_count: 1
num_records_upsert: 50000
num_partitions_upsert: 1
type: UpsertNode
deps: third_insert
first_delete:
config:
num_partitions_delete: 0
num_records_delete: 100000
type: DeleteNode
deps: first_upsert
second_validate:
config:
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
deps: first_delete

View File

@@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
hoodie.insert.shuffle.parallelism=2
hoodie.upsert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.metadata.enable=false
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
hoodie.datasource.hive_sync.skip_ro_suffix=true
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=zookeeper:2181
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=locks
hoodie.write.lock.zookeeper.base_path=/tmp/.locks
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input1
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor

View File

@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: cow-spark-simple.yaml
dag_rounds: 3
dag_intermittent_delay_mins: 0
dag_content:
first_insert:
config:
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 100000
start_partition: 10
type: SparkInsertNode
deps: none
first_upsert:
config:
record_size: 1000
num_partitions_insert: 1
num_records_insert: 50000
repeat_count: 1
num_records_upsert: 50000
num_partitions_upsert: 1
start_partition: 10
type: SparkUpsertNode
deps: first_insert
first_delete:
config:
num_partitions_delete: 0
num_records_delete: 10000
start_partition: 10
type: SparkDeleteNode
deps: first_upsert
second_validate:
config:
validate_hive: false
delete_input_data: true
type: ValidateDatasetNode
deps: first_delete

View File

@@ -0,0 +1,58 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
hoodie.insert.shuffle.parallelism=2
hoodie.upsert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.metadata.enable=false
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
hoodie.datasource.hive_sync.skip_ro_suffix=true
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider
hoodie.write.lock.zookeeper.url=zookeeper:2181
hoodie.write.lock.zookeeper.port=2181
hoodie.write.lock.zookeeper.lock_key=locks
hoodie.write.lock.zookeeper.base_path=/tmp/.locks
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input2
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor

View File

@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
hoodie.insert.shuffle.parallelism=2
hoodie.upsert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.metadata.enable=false
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
hoodie.datasource.hive_sync.skip_ro_suffix=true
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
hoodie.deltastreamer.source.dfs.root=/tmp/hudi/input1
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/tmp/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/tmp/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor

View File

@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
hoodie.insert.shuffle.parallelism=2
hoodie.upsert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.delete.shuffle.parallelism=2
hoodie.metadata.enable=false
hoodie.deltastreamer.source.test.num_partitions=100
hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
hoodie.deltastreamer.source.test.max_unique_records=100000000
hoodie.embed.timeline.server=false
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
hoodie.datasource.hive_sync.skip_ro_suffix=true
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp
hoodie.write.concurrency.mode=optimistic_concurrency_control
hoodie.cleaner.policy.failed.writes=LAZY
hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider
hoodie.deltastreamer.source.dfs.root=/tmp/hudi/input2
hoodie.deltastreamer.schemaprovider.target.schema.file=file:/tmp/source.avsc
hoodie.deltastreamer.schemaprovider.source.schema.file=file:/tmp/source.avsc
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
hoodie.datasource.hive_sync.database=testdb
hoodie.datasource.hive_sync.table=table1
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor

View File

@@ -181,7 +181,7 @@ public class HoodieMetricsConfig extends HoodieConfig {
hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.GRAPHITE,
HoodieMetricsGraphiteConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.CLOUDWATCH,
HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build());
return hoodieMetricsConfig;
}
}

View File

@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.utilities.UtilHelpers;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Multi write test suite job to assist in testing multi-writer scenarios. This test spins up one thread per writer as per configurations.
* Three params are of interest to this job in addition to regular HoodieTestsuiteJob.
* --input-base-paths "base_path/input1,base_path/input2"
* --props-paths "file:props_path/multi-writer-1.properties,file:/props_path/multi-writer-2.properties"
* --workload-yaml-paths "file:some_path/multi-writer-1-ds.yaml,file:/some_path/multi-writer-2-sds.yaml"
*
* Each of these should have same number of comma separated entries.
* Each writer will generate data in the corresponding input-base-path.
* and each writer will take in its own properties path and the respective yaml file as well.
*
* Common tests:
* Writer 1 DeltaStreamer ingesting data into partitions 0 to 10, Writer 2 Spark datasource ingesting data into partitions 100 to 110.
* Multiple spark datasource writers, each writing to exclusive set of partitions.
*
* Example command
* spark-submit
* --packages org.apache.spark:spark-avro_2.11:2.4.0
* --conf spark.task.cpus=3
* --conf spark.executor.cores=3
* --conf spark.task.maxFailures=100
* --conf spark.memory.fraction=0.4
* --conf spark.rdd.compress=true
* --conf spark.kryoserializer.buffer.max=2000m
* --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
* --conf spark.memory.storageFraction=0.1
* --conf spark.shuffle.service.enabled=true
* --conf spark.sql.hive.convertMetastoreParquet=false
* --conf spark.driver.maxResultSize=12g
* --conf spark.executor.heartbeatInterval=120s
* --conf spark.network.timeout=600s
* --conf spark.yarn.max.executor.failures=10
* --conf spark.sql.catalogImplementation=hive
* --conf spark.driver.extraClassPath=/var/demo/jars/*
* --conf spark.executor.extraClassPath=/var/demo/jars/*
* --class org.apache.hudi.integ.testsuite.HoodieMultiWriterTestSuiteJob /opt/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar
* --source-ordering-field test_suite_source_ordering_field
* --use-deltastreamer
* --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output
* --input-base-paths "/user/hive/warehouse/hudi-integ-test-suite/input1,/user/hive/warehouse/hudi-integ-test-suite/input2"
* --target-table hudi_table
* --props-paths "multi-writer-1.properties,multi-writer-2.properties"
* --schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider
* --source-class org.apache.hudi.utilities.sources.AvroDFSSource --input-file-size 125829120
* --workload-yaml-paths "file:/opt/multi-writer-1-ds.yaml,file:/opt/multi-writer-2-sds.yaml"
* --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator
* --table-type COPY_ON_WRITE --compact-scheduling-minshare 1
* --input-base-path "dummyValue"
* --workload-yaml-path "dummyValue"
* --props "dummyValue"
* --use-hudi-data-to-generate-updates
*
* Example command that works w/ docker.
*
*/
public class HoodieMultiWriterTestSuiteJob {

  private static final Logger LOG = LogManager.getLogger(HoodieMultiWriterTestSuiteJob.class);

  /**
   * Entry point. Parses the multi-writer config, expands the comma-separated
   * {@code --input-base-paths}, {@code --workload-yaml-paths} and {@code --props-paths}
   * into one {@link HoodieTestSuiteJob} per writer, runs each writer on its own thread
   * (started 20s apart) and waits until all writers finish or the first one fails.
   *
   * @param args command line arguments, bound to {@link HoodieMultiWriterTestSuiteConfig}.
   * @throws Exception if any writer job fails or the futures cannot be awaited.
   */
  public static void main(String[] args) throws Exception {
    final HoodieMultiWriterTestSuiteConfig cfg = new HoodieMultiWriterTestSuiteConfig();
    JCommander cmd = new JCommander(cfg, args);
    if (cfg.help || args.length == 0) {
      cmd.usage();
      System.exit(1);
    }

    JavaSparkContext jssc = UtilHelpers.buildSparkContext("multi-writer-test-run-" + cfg.outputTypeName
        + "-" + cfg.inputFormatName, cfg.sparkMaster);

    // The three multi-writer options are parallel comma-separated lists; entry i of each
    // list configures writer i, so all three must have the same length.
    String[] inputPaths = cfg.inputBasePaths.split(",");
    String[] yamls = cfg.workloadYamlPaths.split(",");
    String[] propsFiles = cfg.propsFilePaths.split(",");

    if (inputPaths.length != yamls.length || yamls.length != propsFiles.length) {
      throw new HoodieException("Input paths, property file and yaml file counts does not match ");
    }

    ExecutorService executor = Executors.newFixedThreadPool(inputPaths.length);

    // Build one per-writer config by copying the shared/global settings and then
    // overriding the per-writer input path, yaml and props file.
    List<HoodieTestSuiteJob.HoodieTestSuiteConfig> testSuiteConfigList = new ArrayList<>();
    int jobIndex = 0;
    for (String inputPath : inputPaths) {
      HoodieMultiWriterTestSuiteConfig testSuiteConfig = new HoodieMultiWriterTestSuiteConfig();
      deepCopyConfigs(cfg, testSuiteConfig);
      testSuiteConfig.inputBasePath = inputPath;
      testSuiteConfig.workloadYamlPath = yamls[jobIndex];
      testSuiteConfig.propsFilePath = propsFiles[jobIndex];
      testSuiteConfigList.add(testSuiteConfig);
      jobIndex++;
    }

    AtomicBoolean jobFailed = new AtomicBoolean(false);
    AtomicInteger counter = new AtomicInteger(0);
    List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>();
    testSuiteConfigList.forEach(hoodieTestSuiteConfig -> {
      try {
        // Start each job at a 20 second interval so that metaClient instantiation
        // of the different writers does not overstep.
        Thread.sleep(counter.get() * 20000);
        LOG.info("Starting job " + hoodieTestSuiteConfig.toString());
      } catch (InterruptedException e) {
        // Restore the interrupt flag rather than swallowing it (the original
        // printStackTrace() silently dropped the interruption).
        Thread.currentThread().interrupt();
        LOG.warn("Interrupted while staggering multi-writer job starts", e);
      }
      completableFutureList.add(CompletableFuture.supplyAsync(() -> {
        boolean toReturn = true;
        try {
          new HoodieTestSuiteJob(hoodieTestSuiteConfig, jssc, false).runTestSuite();
          LOG.info("Job completed successfully");
        } catch (Exception e) {
          // Only the first failing writer rethrows; later failures are expected
          // fallout (cancellation) and are just logged.
          if (!jobFailed.getAndSet(true)) {
            LOG.error("Exception thrown " + e.getMessage() + ", cause : " + e.getCause());
            throw new RuntimeException("HoodieTestSuiteJob Failed " + e.getCause() + ", and msg " + e.getMessage(), e);
          } else {
            LOG.info("Already a job failed. so, not throwing any exception ");
          }
        }
        return toReturn;
      }, executor));
      counter.getAndIncrement();
    });

    LOG.info("Going to await until all jobs complete");
    try {
      CompletableFuture<Object> completableFuture = allOfTerminateOnFailure(completableFutureList);
      completableFuture.get();
    } finally {
      // Always tear down the pool and spark context, even when a writer failed.
      executor.shutdownNow();
      if (jssc != null) {
        LOG.info("Completed and shutting down spark context ");
        LOG.info("Shutting down spark session and JavaSparkContext");
        SparkSession.builder().config(jssc.getConf()).enableHiveSupport().getOrCreate().stop();
        jssc.close();
      }
    }
  }

  /**
   * Returns a future that completes when all {@code futures} complete, or completes
   * exceptionally as soon as the first of them fails (cancelling the rest).
   *
   * @param futures the per-writer job futures to await.
   * @return a future combining all inputs with fail-fast semantics.
   */
  public static CompletableFuture allOfTerminateOnFailure(List<CompletableFuture<Boolean>> futures) {
    CompletableFuture<Object> failure = new CompletableFuture<>();
    AtomicBoolean jobFailed = new AtomicBoolean(false);
    for (CompletableFuture<?> f : futures) {
      f.exceptionally(ex -> {
        if (!jobFailed.getAndSet(true)) {
          LOG.error("One of the job failed. Cancelling all other futures. " + ex.getCause() + ", " + ex.getMessage());
          // Completing 'failure' makes the anyOf() below return immediately on the
          // first failure; previously this future was created but never completed.
          failure.completeExceptionally(ex);
          futures.forEach(future -> future.cancel(true));
        }
        return null;
      });
    }
    return CompletableFuture.anyOf(failure, CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])));
  }

  /**
   * Copies every shared/global test-suite setting from {@code globalConfig} into
   * {@code tableConfig}; per-writer fields (inputBasePath, workloadYamlPath,
   * propsFilePath) are intentionally NOT copied and are set by the caller.
   *
   * @param globalConfig the config parsed from the command line.
   * @param tableConfig  the per-writer config to populate.
   */
  static void deepCopyConfigs(HoodieMultiWriterTestSuiteConfig globalConfig, HoodieMultiWriterTestSuiteConfig tableConfig) {
    tableConfig.enableHiveSync = globalConfig.enableHiveSync;
    tableConfig.enableMetaSync = globalConfig.enableMetaSync;
    tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName;
    tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
    tableConfig.sourceClassName = globalConfig.sourceClassName;
    tableConfig.tableType = globalConfig.tableType;
    tableConfig.targetTableName = globalConfig.targetTableName;
    tableConfig.operation = globalConfig.operation;
    tableConfig.sourceLimit = globalConfig.sourceLimit;
    tableConfig.checkpoint = globalConfig.checkpoint;
    tableConfig.continuousMode = globalConfig.continuousMode;
    tableConfig.filterDupes = globalConfig.filterDupes;
    tableConfig.payloadClassName = globalConfig.payloadClassName;
    tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction;
    tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions;
    tableConfig.maxPendingClustering = globalConfig.maxPendingClustering;
    tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds;
    tableConfig.transformerClassNames = globalConfig.transformerClassNames;
    tableConfig.commitOnErrors = globalConfig.commitOnErrors;
    tableConfig.compactSchedulingMinShare = globalConfig.compactSchedulingMinShare;
    tableConfig.compactSchedulingWeight = globalConfig.compactSchedulingWeight;
    tableConfig.deltaSyncSchedulingMinShare = globalConfig.deltaSyncSchedulingMinShare;
    tableConfig.deltaSyncSchedulingWeight = globalConfig.deltaSyncSchedulingWeight;
    tableConfig.sparkMaster = globalConfig.sparkMaster;
    tableConfig.workloadDagGenerator = globalConfig.workloadDagGenerator;
    tableConfig.outputTypeName = globalConfig.outputTypeName;
    tableConfig.inputFormatName = globalConfig.inputFormatName;
    tableConfig.inputParallelism = globalConfig.inputParallelism;
    tableConfig.useDeltaStreamer = globalConfig.useDeltaStreamer;
    tableConfig.cleanInput = globalConfig.cleanInput;
    tableConfig.cleanOutput = globalConfig.cleanOutput;
    tableConfig.targetBasePath = globalConfig.targetBasePath;
  }

  /**
   * Command-line config for the multi-writer job: extends the single-writer
   * {@link HoodieTestSuiteJob.HoodieTestSuiteConfig} with comma-separated,
   * per-writer lists of input paths, workload yamls and property files.
   */
  public static class HoodieMultiWriterTestSuiteConfig extends HoodieTestSuiteJob.HoodieTestSuiteConfig {

    // Comma-separated list; one input base path per writer.
    @Parameter(names = {"--input-base-paths"}, description = "base paths for input data"
        + "(Will be created if did not exist first time around. If exists, more data will be added to that path)",
        required = true)
    public String inputBasePaths;

    // Comma-separated list; one workload dag yaml per writer.
    @Parameter(names = {
        "--workload-yaml-paths"}, description = "Workflow Dag yaml path to generate the workload")
    public String workloadYamlPaths;

    // Comma-separated list; one properties file per writer.
    @Parameter(names = {
        "--props-paths"}, description = "Workflow Dag yaml path to generate the workload")
    public String propsFilePaths;
  }
}

View File

@@ -18,7 +18,6 @@
package org.apache.hudi.integ.testsuite;
import org.apache.avro.Schema;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -48,6 +47,7 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -93,13 +93,19 @@ public class HoodieTestSuiteJob {
*/
private transient HiveConf hiveConf;
private boolean stopJsc = true;
private BuiltinKeyGenerator keyGenerator;
private transient HoodieTableMetaClient metaClient;
public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException {
this(cfg, jsc, true);
}
public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc, boolean stopJsc) throws IOException {
log.warn("Running spark job w/ app id " + jsc.sc().applicationId());
this.cfg = cfg;
this.jsc = jsc;
this.stopJsc = stopJsc;
cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
this.sparkSession = SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
@@ -108,11 +114,15 @@ public class HoodieTestSuiteJob {
this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration());
this.keyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.initTable(jsc.hadoopConfiguration(), cfg.targetBasePath);
if (!fs.exists(new Path(cfg.targetBasePath))) {
metaClient = HoodieTableMetaClient.withPropertyBuilder()
.setTableType(cfg.tableType)
.setTableName(cfg.targetTableName)
.setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue())
.initTable(jsc.hadoopConfiguration(), cfg.targetBasePath);
} else {
metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.targetBasePath).build();
}
if (cfg.cleanInput) {
Path inputPath = new Path(cfg.inputBasePath);
@@ -167,15 +177,15 @@ public class HoodieTestSuiteJob {
JavaSparkContext jssc = UtilHelpers.buildSparkContext("workload-generator-" + cfg.outputTypeName
+ "-" + cfg.inputFormatName, cfg.sparkMaster);
new HoodieTestSuiteJob(cfg, jssc).runTestSuite();
new HoodieTestSuiteJob(cfg, jssc, true).runTestSuite();
}
public WorkflowDag createWorkflowDag() throws IOException {
WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ? ((WorkflowDagGenerator) ReflectionUtils
.loadClass((this.cfg).workloadDagGenerator)).build()
: DagUtils.convertYamlPathToDag(
FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true),
this.cfg.workloadYamlPath);
FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true),
this.cfg.workloadYamlPath);
return workflowDag;
}
@@ -207,11 +217,13 @@ public class HoodieTestSuiteJob {
log.error("Failed to run Test Suite ", e);
throw new HoodieException("Failed to run Test Suite ", e);
} finally {
stopQuietly();
if (stopJsc) {
stopQuietly();
}
}
}
private void stopQuietly() {
protected void stopQuietly() {
try {
sparkSession.stop();
jsc.stop();
@@ -295,5 +307,8 @@ public class HoodieTestSuiteJob {
@Parameter(names = {"--start-hive-metastore"}, description = "Start Hive Metastore to use for optimistic lock ")
public Boolean startHiveMetastore = false;
@Parameter(names = {"--use-hudi-data-to-generate-updates"}, description = "Use data from hudi to generate updates for new batches ")
public Boolean useHudiToGenerateUpdates = false;
}
}

View File

@@ -40,18 +40,20 @@ public class DFSDeltaConfig extends DeltaConfig {
private int inputParallelism;
// Whether to delete older input data once it has been ingested
private boolean deleteOldInputData;
private boolean useHudiToGenerateUpdates;
public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
SerializableConfiguration configuration,
String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize,
int inputParallelism, boolean deleteOldInputData) {
super(deltaOutputMode, deltaInputType, configuration);
int inputParallelism, boolean deleteOldInputData, boolean useHudiToGenerateUpdates) {
super(deltaOutputMode, deltaInputType, configuration);
this.deltaBasePath = deltaBasePath;
this.schemaStr = schemaStr;
this.maxFileSize = maxFileSize;
this.datasetOutputPath = targetBasePath;
this.inputParallelism = inputParallelism;
this.deleteOldInputData = deleteOldInputData;
this.useHudiToGenerateUpdates = useHudiToGenerateUpdates;
}
public String getDeltaBasePath() {
@@ -85,4 +87,8 @@ public class DFSDeltaConfig extends DeltaConfig {
public boolean shouldDeleteOldInputData() {
return deleteOldInputData;
}
public boolean shouldUseHudiToGenerateUpdates() {
return useHudiToGenerateUpdates;
}
}

View File

@@ -72,7 +72,7 @@ public class WriterContext {
this.deltaGenerator = new DeltaGenerator(
new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName),
new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, cfg.targetBasePath,
schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput),
schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput, cfg.useHudiToGenerateUpdates),
jsc, sparkSession, schemaStr, keyGenerator);
log.info(String.format("Initialized writerContext with: %s", schemaStr));
} catch (Exception e) {

View File

@@ -38,7 +38,7 @@ public class DeleteNode extends InsertNode {
@Override
protected void generate(DeltaGenerator deltaGenerator) throws Exception {
if (!config.isDisableGenerate()) {
deltaGenerator.writeRecords(deltaGenerator.generateDeletes(config)).count();
deltaGenerator.writeRecords(deltaGenerator.generateDeletes(config)).getValue().count();
}
}

View File

@@ -59,7 +59,7 @@ public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
protected void generate(DeltaGenerator deltaGenerator) throws Exception {
if (!config.isDisableGenerate()) {
log.info("Generating input data for node {}", this.getName());
this.deltaWriteStatsRDD = deltaGenerator.writeRecords(deltaGenerator.generateInserts(config));
this.deltaWriteStatsRDD = deltaGenerator.writeRecords(deltaGenerator.generateInserts(config)).getValue();
this.deltaWriteStatsRDD.cache();
this.deltaWriteStatsRDD.count();
}

View File

@@ -38,7 +38,7 @@ public class UpsertNode extends InsertNode {
protected void generate(DeltaGenerator deltaGenerator) throws Exception {
if (!config.isDisableGenerate()) {
log.info("Generating input data {}", this.getName());
deltaGenerator.writeRecords(deltaGenerator.generateUpdates(config)).count();
deltaGenerator.writeRecords(deltaGenerator.generateUpdates(config)).getValue().count();
}
}

View File

@@ -18,24 +18,9 @@
package org.apache.hudi.integ.testsuite.generator;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.converter.Converter;
@@ -51,6 +36,9 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter;
import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory;
import org.apache.hudi.keygen.BuiltinKeyGenerator;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
@@ -58,6 +46,20 @@ import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import scala.Tuple2;
/**
@@ -85,7 +87,7 @@ public class DeltaGenerator implements Serializable {
this.partitionPathFieldNames = keyGenerator.getPartitionPathFields();
}
public JavaRDD<DeltaWriteStats> writeRecords(JavaRDD<GenericRecord> records) {
public Pair<Integer, JavaRDD<DeltaWriteStats>> writeRecords(JavaRDD<GenericRecord> records) {
if (deltaOutputConfig.shouldDeleteOldInputData() && batchId > 1) {
Path oldInputDir = new Path(deltaOutputConfig.getDeltaBasePath(), Integer.toString(batchId - 1));
try {
@@ -107,7 +109,7 @@ public class DeltaGenerator implements Serializable {
}
}).flatMap(List::iterator);
batchId++;
return ws;
return Pair.of(batchId, ws);
}
public int getBatchId() {
@@ -156,15 +158,22 @@ public class DeltaGenerator implements Serializable {
adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert());
adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert());
} else {
deltaInputReader =
new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(),
schemaStr);
if (config.getFractionUpsertPerFile() > 0) {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(),
config.getFractionUpsertPerFile());
if (((DFSDeltaConfig) deltaOutputConfig).shouldUseHudiToGenerateUpdates()) {
deltaInputReader =
new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(),
schemaStr);
if (config.getFractionUpsertPerFile() > 0) {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(),
config.getFractionUpsertPerFile());
} else {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config
.getNumRecordsUpsert());
}
} else {
adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config
.getNumRecordsUpsert());
deltaInputReader = new DFSAvroDeltaInputReader(sparkSession, schemaStr,
((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty());
adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert());
adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert());
}
}

View File

@@ -46,7 +46,7 @@ class SparkBulkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus
*/
override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
if (!config.isDisableGenerate) {
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).getValue().count()
}
val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch,
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,

View File

@@ -19,7 +19,6 @@
package org.apache.hudi.integ.testsuite.dag.nodes
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
@@ -51,39 +50,26 @@ class SparkDeleteNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
// Deletes can't be fetched using getNextBatch() bcoz, getInsert(schema) from payload will return empty for delete
// records
val genRecsRDD = generateRecordsForDelete(config, context)
val batchIdRecords = context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config))
batchIdRecords.getValue().count()
val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + batchIdRecords.getKey()
val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false,
org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD,
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
context.getWriterContext.getSparkSession)
inputDF.write.format("hudi")
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
.option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key")
.option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.mode(SaveMode.Append)
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
}
/**
* Generates records for delete operations in Spark.
*
* @param config Node configs.
* @param context The context needed for an execution of a node.
* @return Records in {@link RDD}.
*/
private def generateRecordsForDelete(config: Config, context: ExecutionContext): RDD[GenericRecord] = {
if (!config.isDisableGenerate) {
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config)).count()
}
context.getWriterContext.getHoodieTestSuiteWriter.getNextBatchForDeletes()
val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")
val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false,
org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
}
}

View File

@@ -18,11 +18,16 @@
package org.apache.hudi.integ.testsuite.dag.nodes
import org.apache.avro.Schema
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions}
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
@@ -35,6 +40,7 @@ import scala.collection.JavaConverters._
*/
class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
private val log = LogManager.getLogger(getClass)
config = dagNodeConfig
/**
@@ -45,21 +51,26 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
* @throws Exception Thrown if the execution failed.
*/
override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
if (!config.isDisableGenerate) {
println("Generating input data for node {}", this.getName)
writeRecords(context)
}
val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch,
println("Generating input data for node {}", this.getName)
val batchIdRecords = writeRecords(context)
batchIdRecords.getValue().count()
val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + batchIdRecords.getKey()
val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false,
org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD,
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
context.getWriterContext.getSparkSession)
inputDF.write.format("hudi")
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field")
.option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
.option(DataSourceWriteOptions.OPERATION.key, getOperation())
.option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key")
.option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.mode(SaveMode.Append)
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
@@ -69,7 +80,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL
}
def writeRecords(context: ExecutionContext): Unit = {
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
def writeRecords(context: ExecutionContext): Pair[Integer, JavaRDD[DeltaWriteStats]] = {
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config))
}
}

View File

@@ -19,8 +19,12 @@
package org.apache.hudi.integ.testsuite.dag.nodes
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.common.util.collection.Pair
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats
import org.apache.log4j.LogManager
import org.apache.spark.api.java.JavaRDD
/**
* Spark datasource based upsert node
@@ -29,11 +33,46 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext
*/
class SparkUpsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) {
private val log = LogManager.getLogger(getClass)
override def getOperation(): String = {
DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL
}
override def writeRecords(context: ExecutionContext): Unit = {
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)).count()
override def writeRecords(context: ExecutionContext): Pair[Integer, JavaRDD[DeltaWriteStats]] = {
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config))
}
/**
* Execute the {@link DagNode}.
*
* @param context The context needed for an execution of a node.
* @param curItrCount iteration count for executing the node.
* @throws Exception Thrown if the execution failed.
*/
/*override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
println("Generating input data for node {}", this.getName)
val batchIdRecords = writeRecords(context)
batchIdRecords.getValue().count()
val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + batchIdRecords.getKey()
val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false,
org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD,
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
context.getWriterContext.getSparkSession)
inputDF.write.format("hudi")
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field")
.option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
.option(DataSourceWriteOptions.OPERATION.key, getOperation())
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.mode(SaveMode.Append)
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
}*/
}

View File

@@ -57,7 +57,7 @@ abstract class BaseSparkSqlNode(dagNodeConfig: Config) extends DagNode[RDD[Write
*/
def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
if (!config.isDisableGenerate) {
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count()
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).getValue().count()
}
context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch
}

View File

@@ -58,7 +58,7 @@ class SparkSqlCreateTableNode(dagNodeConfig: Config) extends DagNode[RDD[WriteSt
if (config.shouldUseCtas) {
// Prepares data for CTAS query
if (!config.isDisableGenerate) {
context.getDeltaGenerator.writeRecords(context.getDeltaGenerator.generateInserts(config)).count()
context.getDeltaGenerator.writeRecords(context.getDeltaGenerator.generateInserts(config)).getValue().count()
}
val nextBatch = context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch
val sparkSession = context.getWriterContext.getSparkSession

View File

@@ -48,7 +48,7 @@ class SparkSqlDeleteNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNode
context.getWriterContext.getCfg.targetTableName, sparkSession.sparkContext.defaultParallelism)
LOG.info("Number of records to delete: " + recordsToDelete.count())
// The update records corresponding to the SQL are only used for data validation
context.getDeltaGenerator().writeRecords(recordsToDelete).count()
context.getDeltaGenerator().writeRecords(recordsToDelete).getValue().count()
recordsToDelete
}

View File

@@ -42,7 +42,7 @@ class SparkSqlMergeNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNodeC
*/
override def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
if (!config.isDisableGenerate) {
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)).count()
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)).getValue().count()
}
context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch
}

View File

@@ -48,7 +48,7 @@ class SparkSqlUpdateNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNode
context.getWriterContext.getCfg.targetTableName, sparkSession.sparkContext.defaultParallelism)
LOG.info("Number of records to update: " + recordsToUpdate.count())
// The update records corresponding to the SQL are only used for data validation
context.getDeltaGenerator().writeRecords(recordsToUpdate).count()
context.getDeltaGenerator().writeRecords(recordsToUpdate).getValue().count()
recordsToUpdate
}

View File

@@ -125,7 +125,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase {
public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws IOException {
DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO,
new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, dfsBasePath,
schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false);
schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false, false);
DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter = DeltaWriterFactory
.getDeltaWriterAdapter(dfsSinkConfig, 1);
FlexibleSchemaRecordGenerationIterator itr = new FlexibleSchemaRecordGenerationIterator(1000,