From d074089c62db9aabe3b6c47ec8323f4487fa2b0b Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Mon, 28 Mar 2022 14:05:00 -0700 Subject: [PATCH] [HUDI-2566] Adding multi-writer test support to integ test (#5065) --- .../config/test-suite/multi-writer-1-ds.yaml | 65 +++++ .../test-suite/multi-writer-1.properties | 58 +++++ .../config/test-suite/multi-writer-2-sds.yaml | 52 ++++ .../test-suite/multi-writer-2.properties | 58 +++++ .../multi-writer-local-1.properties | 57 +++++ .../multi-writer-local-2.properties | 57 +++++ .../config/metrics/HoodieMetricsConfig.java | 2 +- .../HoodieMultiWriterTestSuiteJob.java | 241 ++++++++++++++++++ .../integ/testsuite/HoodieTestSuiteJob.java | 37 ++- .../configuration/DFSDeltaConfig.java | 10 +- .../integ/testsuite/dag/WriterContext.java | 2 +- .../integ/testsuite/dag/nodes/DeleteNode.java | 2 +- .../integ/testsuite/dag/nodes/InsertNode.java | 2 +- .../integ/testsuite/dag/nodes/UpsertNode.java | 2 +- .../testsuite/generator/DeltaGenerator.java | 61 +++-- .../dag/nodes/SparkBulkInsertNode.scala | 2 +- .../testsuite/dag/nodes/SparkDeleteNode.scala | 34 +-- .../testsuite/dag/nodes/SparkInsertNode.scala | 31 ++- .../testsuite/dag/nodes/SparkUpsertNode.scala | 43 +++- .../nodes/spark/sql/BaseSparkSqlNode.scala | 2 +- .../spark/sql/SparkSqlCreateTableNode.scala | 2 +- .../nodes/spark/sql/SparkSqlDeleteNode.scala | 2 +- .../nodes/spark/sql/SparkSqlMergeNode.scala | 2 +- .../nodes/spark/sql/SparkSqlUpdateNode.scala | 2 +- .../TestDFSHoodieTestSuiteWriterAdapter.java | 2 +- 25 files changed, 741 insertions(+), 87 deletions(-) create mode 100644 docker/demo/config/test-suite/multi-writer-1-ds.yaml create mode 100644 docker/demo/config/test-suite/multi-writer-1.properties create mode 100644 docker/demo/config/test-suite/multi-writer-2-sds.yaml create mode 100644 docker/demo/config/test-suite/multi-writer-2.properties create mode 100644 docker/demo/config/test-suite/multi-writer-local-1.properties create mode 100644 docker/demo/config/test-suite/multi-writer-local-2.properties create mode 100644 hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java diff --git a/docker/demo/config/test-suite/multi-writer-1-ds.yaml b/docker/demo/config/test-suite/multi-writer-1-ds.yaml new file mode 100644 index 000000000..3fe33b671 --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-1-ds.yaml @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
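+#
+# DAG for writer 1 (DeltaStreamer-based nodes): three rounds of insert -> insert -> insert -> upsert -> delete,
+# followed by a dataset validation step. Input data is generated under the first entry of --input-base-paths.
+#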
+dag_name: simple-deltastreamer.yaml +dag_rounds: 3 +dag_intermittent_delay_mins: 0 +dag_content: + first_insert: + config: + record_size: 5000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 30000 + deps: second_insert + type: InsertNode + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 5000 + repeat_count: 1 + num_records_upsert: 50000 + num_partitions_upsert: 1 + type: UpsertNode + deps: third_insert + first_delete: + config: + num_partitions_delete : 0 + num_records_delete: 100000 + type: DeleteNode + deps: first_upsert + second_validate: + config: + validate_hive: false + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete diff --git a/docker/demo/config/test-suite/multi-writer-1.properties b/docker/demo/config/test-suite/multi-writer-1.properties new file mode 100644 index 000000000..502a1b771 --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-1.properties @@ -0,0 +1,58 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
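+#
+# Writer 1: DeltaStreamer-style writer. Reads generated input from the input1 directory configured via
+# hoodie.deltastreamer.source.dfs.root, syncs to hive table "table1", and coordinates with writer 2 through
+# optimistic_concurrency_control backed by the Zookeeper lock provider.
+#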
+ +hoodie.insert.shuffle.parallelism=2 +hoodie.upsert.shuffle.parallelism=2 +hoodie.bulkinsert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 + +hoodie.metadata.enable=false + +hoodie.deltastreamer.source.test.num_partitions=100 +hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false +hoodie.deltastreamer.source.test.max_unique_records=100000000 +hoodie.embed.timeline.server=false +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator +hoodie.datasource.write.partitionpath.field=timestamp + +hoodie.write.concurrency.mode=optimistic_concurrency_control +hoodie.cleaner.policy.failed.writes=LAZY +hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider +hoodie.write.lock.zookeeper.url=zookeeper:2181 +hoodie.write.lock.zookeeper.port=2181 +hoodie.write.lock.zookeeper.lock_key=locks +hoodie.write.lock.zookeeper.base_path=/tmp/.locks + +hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input1 +hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd + +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ +hoodie.datasource.hive_sync.database=testdb +hoodie.datasource.hive_sync.table=table1 +hoodie.datasource.hive_sync.assume_date_partitioning=false +hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path +hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor + diff --git a/docker/demo/config/test-suite/multi-writer-2-sds.yaml b/docker/demo/config/test-suite/multi-writer-2-sds.yaml new file mode 100644 index 000000000..9242dd260 --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-2-sds.yaml @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
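+#
+# DAG for writer 2 (Spark datasource nodes): insert -> upsert -> delete -> validate, with start_partition: 10 so
+# that this writer lands in a partition range disjoint from writer 1.
+#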
+dag_name: cow-spark-simple.yaml +dag_rounds: 3 +dag_intermittent_delay_mins: 0 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100000 + start_partition: 10 + type: SparkInsertNode + deps: none + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 50000 + repeat_count: 1 + num_records_upsert: 50000 + num_partitions_upsert: 1 + start_partition: 10 + type: SparkUpsertNode + deps: first_insert + first_delete: + config: + num_partitions_delete: 0 + num_records_delete: 10000 + start_partition: 10 + type: SparkDeleteNode + deps: first_upsert + second_validate: + config: + validate_hive: false + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete \ No newline at end of file diff --git a/docker/demo/config/test-suite/multi-writer-2.properties b/docker/demo/config/test-suite/multi-writer-2.properties new file mode 100644 index 000000000..80db8912b --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-2.properties @@ -0,0 +1,58 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
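+#
+# Writer 2: identical to multi-writer-1.properties except for the input root, which points at the input2
+# directory, so both writers share the lock configuration while generating input data independently.
+#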
+ +hoodie.insert.shuffle.parallelism=2 +hoodie.upsert.shuffle.parallelism=2 +hoodie.bulkinsert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 + +hoodie.metadata.enable=false + +hoodie.deltastreamer.source.test.num_partitions=100 +hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false +hoodie.deltastreamer.source.test.max_unique_records=100000000 +hoodie.embed.timeline.server=false +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator +hoodie.datasource.write.partitionpath.field=timestamp + +hoodie.write.concurrency.mode=optimistic_concurrency_control +hoodie.cleaner.policy.failed.writes=LAZY +hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider +hoodie.write.lock.zookeeper.url=zookeeper:2181 +hoodie.write.lock.zookeeper.port=2181 +hoodie.write.lock.zookeeper.lock_key=locks +hoodie.write.lock.zookeeper.base_path=/tmp/.locks + +hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-integ-test-suite/input2 +hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd + +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ +hoodie.datasource.hive_sync.database=testdb +hoodie.datasource.hive_sync.table=table1 +hoodie.datasource.hive_sync.assume_date_partitioning=false +hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path +hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor + diff --git a/docker/demo/config/test-suite/multi-writer-local-1.properties b/docker/demo/config/test-suite/multi-writer-local-1.properties new file mode 100644 index 000000000..be16f91c1 --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-local-1.properties @@ -0,0 +1,57 @@ + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
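+#
+# Local (single JVM) variant of writer 1: uses InProcessLockProvider instead of Zookeeper, with input data and
+# schema files under /tmp, so the multi-writer job can be exercised without the docker/Zookeeper setup.
+#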
+# + +hoodie.insert.shuffle.parallelism=2 +hoodie.upsert.shuffle.parallelism=2 +hoodie.bulkinsert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 + +hoodie.metadata.enable=false + +hoodie.deltastreamer.source.test.num_partitions=100 +hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false +hoodie.deltastreamer.source.test.max_unique_records=100000000 +hoodie.embed.timeline.server=false +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator +hoodie.datasource.write.partitionpath.field=timestamp + +hoodie.write.concurrency.mode=optimistic_concurrency_control +hoodie.cleaner.policy.failed.writes=LAZY +hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider + +hoodie.deltastreamer.source.dfs.root=/tmp/hudi/input1 +hoodie.deltastreamer.schemaprovider.target.schema.file=file:/tmp/source.avsc +hoodie.deltastreamer.schemaprovider.source.schema.file=file:/tmp/source.avsc +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd + +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ +hoodie.datasource.hive_sync.database=testdb +hoodie.datasource.hive_sync.table=table1 +hoodie.datasource.hive_sync.assume_date_partitioning=false +hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path +hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor + diff --git a/docker/demo/config/test-suite/multi-writer-local-2.properties b/docker/demo/config/test-suite/multi-writer-local-2.properties new file mode 100644 index 000000000..08f294ce1 --- /dev/null +++ b/docker/demo/config/test-suite/multi-writer-local-2.properties @@ -0,0 +1,57 @@ + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
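+#
+# Local variant of writer 2: same as multi-writer-local-1.properties but reading input from /tmp/hudi/input2.
+#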
+# + +hoodie.insert.shuffle.parallelism=2 +hoodie.upsert.shuffle.parallelism=2 +hoodie.bulkinsert.shuffle.parallelism=2 +hoodie.delete.shuffle.parallelism=2 + +hoodie.metadata.enable=false + +hoodie.deltastreamer.source.test.num_partitions=100 +hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false +hoodie.deltastreamer.source.test.max_unique_records=100000000 +hoodie.embed.timeline.server=false +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + +hoodie.datasource.write.recordkey.field=_row_key +hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator +hoodie.datasource.write.partitionpath.field=timestamp + +hoodie.write.concurrency.mode=optimistic_concurrency_control +hoodie.cleaner.policy.failed.writes=LAZY +hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.InProcessLockProvider + +hoodie.deltastreamer.source.dfs.root=/tmp/hudi/input2 +hoodie.deltastreamer.schemaprovider.target.schema.file=file:/tmp/source.avsc +hoodie.deltastreamer.schemaprovider.source.schema.file=file:/tmp/source.avsc +hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP +hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd + +hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ +hoodie.datasource.hive_sync.database=testdb +hoodie.datasource.hive_sync.table=table1 +hoodie.datasource.hive_sync.assume_date_partitioning=false +hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path +hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor + diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java index 839654039..85f98935f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java @@ -181,7 +181,7 @@ public class HoodieMetricsConfig extends HoodieConfig { hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.GRAPHITE, HoodieMetricsGraphiteConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build()); hoodieMetricsConfig.setDefaultOnCondition(reporterType == MetricsReporterType.CLOUDWATCH, - HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build()); + HoodieMetricsCloudWatchConfig.newBuilder().fromProperties(hoodieMetricsConfig.getProps()).build()); return hoodieMetricsConfig; } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java new file mode 100644 index 000000000..6cff49982 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieMultiWriterTestSuiteJob.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.integ.testsuite;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.UtilHelpers;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Multi-writer test suite job to assist in testing multi-writer scenarios. This test spins up one thread per writer, as per the configuration.
+ * Three params are of interest to this job in addition to the regular HoodieTestSuiteJob params:
+ * --input-base-paths "base_path/input1,base_path/input2"
+ * --props-paths "file:props_path/multi-writer-1.properties,file:/props_path/multi-writer-2.properties"
+ * --workload-yaml-paths "file:some_path/multi-writer-1-ds.yaml,file:/some_path/multi-writer-2-sds.yaml"
+ *
+ * Each of these should have the same number of comma-separated entries.
+ * Each writer generates data in its corresponding input-base-path,
+ * and each writer takes in its own properties file and its respective yaml file.
+ *
+ * Common tests:
+ * Writer 1 (DeltaStreamer) ingesting data into partitions 0 to 10, Writer 2 (Spark datasource) ingesting data into partitions 100 to 110.
+ * Multiple Spark datasource writers, each writing to an exclusive set of partitions.
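+ *
+ * The configs bundled under docker/demo/config/test-suite pair up per writer: multi-writer-1.properties goes with
+ * multi-writer-1-ds.yaml (DeltaStreamer-style Insert/Upsert/Delete nodes, input generated under the input1 path), and
+ * multi-writer-2.properties goes with multi-writer-2-sds.yaml (SparkInsertNode/SparkUpsertNode/SparkDeleteNode,
+ * input generated under the input2 path, starting at partition 10). Both property files enable
+ * optimistic_concurrency_control with the Zookeeper-based lock provider and sync to the same hive table.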
+ *
+ * Example command:
+ * spark-submit
+ * --packages org.apache.spark:spark-avro_2.11:2.4.0
+ * --conf spark.task.cpus=3
+ * --conf spark.executor.cores=3
+ * --conf spark.task.maxFailures=100
+ * --conf spark.memory.fraction=0.4
+ * --conf spark.rdd.compress=true
+ * --conf spark.kryoserializer.buffer.max=2000m
+ * --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
+ * --conf spark.memory.storageFraction=0.1
+ * --conf spark.shuffle.service.enabled=true
+ * --conf spark.sql.hive.convertMetastoreParquet=false
+ * --conf spark.driver.maxResultSize=12g
+ * --conf spark.executor.heartbeatInterval=120s
+ * --conf spark.network.timeout=600s
+ * --conf spark.yarn.max.executor.failures=10
+ * --conf spark.sql.catalogImplementation=hive
+ * --conf spark.driver.extraClassPath=/var/demo/jars/*
+ * --conf spark.executor.extraClassPath=/var/demo/jars/*
+ * --class org.apache.hudi.integ.testsuite.HoodieMultiWriterTestSuiteJob /opt/hudi-integ-test-bundle-0.11.0-SNAPSHOT.jar
+ * --source-ordering-field test_suite_source_ordering_field
+ * --use-deltastreamer
+ * --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output
+ * --input-base-paths "/user/hive/warehouse/hudi-integ-test-suite/input1,/user/hive/warehouse/hudi-integ-test-suite/input2"
+ * --target-table hudi_table
+ * --props-paths "multi-writer-1.properties,multi-writer-2.properties"
+ * --schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider
+ * --source-class org.apache.hudi.utilities.sources.AvroDFSSource --input-file-size 125829120
+ * --workload-yaml-paths "file:/opt/multi-writer-1-ds.yaml,file:/opt/multi-writer-2-sds.yaml"
+ * --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator
+ * --table-type COPY_ON_WRITE --compact-scheduling-minshare 1
+ * --input-base-path "dummyValue"
+ * --workload-yaml-path "dummyValue"
+ * --props "dummyValue"
+ * --use-hudi-data-to-generate-updates
+ *
+ * The command above works with the docker demo setup.
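+ *
+ * For a single-JVM local run, the bundled multi-writer-local-*.properties swap the Zookeeper lock provider for
+ * InProcessLockProvider and use input paths under /tmp. An illustrative variant of the command above (file locations
+ * are placeholders; remaining flags stay the same):
+ * --input-base-paths "/tmp/hudi/input1,/tmp/hudi/input2"
+ * --props-paths "file:/tmp/multi-writer-local-1.properties,file:/tmp/multi-writer-local-2.properties"
+ * --workload-yaml-paths "file:/tmp/multi-writer-1-ds.yaml,file:/tmp/multi-writer-2-sds.yaml"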
+ * + */ +public class HoodieMultiWriterTestSuiteJob { + + private static final Logger LOG = LogManager.getLogger(HoodieMultiWriterTestSuiteJob.class); + + public static void main(String[] args) throws Exception { + final HoodieMultiWriterTestSuiteConfig cfg = new HoodieMultiWriterTestSuiteConfig(); + JCommander cmd = new JCommander(cfg, args); + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + + JavaSparkContext jssc = UtilHelpers.buildSparkContext("multi-writer-test-run-" + cfg.outputTypeName + + "-" + cfg.inputFormatName, cfg.sparkMaster); + + String[] inputPaths = cfg.inputBasePaths.split(","); + String[] yamls = cfg.workloadYamlPaths.split(","); + String[] propsFiles = cfg.propsFilePaths.split(","); + + if (inputPaths.length != yamls.length || yamls.length != propsFiles.length) { + throw new HoodieException("Input paths, property file and yaml file counts does not match "); + } + + ExecutorService executor = Executors.newFixedThreadPool(inputPaths.length); + + List testSuiteConfigList = new ArrayList<>(); + int jobIndex = 0; + for (String inputPath : inputPaths) { + HoodieMultiWriterTestSuiteConfig testSuiteConfig = new HoodieMultiWriterTestSuiteConfig(); + deepCopyConfigs(cfg, testSuiteConfig); + testSuiteConfig.inputBasePath = inputPath; + testSuiteConfig.workloadYamlPath = yamls[jobIndex]; + testSuiteConfig.propsFilePath = propsFiles[jobIndex]; + testSuiteConfigList.add(testSuiteConfig); + jobIndex++; + } + + AtomicBoolean jobFailed = new AtomicBoolean(false); + AtomicInteger counter = new AtomicInteger(0); + List> completableFutureList = new ArrayList<>(); + testSuiteConfigList.forEach(hoodieTestSuiteConfig -> { + try { + // start each job at 20 seconds interval so that metaClient instantiation does not overstep + Thread.sleep(counter.get() * 20000); + LOG.info("Starting job " + hoodieTestSuiteConfig.toString()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + completableFutureList.add(CompletableFuture.supplyAsync(() -> { + boolean toReturn = true; + try { + new HoodieTestSuiteJob(hoodieTestSuiteConfig, jssc, false).runTestSuite(); + LOG.info("Job completed successfully"); + } catch (Exception e) { + if (!jobFailed.getAndSet(true)) { + LOG.error("Exception thrown " + e.getMessage() + ", cause : " + e.getCause()); + throw new RuntimeException("HoodieTestSuiteJob Failed " + e.getCause() + ", and msg " + e.getMessage(), e); + } else { + LOG.info("Already a job failed. so, not throwing any exception "); + } + } + return toReturn; + }, executor)); + counter.getAndIncrement(); + }); + + LOG.info("Going to await until all jobs complete"); + try { + CompletableFuture completableFuture = allOfTerminateOnFailure(completableFutureList); + completableFuture.get(); + } finally { + executor.shutdownNow(); + if (jssc != null) { + LOG.info("Completed and shutting down spark context "); + LOG.info("Shutting down spark session and JavaSparkContext"); + SparkSession.builder().config(jssc.getConf()).enableHiveSupport().getOrCreate().stop(); + jssc.close(); + } + } + } + + public static CompletableFuture allOfTerminateOnFailure(List> futures) { + CompletableFuture failure = new CompletableFuture(); + AtomicBoolean jobFailed = new AtomicBoolean(false); + for (CompletableFuture f : futures) { + f.exceptionally(ex -> { + if (!jobFailed.getAndSet(true)) { + System.out.println("One of the job failed. Cancelling all other futures. 
" + ex.getCause() + ", " + ex.getMessage()); + futures.forEach(future -> future.cancel(true)); + } + return null; + }); + } + return CompletableFuture.anyOf(failure, CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))); + } + + static void deepCopyConfigs(HoodieMultiWriterTestSuiteConfig globalConfig, HoodieMultiWriterTestSuiteConfig tableConfig) { + tableConfig.enableHiveSync = globalConfig.enableHiveSync; + tableConfig.enableMetaSync = globalConfig.enableMetaSync; + tableConfig.schemaProviderClassName = globalConfig.schemaProviderClassName; + tableConfig.sourceOrderingField = globalConfig.sourceOrderingField; + tableConfig.sourceClassName = globalConfig.sourceClassName; + tableConfig.tableType = globalConfig.tableType; + tableConfig.targetTableName = globalConfig.targetTableName; + tableConfig.operation = globalConfig.operation; + tableConfig.sourceLimit = globalConfig.sourceLimit; + tableConfig.checkpoint = globalConfig.checkpoint; + tableConfig.continuousMode = globalConfig.continuousMode; + tableConfig.filterDupes = globalConfig.filterDupes; + tableConfig.payloadClassName = globalConfig.payloadClassName; + tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction; + tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions; + tableConfig.maxPendingClustering = globalConfig.maxPendingClustering; + tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds; + tableConfig.transformerClassNames = globalConfig.transformerClassNames; + tableConfig.commitOnErrors = globalConfig.commitOnErrors; + tableConfig.compactSchedulingMinShare = globalConfig.compactSchedulingMinShare; + tableConfig.compactSchedulingWeight = globalConfig.compactSchedulingWeight; + tableConfig.deltaSyncSchedulingMinShare = globalConfig.deltaSyncSchedulingMinShare; + tableConfig.deltaSyncSchedulingWeight = globalConfig.deltaSyncSchedulingWeight; + tableConfig.sparkMaster = globalConfig.sparkMaster; + tableConfig.workloadDagGenerator = globalConfig.workloadDagGenerator; + tableConfig.outputTypeName = globalConfig.outputTypeName; + tableConfig.inputFormatName = globalConfig.inputFormatName; + tableConfig.inputParallelism = globalConfig.inputParallelism; + tableConfig.useDeltaStreamer = globalConfig.useDeltaStreamer; + tableConfig.cleanInput = globalConfig.cleanInput; + tableConfig.cleanOutput = globalConfig.cleanOutput; + tableConfig.targetBasePath = globalConfig.targetBasePath; + } + + public static class HoodieMultiWriterTestSuiteConfig extends HoodieTestSuiteJob.HoodieTestSuiteConfig { + + @Parameter(names = {"--input-base-paths"}, description = "base paths for input data" + + "(Will be created if did not exist first time around. 
If exists, more data will be added to that path)", + required = true) + public String inputBasePaths; + + @Parameter(names = { + "--workload-yaml-paths"}, description = "Workflow Dag yaml path to generate the workload") + public String workloadYamlPaths; + + @Parameter(names = { + "--props-paths"}, description = "Workflow Dag yaml path to generate the workload") + public String propsFilePaths; + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index fe81f0c07..2d9f841ae 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -18,7 +18,6 @@ package org.apache.hudi.integ.testsuite; -import org.apache.avro.Schema; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -48,6 +47,7 @@ import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; +import org.apache.avro.Schema; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -93,13 +93,19 @@ public class HoodieTestSuiteJob { */ private transient HiveConf hiveConf; + private boolean stopJsc = true; private BuiltinKeyGenerator keyGenerator; private transient HoodieTableMetaClient metaClient; public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc) throws IOException { + this(cfg, jsc, true); + } + + public HoodieTestSuiteJob(HoodieTestSuiteConfig cfg, JavaSparkContext jsc, boolean stopJsc) throws IOException { log.warn("Running spark job w/ app id " + jsc.sc().applicationId()); this.cfg = cfg; this.jsc = jsc; + this.stopJsc = stopJsc; cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString(); this.sparkSession = SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate(); this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration()); @@ -108,11 +114,15 @@ public class HoodieTestSuiteJob { this.hiveConf = getDefaultHiveConf(jsc.hadoopConfiguration()); this.keyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props); - metaClient = HoodieTableMetaClient.withPropertyBuilder() - .setTableType(cfg.tableType) - .setTableName(cfg.targetTableName) - .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) - .initTable(jsc.hadoopConfiguration(), cfg.targetBasePath); + if (!fs.exists(new Path(cfg.targetBasePath))) { + metaClient = HoodieTableMetaClient.withPropertyBuilder() + .setTableType(cfg.tableType) + .setTableName(cfg.targetTableName) + .setArchiveLogFolder(ARCHIVELOG_FOLDER.defaultValue()) + .initTable(jsc.hadoopConfiguration(), cfg.targetBasePath); + } else { + metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(cfg.targetBasePath).build(); + } if (cfg.cleanInput) { Path inputPath = new Path(cfg.inputBasePath); @@ -167,15 +177,15 @@ public class HoodieTestSuiteJob { JavaSparkContext jssc = UtilHelpers.buildSparkContext("workload-generator-" + cfg.outputTypeName + "-" + cfg.inputFormatName, cfg.sparkMaster); - new HoodieTestSuiteJob(cfg, jssc).runTestSuite(); + new HoodieTestSuiteJob(cfg, jssc, true).runTestSuite(); } public WorkflowDag createWorkflowDag() throws IOException { 
WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ? ((WorkflowDagGenerator) ReflectionUtils .loadClass((this.cfg).workloadDagGenerator)).build() : DagUtils.convertYamlPathToDag( - FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true), - this.cfg.workloadYamlPath); + FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true), + this.cfg.workloadYamlPath); return workflowDag; } @@ -207,11 +217,13 @@ public class HoodieTestSuiteJob { log.error("Failed to run Test Suite ", e); throw new HoodieException("Failed to run Test Suite ", e); } finally { - stopQuietly(); + if (stopJsc) { + stopQuietly(); + } } } - private void stopQuietly() { + protected void stopQuietly() { try { sparkSession.stop(); jsc.stop(); @@ -295,5 +307,8 @@ public class HoodieTestSuiteJob { @Parameter(names = {"--start-hive-metastore"}, description = "Start Hive Metastore to use for optimistic lock ") public Boolean startHiveMetastore = false; + + @Parameter(names = {"--use-hudi-data-to-generate-updates"}, description = "Use data from hudi to generate updates for new batches ") + public Boolean useHudiToGenerateUpdates = false; } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java index 0ac36687f..f6c8c8fc3 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DFSDeltaConfig.java @@ -40,18 +40,20 @@ public class DFSDeltaConfig extends DeltaConfig { private int inputParallelism; // Whether to delete older input data once it has been ingested private boolean deleteOldInputData; + private boolean useHudiToGenerateUpdates; public DFSDeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType, SerializableConfiguration configuration, String deltaBasePath, String targetBasePath, String schemaStr, Long maxFileSize, - int inputParallelism, boolean deleteOldInputData) { - super(deltaOutputMode, deltaInputType, configuration); + int inputParallelism, boolean deleteOldInputData, boolean useHudiToGenerateUpdates) { + super(deltaOutputMode, deltaInputType, configuration); this.deltaBasePath = deltaBasePath; this.schemaStr = schemaStr; this.maxFileSize = maxFileSize; this.datasetOutputPath = targetBasePath; this.inputParallelism = inputParallelism; this.deleteOldInputData = deleteOldInputData; + this.useHudiToGenerateUpdates = useHudiToGenerateUpdates; } public String getDeltaBasePath() { @@ -85,4 +87,8 @@ public class DFSDeltaConfig extends DeltaConfig { public boolean shouldDeleteOldInputData() { return deleteOldInputData; } + + public boolean shouldUseHudiToGenerateUpdates() { + return useHudiToGenerateUpdates; + } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java index 27760f711..d31ef195e 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java @@ -72,7 +72,7 @@ public class WriterContext { this.deltaGenerator = new DeltaGenerator( new DFSDeltaConfig(DeltaOutputMode.valueOf(cfg.outputTypeName), DeltaInputType.valueOf(cfg.inputFormatName), new SerializableConfiguration(jsc.hadoopConfiguration()), cfg.inputBasePath, 
cfg.targetBasePath, - schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput), + schemaStr, cfg.limitFileSize, inputParallelism, cfg.deleteOldInput, cfg.useHudiToGenerateUpdates), jsc, sparkSession, schemaStr, keyGenerator); log.info(String.format("Initialized writerContext with: %s", schemaStr)); } catch (Exception e) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteNode.java index b538b01d1..8eaea6541 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DeleteNode.java @@ -38,7 +38,7 @@ public class DeleteNode extends InsertNode { @Override protected void generate(DeltaGenerator deltaGenerator) throws Exception { if (!config.isDisableGenerate()) { - deltaGenerator.writeRecords(deltaGenerator.generateDeletes(config)).count(); + deltaGenerator.writeRecords(deltaGenerator.generateDeletes(config)).getValue().count(); } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java index f5cf56b99..33cce79e0 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/InsertNode.java @@ -59,7 +59,7 @@ public class InsertNode extends DagNode> { protected void generate(DeltaGenerator deltaGenerator) throws Exception { if (!config.isDisableGenerate()) { log.info("Generating input data for node {}", this.getName()); - this.deltaWriteStatsRDD = deltaGenerator.writeRecords(deltaGenerator.generateInserts(config)); + this.deltaWriteStatsRDD = deltaGenerator.writeRecords(deltaGenerator.generateInserts(config)).getValue(); this.deltaWriteStatsRDD.cache(); this.deltaWriteStatsRDD.count(); } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/UpsertNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/UpsertNode.java index 1377a4d6b..427ee74b6 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/UpsertNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/UpsertNode.java @@ -38,7 +38,7 @@ public class UpsertNode extends InsertNode { protected void generate(DeltaGenerator deltaGenerator) throws Exception { if (!config.isDisableGenerate()) { log.info("Generating input data {}", this.getName()); - deltaGenerator.writeRecords(deltaGenerator.generateUpdates(config)).count(); + deltaGenerator.writeRecords(deltaGenerator.generateUpdates(config)).getValue().count(); } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index 69e32dfbc..e7bc7b00a 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -18,24 +18,9 @@ package org.apache.hudi.integ.testsuite.generator; -import java.io.IOException; -import java.io.Serializable; -import java.io.UncheckedIOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import 
java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.StreamSupport; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.converter.Converter; @@ -51,6 +36,9 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter; import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory; import org.apache.hudi.keygen.BuiltinKeyGenerator; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; @@ -58,6 +46,20 @@ import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.StreamSupport; + import scala.Tuple2; /** @@ -85,7 +87,7 @@ public class DeltaGenerator implements Serializable { this.partitionPathFieldNames = keyGenerator.getPartitionPathFields(); } - public JavaRDD writeRecords(JavaRDD records) { + public Pair> writeRecords(JavaRDD records) { if (deltaOutputConfig.shouldDeleteOldInputData() && batchId > 1) { Path oldInputDir = new Path(deltaOutputConfig.getDeltaBasePath(), Integer.toString(batchId - 1)); try { @@ -107,7 +109,7 @@ public class DeltaGenerator implements Serializable { } }).flatMap(List::iterator); batchId++; - return ws; + return Pair.of(batchId, ws); } public int getBatchId() { @@ -156,15 +158,22 @@ public class DeltaGenerator implements Serializable { adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert()); adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert()); } else { - deltaInputReader = - new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDatasetOutputPath(), - schemaStr); - if (config.getFractionUpsertPerFile() > 0) { - adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), - config.getFractionUpsertPerFile()); + if (((DFSDeltaConfig) deltaOutputConfig).shouldUseHudiToGenerateUpdates()) { + deltaInputReader = + new DFSHoodieDatasetInputReader(jsc, ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), + schemaStr); + if (config.getFractionUpsertPerFile() > 0) { + adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), + config.getFractionUpsertPerFile()); + } else { + adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config + .getNumRecordsUpsert()); + } } else { - adjustedRDD = deltaInputReader.read(config.getNumUpsertPartitions(), config.getNumUpsertFiles(), config - .getNumRecordsUpsert()); + deltaInputReader = new 
DFSAvroDeltaInputReader(sparkSession, schemaStr, + ((DFSDeltaConfig) deltaOutputConfig).getDeltaBasePath(), Option.empty(), Option.empty()); + adjustedRDD = deltaInputReader.read(config.getNumRecordsUpsert()); + adjustedRDD = adjustRDDToGenerateExactNumUpdates(adjustedRDD, jsc, config.getNumRecordsUpsert()); } } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala index 6654264a9..ac254bea8 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala @@ -46,7 +46,7 @@ class SparkBulkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus */ override def execute(context: ExecutionContext, curItrCount: Int): Unit = { if (!config.isDisableGenerate) { - context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count() + context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).getValue().count() } val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch, context.getWriterContext.getHoodieTestSuiteWriter.getSchema, diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala index 645787a87..ecf94b94e 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala @@ -19,7 +19,6 @@ package org.apache.hudi.integ.testsuite.dag.nodes import org.apache.avro.Schema -import org.apache.avro.generic.GenericRecord import org.apache.hudi.client.WriteStatus import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config @@ -51,39 +50,26 @@ class SparkDeleteNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { override def execute(context: ExecutionContext, curItrCount: Int): Unit = { // Deletes can't be fetched using getNextBatch() bcoz, getInsert(schema) from payload will return empty for delete // records - val genRecsRDD = generateRecordsForDelete(config, context) + + val batchIdRecords = context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config)) + batchIdRecords.getValue().count() + + val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + batchIdRecords.getKey() + val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead) + val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false, + org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema))) + val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD, context.getWriterContext.getHoodieTestSuiteWriter.getSchema, context.getWriterContext.getSparkSession) + inputDF.write.format("hudi") .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap)) .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) .option(DataSourceWriteOptions.TABLE_TYPE.key, 
context.getHoodieTestSuiteWriter.getCfg.tableType) .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL) - .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key") - .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")) .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) .mode(SaveMode.Append) .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath) } - - /** - * Generates records for delete operations in Spark. - * - * @param config Node configs. - * @param context The context needed for an execution of a node. - * @return Records in {@link RDD}. - */ - private def generateRecordsForDelete(config: Config, context: ExecutionContext): RDD[GenericRecord] = { - if (!config.isDisableGenerate) { - context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config)).count() - } - - context.getWriterContext.getHoodieTestSuiteWriter.getNextBatchForDeletes() - val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("") - - val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead) - HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false, - org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema))) - } } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala index b0bec48a4..bea5ae3d6 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkInsertNode.scala @@ -18,11 +18,16 @@ package org.apache.hudi.integ.testsuite.dag.nodes +import org.apache.avro.Schema import org.apache.hudi.client.WriteStatus +import org.apache.hudi.common.util.collection.Pair import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config import org.apache.hudi.integ.testsuite.dag.ExecutionContext -import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions} +import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats +import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkUtils} +import org.apache.log4j.LogManager +import org.apache.spark.api.java.JavaRDD import org.apache.spark.rdd.RDD import org.apache.spark.sql.SaveMode @@ -35,6 +40,7 @@ import scala.collection.JavaConverters._ */ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { + private val log = LogManager.getLogger(getClass) config = dagNodeConfig /** @@ -45,21 +51,26 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { * @throws Exception Thrown if the execution failed. 
*/ override def execute(context: ExecutionContext, curItrCount: Int): Unit = { - if (!config.isDisableGenerate) { - println("Generating input data for node {}", this.getName) - writeRecords(context) - } - val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch, + println("Generating input data for node {}", this.getName) + + val batchIdRecords = writeRecords(context) + batchIdRecords.getValue().count() + + val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + batchIdRecords.getKey() + val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead) + val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false, + org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema))) + + val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD, context.getWriterContext.getHoodieTestSuiteWriter.getSchema, context.getWriterContext.getSparkSession) + inputDF.write.format("hudi") .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap)) .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field") .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) .option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType) .option(DataSourceWriteOptions.OPERATION.key, getOperation()) - .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key") - .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")) .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) .mode(SaveMode.Append) .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath) @@ -69,7 +80,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] { DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL } - def writeRecords(context: ExecutionContext): Unit = { - context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count() + def writeRecords(context: ExecutionContext): Pair[Integer, JavaRDD[DeltaWriteStats]] = { + context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)) } } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala index f83bc5563..76e7576b1 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkUpsertNode.scala @@ -19,8 +19,12 @@ package org.apache.hudi.integ.testsuite.dag.nodes import org.apache.hudi.DataSourceWriteOptions +import org.apache.hudi.common.util.collection.Pair import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config import org.apache.hudi.integ.testsuite.dag.ExecutionContext +import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats +import org.apache.log4j.LogManager +import org.apache.spark.api.java.JavaRDD /** * Spark datasource based upsert node @@ -29,11 +33,46 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext */ class SparkUpsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) { + private val log = 
LogManager.getLogger(getClass) + override def getOperation(): String = { DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL } - override def writeRecords(context: ExecutionContext): Unit = { - context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)).count() + override def writeRecords(context: ExecutionContext): Pair[Integer, JavaRDD[DeltaWriteStats]] = { + context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)) } + + /** + * Execute the {@link DagNode}. + * + * @param context The context needed for an execution of a node. + * @param curItrCount iteration count for executing the node. + * @throws Exception Thrown if the execution failed. + */ + /*override def execute(context: ExecutionContext, curItrCount: Int): Unit = { + println("Generating input data for node {}", this.getName) + + val batchIdRecords = writeRecords(context) + batchIdRecords.getValue().count() + + val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + batchIdRecords.getKey() + val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead) + val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false, + org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema))) + + val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD, + context.getWriterContext.getHoodieTestSuiteWriter.getSchema, + context.getWriterContext.getSparkSession) + + inputDF.write.format("hudi") + .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap)) + .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "test_suite_source_ordering_field") + .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) + .option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType) + .option(DataSourceWriteOptions.OPERATION.key, getOperation()) + .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) + .mode(SaveMode.Append) + .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath) + }*/ } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/BaseSparkSqlNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/BaseSparkSqlNode.scala index ce6a40efb..83e5598d4 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/BaseSparkSqlNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/BaseSparkSqlNode.scala @@ -57,7 +57,7 @@ abstract class BaseSparkSqlNode(dagNodeConfig: Config) extends DagNode[RDD[Write */ def prepareData(context: ExecutionContext): RDD[GenericRecord] = { if (!config.isDisableGenerate) { - context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count() + context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).getValue().count() } context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlCreateTableNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlCreateTableNode.scala index 3db6aa2cc..dabe54d82 100644 --- 
a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlCreateTableNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlCreateTableNode.scala @@ -58,7 +58,7 @@ class SparkSqlCreateTableNode(dagNodeConfig: Config) extends DagNode[RDD[WriteSt if (config.shouldUseCtas) { // Prepares data for CTAS query if (!config.isDisableGenerate) { - context.getDeltaGenerator.writeRecords(context.getDeltaGenerator.generateInserts(config)).count() + context.getDeltaGenerator.writeRecords(context.getDeltaGenerator.generateInserts(config)).getValue().count() } val nextBatch = context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch val sparkSession = context.getWriterContext.getSparkSession diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlDeleteNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlDeleteNode.scala index 847381f8c..645f2030b 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlDeleteNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlDeleteNode.scala @@ -48,7 +48,7 @@ class SparkSqlDeleteNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNode context.getWriterContext.getCfg.targetTableName, sparkSession.sparkContext.defaultParallelism) LOG.info("Number of records to delete: " + recordsToDelete.count()) // The update records corresponding to the SQL are only used for data validation - context.getDeltaGenerator().writeRecords(recordsToDelete).count() + context.getDeltaGenerator().writeRecords(recordsToDelete).getValue().count() recordsToDelete } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlMergeNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlMergeNode.scala index b03230beb..52ba6be21 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlMergeNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlMergeNode.scala @@ -42,7 +42,7 @@ class SparkSqlMergeNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNodeC */ override def prepareData(context: ExecutionContext): RDD[GenericRecord] = { if (!config.isDisableGenerate) { - context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)).count() + context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateUpdates(config)).getValue().count() } context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlUpdateNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlUpdateNode.scala index fdc799fea..7405d3ff4 100644 --- a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlUpdateNode.scala +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/spark/sql/SparkSqlUpdateNode.scala @@ -48,7 +48,7 @@ class SparkSqlUpdateNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNode context.getWriterContext.getCfg.targetTableName, sparkSession.sparkContext.defaultParallelism) LOG.info("Number of records to update: " + recordsToUpdate.count()) // The update records 
corresponding to the SQL are only used for data validation - context.getDeltaGenerator().writeRecords(recordsToUpdate).count() + context.getDeltaGenerator().writeRecords(recordsToUpdate).getValue().count() recordsToUpdate } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java index ff92bd037..4a148da79 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/TestDFSHoodieTestSuiteWriterAdapter.java @@ -125,7 +125,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase { public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws IOException { DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO, new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, dfsBasePath, - schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false); + schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false, false); DeltaWriterAdapter dfsDeltaWriterAdapter = DeltaWriterFactory .getDeltaWriterAdapter(dfsSinkConfig, 1); FlexibleSchemaRecordGenerationIterator itr = new FlexibleSchemaRecordGenerationIterator(1000,