diff --git a/docker/demo/config/test-suite/complex-dag-cow.yaml b/docker/demo/config/test-suite/complex-dag-cow.yaml index a10026c0b..5fa859683 100644 --- a/docker/demo/config/test-suite/complex-dag-cow.yaml +++ b/docker/demo/config/test-suite/complex-dag-cow.yaml @@ -13,122 +13,56 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -first_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 1000 - type: InsertNode - deps: none -second_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 10000 - deps: first_insert - type: InsertNode -third_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 300 - deps: second_insert - type: InsertNode -first_rollback: - config: - deps: third_insert - type: RollbackNode -first_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_rollback -first_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_upsert -first_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 11300 - type: HiveQueryNode - deps: first_hive_sync -second_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_hive_query -second_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 11600 - type: HiveQueryNode - deps: second_upsert -fourth_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 1000 - deps: second_hive_query - type: InsertNode -third_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 12600 - type: HiveQueryNode - deps: fourth_insert -first_delete: - config: - record_size: 70000 - num_partitions_delete: 1 - num_records_delete: 200 - deps: third_hive_query - type: DeleteNode -fourth_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_delete -fourth_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 12400 - type: HiveQueryNode - deps: fourth_hive_sync \ No newline at end of file +dag_name: cow-long-running-example.yaml +dag_rounds: 2 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_validate: + config: + type: ValidateDatasetNode + deps: third_insert + first_upsert: + config: + record_size: 100 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_validate + first_delete: + config: + num_partitions_delete: 1 + num_records_delete: 2000 + type: DeleteNode + deps: first_upsert + second_validate: + config: + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete \ No newline at end of file diff --git a/docker/demo/config/test-suite/complex-dag-mor.yaml b/docker/demo/config/test-suite/complex-dag-mor.yaml index 2652b0307..505e5e294 100644 --- a/docker/demo/config/test-suite/complex-dag-mor.yaml +++ b/docker/demo/config/test-suite/complex-dag-mor.yaml @@ -13,103 +13,107 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -first_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: none -second_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 100 - deps: first_insert - type: InsertNode -third_insert: - config: - record_size: 70000 - num_insert_partitions: 1 - repeat_count: 1 - num_records_insert: 300 - deps: second_insert - type: InsertNode -first_rollback: - config: - deps: third_insert - type: RollbackNode -first_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_rollback -first_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_upsert -first_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveQueryNode - deps: first_hive_sync -second_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_hive_query -second_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 1100 - type: HiveQueryNode - deps: second_upsert -first_schedule_compact: - config: - type: ScheduleCompactNode - deps: second_hive_query -third_upsert: - config: - record_size: 70000 - num_insert_partitions: 1 - num_records_insert: 300 - repeat_count: 1 - num_records_upsert: 100 - num_upsert_partitions: 10 - type: UpsertNode - deps: first_schedule_compact -first_compact: - config: - type: CompactNode - deps: first_schedule_compact -third_hive_query: - config: - queue_name: "adhoc" - engine: "mr" - hive_queries: - query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" - result1: 0 - query2: "select count(*) from testdb.table1" - result2: 1400 - type: HiveQueryNode - deps: first_compact +dag_name: complex-dag-mor.yaml +dag_rounds: 1 +dag_intermittent_delay_mins: 10 +dag_content: + first_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 5 + num_records_insert: 100 + type: InsertNode + deps: none + second_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 5 + num_records_insert: 100 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 2 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_rollback: + config: + deps: third_insert + type: RollbackNode + first_upsert: + config: + record_size: 70000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 10 + type: UpsertNode + deps: first_rollback + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_upsert + first_hive_query: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveQueryNode + deps: first_hive_sync + second_upsert: + config: + record_size: 70000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 10 + type: UpsertNode + deps: first_hive_query + second_hive_query: + config: + queue_name: "adhoc" + engine: "mr" + hive_queries: + query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" + result1: 0 + query2: "select count(*) from testdb.table1" + result2: 1100 + type: HiveQueryNode + deps: second_upsert + first_schedule_compact: + config: + type: ScheduleCompactNode + deps: second_hive_query + third_upsert: + config: + record_size: 70000 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 10 + type: UpsertNode + deps: first_schedule_compact + first_compact: + config: + type: CompactNode + deps: first_schedule_compact + third_hive_query: + config: + queue_name: "adhoc" + engine: "mr" + hive_queries: + query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1" + result1: 0 + query2: "select count(*) from testdb.table1" + result2: 1400 + type: HiveQueryNode + deps: first_compact diff --git a/docker/demo/config/test-suite/cow-long-running-example.yaml b/docker/demo/config/test-suite/cow-long-running-example.yaml new file mode 100644 index 000000000..b7026f2dd --- /dev/null +++ b/docker/demo/config/test-suite/cow-long-running-example.yaml @@ -0,0 +1,68 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: cow-long-running-example.yaml +dag_rounds: 20 +dag_intermittent_delay_mins: 10 +dag_content: + first_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 1000 + type: InsertNode + deps: none + second_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 10000 + deps: first_insert + type: InsertNode + third_insert: + config: + record_size: 100 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 300 + deps: second_insert + type: InsertNode + first_validate: + config: + type: ValidateDatasetNode + deps: third_insert + first_upsert: + config: + record_size: 100 + num_partitions_insert: 1 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 1 + type: UpsertNode + deps: first_validate + first_delete: + config: + num_partitions_delete: 1 + num_records_delete: 2000 + type: DeleteNode + deps: first_upsert + second_validate: + config: + delete_input_data: true + type: ValidateDatasetNode + deps: first_delete \ No newline at end of file diff --git a/docker/demo/config/test-suite/test-source.properties b/docker/demo/config/test-suite/test-source.properties deleted file mode 100644 index cc18a39d5..000000000 --- a/docker/demo/config/test-suite/test-source.properties +++ /dev/null @@ -1,37 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# write configs -hoodie.datasource.write.recordkey.field=_row_key -hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator -hoodie.datasource.write.partitionpath.field=timestamp - - -# deltastreamer configs -hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd -hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP -hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-bench/input -hoodie.deltastreamer.schemaprovider.source.schema.file=/var/hoodie/ws/docker/demo/config/test-suite/source.avsc -hoodie.deltastreamer.schemaprovider.target.schema.file=/var/hoodie/ws/docker/demo/config/bench/source.avsc - -#hive sync -hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/ -hoodie.datasource.hive_sync.database=testdb -hoodie.datasource.hive_sync.table=table1 -hoodie.datasource.hive_sync.use_jdbc=false -hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor -hoodie.datasource.hive_sync.assume_date_partitioning=true -hoodie.datasource.hive_sync.use_pre_apache_input_format=true diff --git a/docker/demo/config/test-suite/test.properties b/docker/demo/config/test-suite/test.properties index a7fd3986a..0aa0f45c0 100644 --- a/docker/demo/config/test-suite/test.properties +++ b/docker/demo/config/test-suite/test.properties @@ -1,3 +1,4 @@ + hoodie.insert.shuffle.parallelism=100 hoodie.upsert.shuffle.parallelism=100 hoodie.bulkinsert.shuffle.parallelism=100 @@ -8,6 +9,13 @@ hoodie.deltastreamer.source.test.max_unique_records=100000000 hoodie.embed.timeline.server=false hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.insert.shuffle.parallelism=100 +hoodie.upsert.shuffle.parallelism=100 +hoodie.bulkinsert.shuffle.parallelism=100 + +hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector +hoodie.datasource.hive_sync.skip_ro_suffix=true + hoodie.datasource.write.recordkey.field=_row_key hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator hoodie.datasource.write.partitionpath.field=timestamp diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md index a6cdd0847..ff64ed124 100644 --- a/hudi-integ-test/README.md +++ b/hudi-integ-test/README.md @@ -142,7 +142,9 @@ Start the Hudi Docker demo: docker/setup_demo.sh ``` -NOTE: We need to make a couple of environment changes for Hive 2.x support. This will be fixed once Hudi moves to Spark 3.x +NOTE: We need to make a couple of environment changes for Hive 2.x support. This will be fixed once Hudi moves to Spark 3.x. +Execute below if you are using Hudi query node in your dag. If not, below section is not required. +Also, for longer running tests, go to next section. ``` docker exec -it adhoc-2 bash @@ -214,7 +216,7 @@ spark-submit \ --conf spark.sql.catalogImplementation=hive \ --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \ /opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \ ---source-ordering-field timestamp \ +--source-ordering-field test_suite_source_ordering_field \ --use-deltastreamer \ --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ --input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \ @@ -253,7 +255,7 @@ spark-submit \ --conf spark.sql.catalogImplementation=hive \ --class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \ /opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \ ---source-ordering-field timestamp \ +--source-ordering-field test_suite_source_ordering_field \ --use-deltastreamer \ --target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ --input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \ @@ -267,3 +269,182 @@ spark-submit \ --table-type MERGE_ON_READ \ --compact-scheduling-minshare 1 ``` + +For long running test suite, validation has to be done differently. Idea is to run same dag in a repeated manner. +Hence "ValidateDatasetNode" is introduced which will read entire input data and compare it with hudi contents both via +spark datasource and hive table via spark sql engine. + +If you have "ValidateDatasetNode" in your dag, do not replace hive jars as instructed above. Spark sql engine does not +go well w/ hive2* jars. So, after running docker setup, just copy test.properties and your dag of interest and you are +good to go ahead. + +For repeated runs, two additional configs need to be set. "dag_rounds" and "dag_intermittent_delay_mins". +This means that your dag will be repeated for N times w/ a delay of Y mins between each round. + +Also, ValidateDatasetNode can be configured in two ways. Either with "delete_input_data: true" set or not set. +When "delete_input_data" is set for ValidateDatasetNode, once validation is complete, entire input data will be deleted. +So, suggestion is to use this ValidateDatasetNode as the last node in the dag with "delete_input_data". +Example dag: +``` + Insert + Upsert + ValidateDatasetNode with delete_input_data = true +``` + +If above dag is run with "dag_rounds" = 10 and "dag_intermittent_delay_mins" = 10, then this dag will run for 10 times +with 10 mins delay between every run. At the end of every run, records written as part of this round will be validated. +At the end of each validation, all contents of input are deleted. +For eg: incase of above dag, +``` +Round1: + insert => inputPath/batch1 + upsert -> inputPath/batch2 + Validate with delete_input_data = true + Validates contents from batch1 and batch2 are in hudi and ensures Row equality + Since "delete_input_data" is set, deletes contents from batch1 and batch2. +Round2: + insert => inputPath/batch3 + upsert -> inputPath/batch4 + Validate with delete_input_data = true + Validates contents from batch3 and batch4 are in hudi and ensures Row equality + Since "delete_input_data" is set, deletes contents from batch3 and batch4. +Round3: + insert => inputPath/batch5 + upsert -> inputPath/batch6 + Validate with delete_input_data = true + Validates contents from batch5 and batch6 are in hudi and ensures Row equality + Since "delete_input_data" is set, deletes contents from batch5 and batch6. +. +. +``` +If you wish to do a cumulative validation, do not set delete_input_data in ValidateDatasetNode. But remember that this +may not scale beyond certain point since input data as well as hudi content's keeps occupying the disk and grows for +every cycle. + +Lets see an example where you don't set "delete_input_data" as part of Validation. +``` +Round1: + insert => inputPath/batch1 + upsert -> inputPath/batch2 + Validate: validates contents from batch1 and batch2 are in hudi and ensures Row equality +Round2: + insert => inputPath/batch3 + upsert -> inputPath/batch4 + Validate: validates contents from batch1 to batch4 are in hudi and ensures Row equality +Round3: + insert => inputPath/batch5 + upsert -> inputPath/batch6 + Validate: validates contents from batch1 and batch6 are in hudi and ensures Row equality +. +. +``` + +You could also have validations in the middle of your dag and not set the "delete_input_data". But set it only in the +last node in the dag. +``` +Round1: + insert => inputPath/batch1 + upsert -> inputPath/batch2 + Validate: validates contents from batch1 and batch2 are in hudi and ensures Row equality + insert => inputPath/batch3 + upsert -> inputPath/batch4 + Validate with delete_input_data = true + Validates contents from batch1 to batch4 are in hudi and ensures Row equality + since "delete_input_data" is set to true, this node deletes contents from batch1 and batch4. +Round2: + insert => inputPath/batch5 + upsert -> inputPath/batch6 + Validate: validates contents from batch5 and batch6 are in hudi and ensures Row equality + insert => inputPath/batch7 + upsert -> inputPath/batch8 + Validate: validates contents from batch5 to batch8 are in hudi and ensures Row equality + since "delete_input_data" is set to true, this node deletes contents from batch5 to batch8. +Round3: + insert => inputPath/batch9 + upsert -> inputPath/batch10 + Validate: validates contents from batch9 and batch10 are in hudi and ensures Row equality + insert => inputPath/batch11 + upsert -> inputPath/batch12 + Validate with delete_input_data = true + Validates contents from batch9 to batch12 are in hudi and ensures Row equality + Set "delete_input_data" to true. so this node deletes contents from batch9 to batch12. +. +. +``` +Above dag was just an example for illustration purposes. But you can make it complex as per your needs. +``` + Insert + Upsert + Delete + Validate w/o deleting + Insert + Rollback + Validate w/o deleting + Upsert + Validate w/ deletion +``` +With this dag, you can set the two additional configs "dag_rounds" and "dag_intermittent_delay_mins" and have a long +running test suite. + +``` +dag_rounds: 1 +dag_intermittent_delay_mins: 10 +dag_content: + Insert + Upsert + Delete + Validate w/o deleting + Insert + Rollback + Validate w/o deleting + Upsert + Validate w/ deletion + +``` + +Sample COW command with repeated runs. +``` +spark-submit \ +--packages org.apache.spark:spark-avro_2.11:2.4.0 \ +--conf spark.task.cpus=1 \ +--conf spark.executor.cores=1 \ +--conf spark.task.maxFailures=100 \ +--conf spark.memory.fraction=0.4 \ +--conf spark.rdd.compress=true \ +--conf spark.kryoserializer.buffer.max=2000m \ +--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ +--conf spark.memory.storageFraction=0.1 \ +--conf spark.shuffle.service.enabled=true \ +--conf spark.sql.hive.convertMetastoreParquet=false \ +--conf spark.driver.maxResultSize=12g \ +--conf spark.executor.heartbeatInterval=120s \ +--conf spark.network.timeout=600s \ +--conf spark.yarn.max.executor.failures=10 \ +--conf spark.sql.catalogImplementation=hive \ +--conf spark.driver.extraClassPath=/var/demo/jars/* \ +--conf spark.executor.extraClassPath=/var/demo/jars/* \ +--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \ +/opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \ +--source-ordering-field test_suite_source_ordering_field \ +--use-deltastreamer \ +--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \ +--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \ +--target-table table1 \ +--props test.properties \ +--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \ +--source-class org.apache.hudi.utilities.sources.AvroDFSSource \ +--input-file-size 125829120 \ +--workload-yaml-path file:/var/hoodie/ws/docker/demo/config/test-suite/complex-dag-cow.yaml \ +--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \ +--table-type COPY_ON_WRITE \ +--compact-scheduling-minshare 1 +``` + +A ready to use dag is available under docker/demo/config/test-suite/ that could give you an idea for long running +dags. +cow-per-round-mixed-validate.yaml + +As of now, "ValidateDatasetNode" uses spark data source and hive tables for comparison. Hence COW and real time view in +MOR can be tested. + + \ No newline at end of file diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java index 0387731d6..8d2f79def 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java @@ -25,13 +25,13 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.utilities.deltastreamer.DeltaSync; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; import org.apache.hudi.utilities.schema.SchemaProvider; + import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; /** - * Extends the {@link HoodieDeltaStreamer} to expose certain operations helpful in running the Test Suite. - * This is done to achieve 2 things 1) Leverage some components of {@link HoodieDeltaStreamer} 2) - * Piggyback on the suite to test {@link HoodieDeltaStreamer} + * Extends the {@link HoodieDeltaStreamer} to expose certain operations helpful in running the Test Suite. This is done to achieve 2 things 1) Leverage some components of {@link HoodieDeltaStreamer} + * 2) Piggyback on the suite to test {@link HoodieDeltaStreamer} */ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java index 7b3324e4b..b5037e995 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteJob.java @@ -18,13 +18,6 @@ package org.apache.hudi.integ.testsuite; -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import java.io.IOException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.fs.FSUtils; @@ -35,23 +28,30 @@ import org.apache.hudi.exception.HoodieException; import org.apache.hudi.integ.testsuite.dag.DagUtils; import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator; +import org.apache.hudi.integ.testsuite.dag.WriterContext; import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler; import org.apache.hudi.integ.testsuite.reader.DeltaInputType; import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; -import org.apache.hudi.integ.testsuite.dag.WriterContext; import org.apache.hudi.keygen.BuiltinKeyGenerator; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer; -import org.apache.hudi.utilities.schema.SchemaProvider; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** - * This is the entry point for running a Hudi Test Suite. Although this class has similarities with - * {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency on the changes in - * DeltaStreamer. + * This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency + * on the changes in DeltaStreamer. */ public class HoodieTestSuiteJob { @@ -133,10 +133,10 @@ public class HoodieTestSuiteJob { public WorkflowDag createWorkflowDag() throws IOException { WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ? ((WorkflowDagGenerator) ReflectionUtils - .loadClass((this.cfg).workloadDagGenerator)).build() - : DagUtils.convertYamlPathToDag( - FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true), - this.cfg.workloadYamlPath); + .loadClass((this.cfg).workloadDagGenerator)).build() + : DagUtils.convertYamlPathToDag( + FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true), + this.cfg.workloadYamlPath); return workflowDag; } @@ -147,7 +147,7 @@ public class HoodieTestSuiteJob { long startTime = System.currentTimeMillis(); WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession); writerContext.initContext(jsc); - DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext); + DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc); dagScheduler.schedule(); log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime); } catch (Exception e) { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index 81f406be3..329ef16bd 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -87,6 +87,7 @@ public class DeltaConfig implements Serializable { private static String HIVE_LOCAL = "hive_local"; private static String REINIT_CONTEXT = "reinitialize_context"; private static String START_PARTITION = "start_partition"; + private static String DELETE_INPUT_DATA = "delete_input_data"; private Map configsMap; @@ -154,6 +155,10 @@ public class DeltaConfig implements Serializable { return Boolean.valueOf(configsMap.getOrDefault(REINIT_CONTEXT, false).toString()); } + public boolean isDeleteInputData() { + return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString()); + } + public Map getOtherConfigs() { if (configsMap == null) { return new HashMap<>(); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java index 24520a362..1e8acf580 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/converter/UpdateConverter.java @@ -18,12 +18,14 @@ package org.apache.hudi.integ.testsuite.converter; -import java.util.List; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.integ.testsuite.generator.LazyRecordGeneratorIterator; import org.apache.hudi.integ.testsuite.generator.UpdateGeneratorIterator; + +import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; +import java.util.List; + /** * This converter creates an update {@link GenericRecord} from an existing {@link GenericRecord}. */ @@ -36,7 +38,7 @@ public class UpdateConverter implements Converter private final List recordKeyFields; private final int minPayloadSize; - public UpdateConverter(String schemaStr, int minPayloadSize, List partitionPathFields, + public UpdateConverter(String schemaStr, int minPayloadSize, List partitionPathFields, List recordKeyFields) { this.schemaStr = schemaStr; this.partitionPathFields = partitionPathFields; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java index d5358238d..1211c0098 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/DagUtils.java @@ -48,6 +48,15 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; */ public class DagUtils { + public static final String DAG_NAME = "dag_name"; + public static final String DAG_ROUNDS = "dag_rounds"; + public static final String DAG_INTERMITTENT_DELAY_MINS = "dag_intermittent_delay_mins"; + public static final String DAG_CONTENT = "dag_content"; + + public static int DEFAULT_DAG_ROUNDS = 1; + public static int DEFAULT_INTERMITTENT_DELAY_MINS = 10; + public static String DEFAULT_DAG_NAME = "TestDagName"; + static final ObjectMapper MAPPER = new ObjectMapper(); /** @@ -62,15 +71,38 @@ public class DagUtils { * Converts a YAML representation to {@link WorkflowDag}. */ public static WorkflowDag convertYamlToDag(String yaml) throws IOException { + int dagRounds = DEFAULT_DAG_ROUNDS; + int intermittentDelayMins = DEFAULT_INTERMITTENT_DELAY_MINS; + String dagName = DEFAULT_DAG_NAME; Map allNodes = new HashMap<>(); final ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory()); final JsonNode jsonNode = yamlReader.readTree(yaml); Iterator> itr = jsonNode.fields(); while (itr.hasNext()) { Entry dagNode = itr.next(); - allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getKey(), dagNode.getValue())); + String key = dagNode.getKey(); + switch (key) { + case DAG_NAME: + dagName = dagNode.getValue().asText(); + break; + case DAG_ROUNDS: + dagRounds = dagNode.getValue().asInt(); + break; + case DAG_INTERMITTENT_DELAY_MINS: + intermittentDelayMins = dagNode.getValue().asInt(); + break; + case DAG_CONTENT: + JsonNode dagContent = dagNode.getValue(); + Iterator> contentItr = dagContent.fields(); + while(contentItr.hasNext()) { + Entry dagContentNode = contentItr.next(); + allNodes.put(dagContentNode.getKey(), convertJsonToDagNode(allNodes, dagContentNode.getKey(), dagContentNode.getValue())); + } + default: + break; + } } - return new WorkflowDag(findRootNodes(allNodes)); + return new WorkflowDag(dagName, dagRounds, intermittentDelayMins, findRootNodes(allNodes)); } /** diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java index ad6e9cb0c..1fe229442 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/SimpleWorkflowDagGenerator.java @@ -18,8 +18,6 @@ package org.apache.hudi.integ.testsuite.dag; -import java.util.ArrayList; -import java.util.List; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; @@ -27,9 +25,11 @@ import org.apache.hudi.integ.testsuite.dag.nodes.HiveQueryNode; import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode; import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode; +import java.util.ArrayList; +import java.util.List; + /** - * An example of how to generate a workflow dag programmatically. This is also used as the default workflow dag if - * none is provided. + * An example of how to generate a workflow dag programmatically. This is also used as the default workflow dag if none is provided. */ public class SimpleWorkflowDagGenerator implements WorkflowDagGenerator { diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java index e9171fc47..f622bb7a7 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WorkflowDag.java @@ -18,20 +18,47 @@ package org.apache.hudi.integ.testsuite.dag; -import java.util.List; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; +import java.util.List; + +import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_DAG_NAME; +import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_DAG_ROUNDS; +import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_INTERMITTENT_DELAY_MINS; + /** * Workflow dag that encapsulates all execute nodes. */ public class WorkflowDag { + private String dagName; + private int rounds; + private int intermittentDelayMins; private List> nodeList; public WorkflowDag(List> nodeList) { + this(DEFAULT_DAG_NAME, DEFAULT_DAG_ROUNDS, DEFAULT_INTERMITTENT_DELAY_MINS, nodeList); + } + + public WorkflowDag(String dagName, int rounds, int intermittentDelayMins, List> nodeList) { + this.dagName = dagName; + this.rounds = rounds; + this.intermittentDelayMins = intermittentDelayMins; this.nodeList = nodeList; } + public String getDagName() { + return dagName; + } + + public int getRounds() { + return rounds; + } + + public int getIntermittentDelayMins() { + return intermittentDelayMins; + } + public List> getNodeList() { return nodeList; } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java index e457f0a8d..650ab1eea 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/WriterContext.java @@ -21,15 +21,16 @@ package org.apache.hudi.integ.testsuite.dag; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter; import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; +import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; import org.apache.hudi.integ.testsuite.reader.DeltaInputType; import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode; import org.apache.hudi.keygen.BuiltinKeyGenerator; -import org.apache.hudi.integ.testsuite.generator.DeltaGenerator; -import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.schema.SchemaProvider; + import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaSparkContext; @@ -38,8 +39,7 @@ import org.apache.spark.sql.SparkSession; import java.util.Map; /** - * WriterContext wraps the delta writer/data generator related configuration needed - * to init/reinit. + * WriterContext wraps the delta writer/data generator related configuration needed to init/reinit. */ public class WriterContext { @@ -53,8 +53,9 @@ public class WriterContext { private BuiltinKeyGenerator keyGenerator; private transient SparkSession sparkSession; private transient JavaSparkContext jsc; + public WriterContext(JavaSparkContext jsc, TypedProperties props, HoodieTestSuiteConfig cfg, - BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) { + BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) { this.cfg = cfg; this.props = props; this.keyGenerator = keyGenerator; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java index df54b4c81..05ac242a5 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DagNode.java @@ -41,6 +41,17 @@ public abstract class DagNode implements Comparable> { protected Config config; private boolean isCompleted; + public DagNode clone() { + List> tempChildNodes = new ArrayList<>(); + for(DagNode dagNode: childNodes) { + tempChildNodes.add(dagNode.clone()); + } + this.childNodes = tempChildNodes; + this.result = null; + this.isCompleted = false; + return this; + } + public DagNode addChildNode(DagNode childNode) { childNode.getParentNodes().add(this); getChildNodes().add(childNode); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java new file mode 100644 index 000000000..c0671e8ab --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/DelayNode.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.integ.testsuite.dag.ExecutionContext; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Delay Node to add delays between each group of test runs. + */ +public class DelayNode extends DagNode { + + private static Logger log = LoggerFactory.getLogger(ValidateDatasetNode.class); + private int delayMins; + + public DelayNode(int delayMins) { + this.delayMins = delayMins; + } + + @Override + public void execute(ExecutionContext context) throws Exception { + log.warn("Waiting for "+ delayMins+" mins before going for next test run"); + Thread.sleep(delayMins * 60 * 1000); + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java new file mode 100644 index 000000000..12fc52529 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/nodes/ValidateDatasetNode.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes; + +import org.apache.hudi.DataSourceWriteOptions; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; +import org.apache.hudi.integ.testsuite.dag.ExecutionContext; +import org.apache.hudi.integ.testsuite.schema.SchemaUtils; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.function.MapFunction; +import org.apache.spark.api.java.function.ReduceFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$; +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; +import org.apache.spark.sql.catalyst.expressions.Attribute; +import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConversions; +import scala.collection.JavaConverters; + +/** + * This nodes validates contents from input path are in tact with Hudi. This nodes uses spark datasource for comparison purposes. By default no configs are required for this node. But there is an + * optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites. + * README has more details under docker set up for usages of this node. + */ +public class ValidateDatasetNode extends DagNode { + + private static Logger log = LoggerFactory.getLogger(ValidateDatasetNode.class); + + public ValidateDatasetNode(Config config) { + this.config = config; + } + + @Override + public void execute(ExecutionContext context) throws Exception { + + SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate(); + + // todo: Fix partitioning schemes. For now, assumes data based partitioning. + String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*"; + String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*"; + log.warn("ValidateDataset Node: Input path " + inputPath + ", hudi path " + hudiPath); + // listing batches to be validated + String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath; + FileSystem fs = new Path(inputPathStr) + .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); + FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr)); + for (FileStatus fileStatus : fileStatuses) { + log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString()); + } + + String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()); + String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY()); + // todo: fix hard coded fields from configs. + // read input and resolve insert, updates, etc. + Dataset inputDf = session.read().format("avro").load(inputPath); + ExpressionEncoder encoder = getEncoder(inputDf.schema()); + Dataset inputSnapshotDf = inputDf.groupByKey( + (MapFunction) value -> partitionPathField + "+" + recordKeyField, Encoders.STRING()) + .reduceGroups((ReduceFunction) (v1, v2) -> { + int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD); + int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD); + if (ts1 > ts2) { + return v1; + } else { + return v2; + } + }) + .map((MapFunction, Row>) value -> value._2, encoder) + .filter("_hoodie_is_deleted is NULL"); + + // read from hudi and remove meta columns. + Dataset hudiDf = session.read().format("hudi").load(hudiPath); + Dataset trimmedDf = hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD) + .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD); + + Dataset intersectionDf = inputSnapshotDf.intersect(trimmedDf); + // the intersected df should be same as inputDf. if not, there is some mismatch. + if (inputSnapshotDf.except(intersectionDf).count() != 0) { + log.error("Data set validation failed. Total count in hudi " + trimmedDf.count() + ", input df count " + inputSnapshotDf.count()); + throw new AssertionError("Hudi contents does not match contents input data. "); + } + + String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY()); + String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY()); + log.warn("Validating hive table with db : " + database + " and table : " + tableName); + Dataset cowDf = session.sql("SELECT * FROM " + database + "." + tableName); + Dataset trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD) + .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD); + intersectionDf = inputSnapshotDf.intersect(trimmedDf); + // the intersected df should be same as inputDf. if not, there is some mismatch. + if (inputSnapshotDf.except(intersectionDf).count() != 0) { + log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count()); + throw new AssertionError("Hudi hive table contents does not match contents input data. "); + } + + // if delete input data is enabled, erase input data. + if (config.isDeleteInputData()) { + // clean up input data for current group of writes. + inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath; + fs = new Path(inputPathStr) + .getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration()); + fileStatuses = fs.listStatus(new Path(inputPathStr)); + for (FileStatus fileStatus : fileStatuses) { + log.debug("Micro batch to be deleted " + fileStatus.getPath().toString()); + fs.delete(fileStatus.getPath(), true); + } + } + } + + private ExpressionEncoder getEncoder(StructType schema) { + List attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream() + .map(Attribute::toAttribute).collect(Collectors.toList()); + return RowEncoder.apply(schema) + .resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(), + SimpleAnalyzer$.MODULE$); + } +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java index 5c70ea164..d4074bccc 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/dag/scheduler/DagScheduler.java @@ -23,8 +23,10 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext; import org.apache.hudi.integ.testsuite.dag.WorkflowDag; import org.apache.hudi.integ.testsuite.dag.WriterContext; import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; +import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode; import org.apache.hudi.metrics.Metrics; +import org.apache.spark.api.java.JavaSparkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,9 +52,9 @@ public class DagScheduler { private WorkflowDag workflowDag; private ExecutionContext executionContext; - public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext) { + public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext jsc) { this.workflowDag = workflowDag; - this.executionContext = new ExecutionContext(null, writerContext); + this.executionContext = new ExecutionContext(jsc, writerContext); } /** @@ -63,7 +65,7 @@ public class DagScheduler { public void schedule() throws Exception { ExecutorService service = Executors.newFixedThreadPool(2); try { - execute(service, workflowDag.getNodeList()); + execute(service, workflowDag); service.shutdown(); } finally { if (!service.isShutdown()) { @@ -77,33 +79,47 @@ public class DagScheduler { * Method to start executing the nodes in workflow DAGs. * * @param service ExecutorService - * @param nodes Nodes to be executed + * @param workflowDag instance of workflow dag that needs to be executed * @throws Exception will be thrown if ant error occurred */ - private void execute(ExecutorService service, List nodes) throws Exception { + private void execute(ExecutorService service, WorkflowDag workflowDag) throws Exception { // Nodes at the same level are executed in parallel - Queue queue = new PriorityQueue<>(nodes); log.info("Running workloads"); + List nodes = workflowDag.getNodeList(); + int curRound = 1; do { - List futures = new ArrayList<>(); - Set childNodes = new HashSet<>(); - while (queue.size() > 0) { - DagNode nodeToExecute = queue.poll(); - log.info("Node to execute in dag scheduler " + nodeToExecute.getConfig().toString()); - futures.add(service.submit(() -> executeNode(nodeToExecute))); - if (nodeToExecute.getChildNodes().size() > 0) { - childNodes.addAll(nodeToExecute.getChildNodes()); - } + log.warn("==================================================================="); + log.warn("Running workloads for round num " + curRound); + log.warn("==================================================================="); + Queue queue = new PriorityQueue<>(); + for (DagNode dagNode : nodes) { + queue.add(dagNode.clone()); } - queue.addAll(childNodes); - childNodes.clear(); - for (Future future : futures) { - future.get(1, TimeUnit.HOURS); + do { + List futures = new ArrayList<>(); + Set childNodes = new HashSet<>(); + while (queue.size() > 0) { + DagNode nodeToExecute = queue.poll(); + log.warn("Executing node \"" + nodeToExecute.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" :: " + nodeToExecute.getConfig()); + futures.add(service.submit(() -> executeNode(nodeToExecute))); + if (nodeToExecute.getChildNodes().size() > 0) { + childNodes.addAll(nodeToExecute.getChildNodes()); + } + } + queue.addAll(childNodes); + childNodes.clear(); + for (Future future : futures) { + future.get(1, TimeUnit.HOURS); + } + } while (queue.size() > 0); + log.info("Finished workloads for round num " + curRound); + if (curRound < workflowDag.getRounds()) { + new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext); } // After each level, report and flush the metrics Metrics.flush(); - } while (queue.size() > 0); + } while (curRound++ < workflowDag.getRounds()); log.info("Finished workloads"); } @@ -119,7 +135,6 @@ public class DagScheduler { try { int repeatCount = node.getConfig().getRepeatCount(); while (repeatCount > 0) { - log.warn("executing node: \"" + node.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" of type: " + node.getClass() + " :: " + node.getConfig().toString()); node.execute(executionContext); log.info("Finished executing {}", node.getName()); repeatCount--; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java index 53af8eb74..6242cbfc7 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/DeltaGenerator.java @@ -41,6 +41,10 @@ import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.converter.Converter; import org.apache.hudi.integ.testsuite.converter.DeleteConverter; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config; import org.apache.hudi.integ.testsuite.converter.UpdateConverter; import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader; import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader; @@ -51,6 +55,7 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter; import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory; import org.apache.hudi.keygen.BuiltinKeyGenerator; +import org.apache.avro.generic.GenericRecord; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; @@ -58,6 +63,17 @@ import org.apache.spark.storage.StorageLevel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.Serializable; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.StreamSupport; import scala.Tuple2; @@ -77,7 +93,7 @@ public class DeltaGenerator implements Serializable { private int batchId; public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession, - String schemaStr, BuiltinKeyGenerator keyGenerator) { + String schemaStr, BuiltinKeyGenerator keyGenerator) { this.deltaOutputConfig = deltaOutputConfig; this.jsc = jsc; this.sparkSession = sparkSession; @@ -167,7 +183,6 @@ public class DeltaGenerator implements Serializable { log.info("Repartitioning records into " + numPartition + " partitions for updates"); adjustedRDD = adjustedRDD.repartition(numPartition); log.info("Repartitioning records done for updates"); - UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(), partitionPathFieldNames, recordRowKeyFieldNames); JavaRDD updates = converter.convert(adjustedRDD); diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java index 256dfa49e..5477371a1 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/FlexibleSchemaRecordGenerationIterator.java @@ -20,6 +20,11 @@ package org.apache.hudi.integ.testsuite.generator; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; import java.util.HashSet; import java.util.Iterator; @@ -67,7 +72,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator partitionPathFieldNames) { GenericRecord result = new GenericData.Record(schema); for (Schema.Field f : schema.getFields()) { - if (isPartialLongField(f, partitionPathFieldNames)) { - // This is a long field used as partition field. Set it to seconds since epoch. - long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS); - result.put(f.name(), (long) value); + if (f.name().equals(DEFAULT_HOODIE_IS_DELETED_COL)) { + result.put(f.name(), false); } else { - result.put(f.name(), typeConvert(f)); + if (isPartialLongField(f, partitionPathFieldNames)) { + // This is a long field used as partition field. Set it to seconds since epoch. + long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS); + result.put(f.name(), (long) value); + } else { + result.put(f.name(), typeConvert(f)); + } } } return result; diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java index 51b1fd9ed..89cda658e 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/generator/UpdateGeneratorIterator.java @@ -18,16 +18,16 @@ package org.apache.hudi.integ.testsuite.generator; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + /** * A lazy update payload generator to generate {@link GenericRecord}s lazily. */ diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java index b7d71f583..94ff3a3ea 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/helpers/DFSTestSuitePathSelector.java @@ -18,20 +18,6 @@ package org.apache.hudi.integ.testsuite.helpers; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FsStatus; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.collection.ImmutablePair; @@ -40,13 +26,26 @@ import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob; import org.apache.hudi.utilities.sources.helpers.DFSPathSelector; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + /** * A custom dfs path selector used only for the hudi test suite. To be used only if workload is not run inline. */ public class DFSTestSuitePathSelector extends DFSPathSelector { + private static volatile Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class); public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf) { @@ -67,6 +66,7 @@ public class DFSTestSuitePathSelector extends DFSPathSelector { lastBatchId = 0; nextBatchId = 1; } + // obtain all eligible files for the batch List eligibleFiles = new ArrayList<>(); FileStatus[] fileStatuses = fs.globStatus( @@ -87,7 +87,8 @@ public class DFSTestSuitePathSelector extends DFSPathSelector { if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream() .anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) { continue; - } else if (fileStatus.getPath().getName().compareTo(lastBatchId.toString()) > 0) { + } else if (Integer.parseInt(fileStatus.getPath().getName()) > lastBatchId && Integer.parseInt(fileStatus.getPath() + .getName()) <= nextBatchId) { RemoteIterator files = fs.listFiles(fileStatus.getPath(), true); while (files.hasNext()) { eligibleFiles.add(files.next()); @@ -95,7 +96,6 @@ public class DFSTestSuitePathSelector extends DFSPathSelector { } } - log.info("Reading " + eligibleFiles.size() + " files. "); // no data to readAvro if (eligibleFiles.size() == 0) { return new ImmutablePair<>(Option.empty(), diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java index bc7803d9d..43d5fdefa 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java @@ -18,26 +18,6 @@ package org.apache.hudi.integ.testsuite.reader; -import static java.util.Map.Entry.comparingByValue; -import static java.util.stream.Collectors.toMap; - -import java.io.IOException; -import java.io.UncheckedIOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.NoSuchElementException; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; @@ -51,6 +31,12 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetReaderIterator; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieMemoryConfig; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.parquet.avro.AvroParquetReader; import org.apache.parquet.avro.AvroReadSupport; import org.apache.spark.api.java.JavaPairRDD; @@ -59,11 +45,27 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + import scala.Tuple2; +import static java.util.Map.Entry.comparingByValue; +import static java.util.stream.Collectors.toMap; + /** - * This class helps to generate updates from an already existing hoodie dataset. It supports generating updates in - * across partitions, files and records. + * This class helps to generate updates from an already existing hoodie dataset. It supports generating updates in across partitions, files and records. */ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { @@ -148,16 +150,22 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { long recordsInSingleFile = iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice))); int numFilesToUpdate; long numRecordsToUpdatePerFile; - if (!numFiles.isPresent() || numFiles.get() == 0) { + if (!numFiles.isPresent() || numFiles.get() <= 0) { // If num files are not passed, find the number of files to update based on total records to update and records // per file - numFilesToUpdate = (int)Math.ceil((double)numRecordsToUpdate.get() / recordsInSingleFile); - // recordsInSingleFile is not average so we still need to account for bias is records distribution - // in the files. Limit to the maximum number of files available. - int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get(); - numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount); - log.info("Files to update {}", numFilesToUpdate); - numRecordsToUpdatePerFile = recordsInSingleFile; + numFilesToUpdate = (int) Math.floor((double) numRecordsToUpdate.get() / recordsInSingleFile); + if (numFilesToUpdate > 0) { + // recordsInSingleFile is not average so we still need to account for bias is records distribution + // in the files. Limit to the maximum number of files available. + int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get(); + numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount); + log.info("Files to update {}, records to update per file {}", numFilesToUpdate, recordsInSingleFile); + numRecordsToUpdatePerFile = recordsInSingleFile; + } else { + numFilesToUpdate = 1; + numRecordsToUpdatePerFile = numRecordsToUpdate.get(); + log.info("Total records passed in < records in single file. Hence setting numFilesToUpdate to 1 and numRecordsToUpdate to {} ", numRecordsToUpdatePerFile); + } } else { // If num files is passed, find the number of records per file based on either percentage or total records to // update and num files passed @@ -171,6 +179,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { partitionPaths.size(), numFilesToUpdate, partitionToFileIdCountMap); JavaRDD updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap, partitionToFileSlice, numFilesToUpdate, (int) numRecordsToUpdatePerFile)); + if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && numFiles.get() != 0 && numRecordsToUpdate.get() != numRecordsToUpdatePerFile * numFiles.get()) { long remainingRecordsToAdd = (numRecordsToUpdate.get() - (numRecordsToUpdatePerFile * numFiles.get())); @@ -215,7 +224,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { LinkedHashMap::new)); // Limit files to be read per partition - int numFilesPerPartition = (int) Math.ceil((double)numFiles / numPartitions); + int numFilesPerPartition = (int) Math.ceil((double) numFiles / numPartitions); Map adjustedPartitionToFileIdCountMap = new HashMap<>(); partitionToFileIdCountSortedMap.entrySet().stream().forEach(e -> { if (e.getValue() <= numFilesPerPartition) { @@ -283,9 +292,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { } /** - * Returns the number of elements remaining in {@code iterator}. The iterator - * will be left exhausted: its {@code hasNext()} method will return - * {@code false}. + * Returns the number of elements remaining in {@code iterator}. The iterator will be left exhausted: its {@code hasNext()} method will return {@code false}. */ private static int iteratorSize(Iterator iterator) { int count = 0; @@ -297,11 +304,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader { } /** - * Creates an iterator returning the first {@code limitSize} elements of the - * given iterator. If the original iterator does not contain that many - * elements, the returned iterator will have the same behavior as the original - * iterator. The returned iterator supports {@code remove()} if the original - * iterator does. + * Creates an iterator returning the first {@code limitSize} elements of the given iterator. If the original iterator does not contain that many elements, the returned iterator will have the same + * behavior as the original iterator. The returned iterator supports {@code remove()} if the original iterator does. * * @param iterator the iterator to limit * @param limitSize the maximum number of elements in the returned iterator diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/SchemaUtils.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/SchemaUtils.java new file mode 100644 index 000000000..2de945286 --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/SchemaUtils.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.schema; + +public class SchemaUtils { + + public static final String SOURCE_ORDERING_FIELD = "test_suite_source_ordering_field"; + +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/TestSuiteFileBasedSchemaProvider.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/TestSuiteFileBasedSchemaProvider.java new file mode 100644 index 000000000..e67c5afae --- /dev/null +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/schema/TestSuiteFileBasedSchemaProvider.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.schema; + +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.integ.testsuite.dag.WriterContext; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaSparkContext; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Appends source ordering field to both source and target schemas. This is required to assist in validation to differentiate records written in different batches. + */ +public class TestSuiteFileBasedSchemaProvider extends FilebasedSchemaProvider { + + protected static Logger log = LogManager.getLogger(WriterContext.class); + + public TestSuiteFileBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { + super(props, jssc); + this.sourceSchema = addSourceOrderingFieldToSchema(sourceSchema); + this.targetSchema = addSourceOrderingFieldToSchema(targetSchema); + } + + private Schema addSourceOrderingFieldToSchema(Schema schema) { + List fields = new ArrayList<>(); + for (Schema.Field field : schema.getFields()) { + Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()); + for (Map.Entry prop : field.getObjectProps().entrySet()) { + newField.addProp(prop.getKey(), prop.getValue()); + } + fields.add(newField); + } + Schema.Field sourceOrderingField = + new Schema.Field(SchemaUtils.SOURCE_ORDERING_FIELD, Schema.create(Type.INT), "", 0); + fields.add(sourceOrderingField); + Schema mergedSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false); + mergedSchema.setFields(fields); + return mergedSchema; + } + +} diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java index 65e4ee13c..4bd096ae0 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DFSDeltaWriterAdapter.java @@ -18,6 +18,8 @@ package org.apache.hudi.integ.testsuite.writer; +import org.apache.hudi.integ.testsuite.schema.SchemaUtils; + import org.apache.avro.generic.GenericRecord; import java.io.IOException; @@ -30,22 +32,29 @@ import java.util.List; */ public class DFSDeltaWriterAdapter implements DeltaWriterAdapter { - private DeltaInputWriter deltaInputGenerator; + private DeltaInputWriter deltaInputWriter; private List metrics = new ArrayList<>(); + private int preCombineFieldVal = 0; - public DFSDeltaWriterAdapter(DeltaInputWriter deltaInputGenerator) { - this.deltaInputGenerator = deltaInputGenerator; + public DFSDeltaWriterAdapter(DeltaInputWriter deltaInputWriter, int preCombineFieldVal) { + this.deltaInputWriter = deltaInputWriter; + this.preCombineFieldVal = preCombineFieldVal; + } + + public DFSDeltaWriterAdapter(DeltaInputWriter deltaInputWriter) { + this.deltaInputWriter = deltaInputWriter; } @Override public List write(Iterator input) throws IOException { while (input.hasNext()) { GenericRecord next = input.next(); - if (this.deltaInputGenerator.canWrite()) { - this.deltaInputGenerator.writeData(next); - } else if (input.hasNext()) { + next.put(SchemaUtils.SOURCE_ORDERING_FIELD, preCombineFieldVal); + if (this.deltaInputWriter.canWrite()) { + this.deltaInputWriter.writeData(next); + } else { rollOver(); - this.deltaInputGenerator.writeData(next); + this.deltaInputWriter.writeData(next); } } close(); @@ -54,11 +63,11 @@ public class DFSDeltaWriterAdapter implements DeltaWriterAdapter public void rollOver() throws IOException { close(); - this.deltaInputGenerator = this.deltaInputGenerator.getNewWriter(); + this.deltaInputWriter = this.deltaInputWriter.getNewWriter(); } private void close() throws IOException { - this.deltaInputGenerator.close(); - this.metrics.add(this.deltaInputGenerator.getDeltaWriteStats()); + this.deltaInputWriter.close(); + this.metrics.add(this.deltaInputWriter.getDeltaWriteStats()); } } diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java index b4d9b9f89..a00e8e15d 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/writer/DeltaWriterFactory.java @@ -18,16 +18,17 @@ package org.apache.hudi.integ.testsuite.writer; -import java.io.IOException; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig; import org.apache.hudi.integ.testsuite.configuration.DeltaConfig; import org.apache.hudi.integ.testsuite.reader.DeltaInputType; +import org.apache.avro.generic.GenericRecord; + +import java.io.IOException; + /** - * A factory to help instantiate different {@link DeltaWriterAdapter}s depending on the {@link DeltaOutputMode} and - * {@link DeltaInputType}. + * A factory to help instantiate different {@link DeltaWriterAdapter}s depending on the {@link DeltaOutputMode} and {@link DeltaInputType}. */ public class DeltaWriterFactory { @@ -44,9 +45,9 @@ public class DeltaWriterFactory { DeltaInputWriter fileDeltaInputGenerator = new AvroFileDeltaInputWriter( dfsDeltaConfig.getConfiguration(), StringUtils - .join(new String[]{dfsDeltaConfig.getDeltaBasePath(), dfsDeltaConfig.getBatchId().toString()}, + .join(new String[] {dfsDeltaConfig.getDeltaBasePath(), dfsDeltaConfig.getBatchId().toString()}, "/"), dfsDeltaConfig.getSchemaStr(), dfsDeltaConfig.getMaxFileSize()); - return new DFSDeltaWriterAdapter(fileDeltaInputGenerator); + return new DFSDeltaWriterAdapter(fileDeltaInputGenerator, batchId); default: throw new IllegalArgumentException("Invalid delta input format " + config.getDeltaInputType()); } diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java index 1e5ca6886..82350999e 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/configuration/TestWorkflowBuilder.java @@ -18,17 +18,18 @@ package org.apache.hudi.integ.testsuite.configuration; -import static junit.framework.Assert.assertTrue; -import static junit.framework.TestCase.assertEquals; +import org.apache.hudi.integ.testsuite.dag.WorkflowDag; +import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; +import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode; +import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode; + +import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.List; -import org.apache.hudi.integ.testsuite.dag.nodes.DagNode; -import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode; -import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode; -import org.apache.hudi.integ.testsuite.dag.WorkflowDag; -import org.junit.jupiter.api.Test; +import static junit.framework.Assert.assertTrue; +import static junit.framework.TestCase.assertEquals; /** * Unit test for the build process of {@link DagNode} and {@link WorkflowDag}. diff --git a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java index d94174471..70e6da7d3 100644 --- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java +++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/testsuite/dag/TestDagUtils.java @@ -47,6 +47,9 @@ public class TestDagUtils { public void testConvertYamlToDag() throws Exception { WorkflowDag dag = DagUtils.convertYamlToDag(UtilitiesTestBase.Helpers .readFileFromAbsolutePath((System.getProperty("user.dir") + "/.." + COW_DAG_DOCKER_DEMO_RELATIVE_PATH))); + assertEquals(dag.getDagName(), "unit-test-cow-dag"); + assertEquals(dag.getRounds(), 1); + assertEquals(dag.getIntermittentDelayMins(), 10); assertEquals(dag.getNodeList().size(), 1); Assertions.assertEquals(((DagNode) dag.getNodeList().get(0)).getParentNodes().size(), 0); assertEquals(((DagNode) dag.getNodeList().get(0)).getChildNodes().size(), 1); diff --git a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml index 96a6c825a..23691659c 100644 --- a/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml +++ b/hudi-integ-test/src/test/resources/unit-test-cow-dag.yaml @@ -13,58 +13,62 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -first_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 2 - num_records_insert: 100 - type: InsertNode - deps: none -second_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: first_insert -first_rollback: - config: - deps: second_insert - type: RollbackNode -third_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: first_rollback -first_upsert: - config: - record_size: 70000 - num_partitions_upsert: 1 - repeat_count: 1 - num_records_upsert: 100 - type: UpsertNode - deps: third_insert -first_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_upsert -first_hive_query: - config: - hive_props: - prop2: "set spark.yarn.queue=" - prop3: "set hive.strict.checks.large.query=false" - prop4: "set hive.stats.autogather=false" - hive_queries: - query1: "select count(*) from testdb1.table1" - result1: 300 - query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" - result2: 0 - type: HiveQueryNode - deps: first_hive_sync \ No newline at end of file +dag_name: unit-test-cow-dag +dag_rounds: 1 +dag_intermittent_delay_mins: 10 +dag_content: + first_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 2 + num_records_insert: 100 + type: InsertNode + deps: none + second_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100 + type: InsertNode + deps: first_insert + first_rollback: + config: + deps: second_insert + type: RollbackNode + third_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100 + type: InsertNode + deps: first_rollback + first_upsert: + config: + record_size: 70000 + num_partitions_upsert: 1 + repeat_count: 1 + num_records_upsert: 100 + type: UpsertNode + deps: third_insert + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_upsert + first_hive_query: + config: + hive_props: + prop2: "set spark.yarn.queue=" + prop3: "set hive.strict.checks.large.query=false" + prop4: "set hive.stats.autogather=false" + hive_queries: + query1: "select count(*) from testdb1.table1" + result1: 300 + query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" + result2: 0 + type: HiveQueryNode + deps: first_hive_sync \ No newline at end of file diff --git a/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml b/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml index 96a6c825a..2ba42455d 100644 --- a/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml +++ b/hudi-integ-test/src/test/resources/unit-test-mor-dag.yaml @@ -13,58 +13,62 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -first_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 2 - num_records_insert: 100 - type: InsertNode - deps: none -second_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: first_insert -first_rollback: - config: - deps: second_insert - type: RollbackNode -third_insert: - config: - record_size: 70000 - num_partitions_insert: 1 - repeat_count: 1 - num_records_insert: 100 - type: InsertNode - deps: first_rollback -first_upsert: - config: - record_size: 70000 - num_partitions_upsert: 1 - repeat_count: 1 - num_records_upsert: 100 - type: UpsertNode - deps: third_insert -first_hive_sync: - config: - queue_name: "adhoc" - engine: "mr" - type: HiveSyncNode - deps: first_upsert -first_hive_query: - config: - hive_props: - prop2: "set spark.yarn.queue=" - prop3: "set hive.strict.checks.large.query=false" - prop4: "set hive.stats.autogather=false" - hive_queries: - query1: "select count(*) from testdb1.table1" - result1: 300 - query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" - result2: 0 - type: HiveQueryNode - deps: first_hive_sync \ No newline at end of file +dag_name: unit-test-mor-dag +dag_rounds: 1 +dag_intermittent_delay_mins: 10 +dag_content: + first_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 2 + num_records_insert: 100 + type: InsertNode + deps: none + second_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100 + type: InsertNode + deps: first_insert + first_rollback: + config: + deps: second_insert + type: RollbackNode + third_insert: + config: + record_size: 70000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100 + type: InsertNode + deps: first_rollback + first_upsert: + config: + record_size: 70000 + num_partitions_upsert: 1 + repeat_count: 1 + num_records_upsert: 100 + type: UpsertNode + deps: third_insert + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_upsert + first_hive_query: + config: + hive_props: + prop2: "set spark.yarn.queue=" + prop3: "set hive.strict.checks.large.query=false" + prop4: "set hive.stats.autogather=false" + hive_queries: + query1: "select count(*) from testdb1.table1" + result1: 300 + query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1" + result2: 0 + type: HiveQueryNode + deps: first_hive_sync \ No newline at end of file diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java index 43f2ff27d..7542755b2 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/FilebasedSchemaProvider.java @@ -46,9 +46,9 @@ public class FilebasedSchemaProvider extends SchemaProvider { private final FileSystem fs; - private final Schema sourceSchema; + protected Schema sourceSchema; - private Schema targetSchema; + protected Schema targetSchema; public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) { super(props, jssc);