[HUDI-1331] Adding support for validating entire dataset and long running tests in test suite framework (#2168)
* trigger rebuild * [HUDI-1156] Remove unused dependencies from HoodieDeltaStreamerWrapper Class (#1927) * Adding support for validating records and long running tests in test sutie framework * Adding partial validate node * Fixing spark session initiation in Validate nodes * Fixing validation * Adding hive table validation to ValidateDatasetNode * Rebasing with latest commits from master * Addressing feedback * Addressing comments Co-authored-by: lamber-ken <lamberken@163.com> Co-authored-by: linshan-ma <mabin194046@163.com>
This commit is contained in:
committed by
GitHub
parent
3ec9270e8e
commit
8cf6a7223f
@@ -13,122 +13,56 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
first_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 1000
|
||||
type: InsertNode
|
||||
deps: none
|
||||
second_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 10000
|
||||
deps: first_insert
|
||||
type: InsertNode
|
||||
third_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 300
|
||||
deps: second_insert
|
||||
type: InsertNode
|
||||
first_rollback:
|
||||
config:
|
||||
deps: third_insert
|
||||
type: RollbackNode
|
||||
first_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_upsert_partitions: 10
|
||||
type: UpsertNode
|
||||
deps: first_rollback
|
||||
first_hive_sync:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveSyncNode
|
||||
deps: first_upsert
|
||||
first_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
|
||||
result1: 0
|
||||
query2: "select count(*) from testdb.table1"
|
||||
result2: 11300
|
||||
type: HiveQueryNode
|
||||
deps: first_hive_sync
|
||||
second_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_upsert_partitions: 10
|
||||
type: UpsertNode
|
||||
deps: first_hive_query
|
||||
second_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
|
||||
result1: 0
|
||||
query2: "select count(*) from testdb.table1"
|
||||
result2: 11600
|
||||
type: HiveQueryNode
|
||||
deps: second_upsert
|
||||
fourth_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 1000
|
||||
deps: second_hive_query
|
||||
type: InsertNode
|
||||
third_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
|
||||
result1: 0
|
||||
query2: "select count(*) from testdb.table1"
|
||||
result2: 12600
|
||||
type: HiveQueryNode
|
||||
deps: fourth_insert
|
||||
first_delete:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_delete: 1
|
||||
num_records_delete: 200
|
||||
deps: third_hive_query
|
||||
type: DeleteNode
|
||||
fourth_hive_sync:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveSyncNode
|
||||
deps: first_delete
|
||||
fourth_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
|
||||
result1: 0
|
||||
query2: "select count(*) from testdb.table1"
|
||||
result2: 12400
|
||||
type: HiveQueryNode
|
||||
deps: fourth_hive_sync
|
||||
dag_name: cow-long-running-example.yaml
|
||||
dag_rounds: 2
|
||||
dag_intermittent_delay_mins: 1
|
||||
dag_content:
|
||||
first_insert:
|
||||
config:
|
||||
record_size: 100
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 1000
|
||||
type: InsertNode
|
||||
deps: none
|
||||
second_insert:
|
||||
config:
|
||||
record_size: 100
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 10000
|
||||
deps: first_insert
|
||||
type: InsertNode
|
||||
third_insert:
|
||||
config:
|
||||
record_size: 100
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 300
|
||||
deps: second_insert
|
||||
type: InsertNode
|
||||
first_validate:
|
||||
config:
|
||||
type: ValidateDatasetNode
|
||||
deps: third_insert
|
||||
first_upsert:
|
||||
config:
|
||||
record_size: 100
|
||||
num_partitions_insert: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_partitions_upsert: 1
|
||||
type: UpsertNode
|
||||
deps: first_validate
|
||||
first_delete:
|
||||
config:
|
||||
num_partitions_delete: 1
|
||||
num_records_delete: 2000
|
||||
type: DeleteNode
|
||||
deps: first_upsert
|
||||
second_validate:
|
||||
config:
|
||||
delete_input_data: true
|
||||
type: ValidateDatasetNode
|
||||
deps: first_delete
|
||||
@@ -13,103 +13,107 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
first_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: none
|
||||
second_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
deps: first_insert
|
||||
type: InsertNode
|
||||
third_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 300
|
||||
deps: second_insert
|
||||
type: InsertNode
|
||||
first_rollback:
|
||||
config:
|
||||
deps: third_insert
|
||||
type: RollbackNode
|
||||
first_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_upsert_partitions: 10
|
||||
type: UpsertNode
|
||||
deps: first_rollback
|
||||
first_hive_sync:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveSyncNode
|
||||
deps: first_upsert
|
||||
first_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveQueryNode
|
||||
deps: first_hive_sync
|
||||
second_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_upsert_partitions: 10
|
||||
type: UpsertNode
|
||||
deps: first_hive_query
|
||||
second_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
|
||||
result1: 0
|
||||
query2: "select count(*) from testdb.table1"
|
||||
result2: 1100
|
||||
type: HiveQueryNode
|
||||
deps: second_upsert
|
||||
first_schedule_compact:
|
||||
config:
|
||||
type: ScheduleCompactNode
|
||||
deps: second_hive_query
|
||||
third_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_insert_partitions: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_upsert_partitions: 10
|
||||
type: UpsertNode
|
||||
deps: first_schedule_compact
|
||||
first_compact:
|
||||
config:
|
||||
type: CompactNode
|
||||
deps: first_schedule_compact
|
||||
third_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
|
||||
result1: 0
|
||||
query2: "select count(*) from testdb.table1"
|
||||
result2: 1400
|
||||
type: HiveQueryNode
|
||||
deps: first_compact
|
||||
dag_name: complex-dag-mor.yaml
|
||||
dag_rounds: 1
|
||||
dag_intermittent_delay_mins: 10
|
||||
dag_content:
|
||||
first_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 5
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: none
|
||||
second_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 5
|
||||
num_records_insert: 100
|
||||
deps: first_insert
|
||||
type: InsertNode
|
||||
third_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 2
|
||||
num_records_insert: 300
|
||||
deps: second_insert
|
||||
type: InsertNode
|
||||
first_rollback:
|
||||
config:
|
||||
deps: third_insert
|
||||
type: RollbackNode
|
||||
first_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_partitions_upsert: 10
|
||||
type: UpsertNode
|
||||
deps: first_rollback
|
||||
first_hive_sync:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveSyncNode
|
||||
deps: first_upsert
|
||||
first_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveQueryNode
|
||||
deps: first_hive_sync
|
||||
second_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_partitions_upsert: 10
|
||||
type: UpsertNode
|
||||
deps: first_hive_query
|
||||
second_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
|
||||
result1: 0
|
||||
query2: "select count(*) from testdb.table1"
|
||||
result2: 1100
|
||||
type: HiveQueryNode
|
||||
deps: second_upsert
|
||||
first_schedule_compact:
|
||||
config:
|
||||
type: ScheduleCompactNode
|
||||
deps: second_hive_query
|
||||
third_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_partitions_upsert: 10
|
||||
type: UpsertNode
|
||||
deps: first_schedule_compact
|
||||
first_compact:
|
||||
config:
|
||||
type: CompactNode
|
||||
deps: first_schedule_compact
|
||||
third_hive_query:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb.table1 group by `_row_key` having count(*) > 1"
|
||||
result1: 0
|
||||
query2: "select count(*) from testdb.table1"
|
||||
result2: 1400
|
||||
type: HiveQueryNode
|
||||
deps: first_compact
|
||||
|
||||
68
docker/demo/config/test-suite/cow-long-running-example.yaml
Normal file
68
docker/demo/config/test-suite/cow-long-running-example.yaml
Normal file
@@ -0,0 +1,68 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
dag_name: cow-long-running-example.yaml
|
||||
dag_rounds: 20
|
||||
dag_intermittent_delay_mins: 10
|
||||
dag_content:
|
||||
first_insert:
|
||||
config:
|
||||
record_size: 100
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 1000
|
||||
type: InsertNode
|
||||
deps: none
|
||||
second_insert:
|
||||
config:
|
||||
record_size: 100
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 10000
|
||||
deps: first_insert
|
||||
type: InsertNode
|
||||
third_insert:
|
||||
config:
|
||||
record_size: 100
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 300
|
||||
deps: second_insert
|
||||
type: InsertNode
|
||||
first_validate:
|
||||
config:
|
||||
type: ValidateDatasetNode
|
||||
deps: third_insert
|
||||
first_upsert:
|
||||
config:
|
||||
record_size: 100
|
||||
num_partitions_insert: 1
|
||||
num_records_insert: 300
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
num_partitions_upsert: 1
|
||||
type: UpsertNode
|
||||
deps: first_validate
|
||||
first_delete:
|
||||
config:
|
||||
num_partitions_delete: 1
|
||||
num_records_delete: 2000
|
||||
type: DeleteNode
|
||||
deps: first_upsert
|
||||
second_validate:
|
||||
config:
|
||||
delete_input_data: true
|
||||
type: ValidateDatasetNode
|
||||
deps: first_delete
|
||||
@@ -1,37 +0,0 @@
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# write configs
|
||||
hoodie.datasource.write.recordkey.field=_row_key
|
||||
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
|
||||
hoodie.datasource.write.partitionpath.field=timestamp
|
||||
|
||||
|
||||
# deltastreamer configs
|
||||
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
|
||||
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
|
||||
hoodie.deltastreamer.source.dfs.root=/user/hive/warehouse/hudi-bench/input
|
||||
hoodie.deltastreamer.schemaprovider.source.schema.file=/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
|
||||
hoodie.deltastreamer.schemaprovider.target.schema.file=/var/hoodie/ws/docker/demo/config/bench/source.avsc
|
||||
|
||||
#hive sync
|
||||
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
|
||||
hoodie.datasource.hive_sync.database=testdb
|
||||
hoodie.datasource.hive_sync.table=table1
|
||||
hoodie.datasource.hive_sync.use_jdbc=false
|
||||
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
|
||||
hoodie.datasource.hive_sync.assume_date_partitioning=true
|
||||
hoodie.datasource.hive_sync.use_pre_apache_input_format=true
|
||||
@@ -1,3 +1,4 @@
|
||||
|
||||
hoodie.insert.shuffle.parallelism=100
|
||||
hoodie.upsert.shuffle.parallelism=100
|
||||
hoodie.bulkinsert.shuffle.parallelism=100
|
||||
@@ -8,6 +9,13 @@ hoodie.deltastreamer.source.test.max_unique_records=100000000
|
||||
hoodie.embed.timeline.server=false
|
||||
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
|
||||
|
||||
hoodie.insert.shuffle.parallelism=100
|
||||
hoodie.upsert.shuffle.parallelism=100
|
||||
hoodie.bulkinsert.shuffle.parallelism=100
|
||||
|
||||
hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
|
||||
hoodie.datasource.hive_sync.skip_ro_suffix=true
|
||||
|
||||
hoodie.datasource.write.recordkey.field=_row_key
|
||||
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
|
||||
hoodie.datasource.write.partitionpath.field=timestamp
|
||||
|
||||
@@ -142,7 +142,9 @@ Start the Hudi Docker demo:
|
||||
docker/setup_demo.sh
|
||||
```
|
||||
|
||||
NOTE: We need to make a couple of environment changes for Hive 2.x support. This will be fixed once Hudi moves to Spark 3.x
|
||||
NOTE: We need to make a couple of environment changes for Hive 2.x support. This will be fixed once Hudi moves to Spark 3.x.
|
||||
Execute below if you are using Hudi query node in your dag. If not, below section is not required.
|
||||
Also, for longer running tests, go to next section.
|
||||
|
||||
```
|
||||
docker exec -it adhoc-2 bash
|
||||
@@ -214,7 +216,7 @@ spark-submit \
|
||||
--conf spark.sql.catalogImplementation=hive \
|
||||
--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
|
||||
/opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \
|
||||
--source-ordering-field timestamp \
|
||||
--source-ordering-field test_suite_source_ordering_field \
|
||||
--use-deltastreamer \
|
||||
--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
|
||||
--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
|
||||
@@ -253,7 +255,7 @@ spark-submit \
|
||||
--conf spark.sql.catalogImplementation=hive \
|
||||
--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
|
||||
/opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \
|
||||
--source-ordering-field timestamp \
|
||||
--source-ordering-field test_suite_source_ordering_field \
|
||||
--use-deltastreamer \
|
||||
--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
|
||||
--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
|
||||
@@ -267,3 +269,182 @@ spark-submit \
|
||||
--table-type MERGE_ON_READ \
|
||||
--compact-scheduling-minshare 1
|
||||
```
|
||||
|
||||
For long running test suite, validation has to be done differently. Idea is to run same dag in a repeated manner.
|
||||
Hence "ValidateDatasetNode" is introduced which will read entire input data and compare it with hudi contents both via
|
||||
spark datasource and hive table via spark sql engine.
|
||||
|
||||
If you have "ValidateDatasetNode" in your dag, do not replace hive jars as instructed above. Spark sql engine does not
|
||||
go well w/ hive2* jars. So, after running docker setup, just copy test.properties and your dag of interest and you are
|
||||
good to go ahead.
|
||||
|
||||
For repeated runs, two additional configs need to be set. "dag_rounds" and "dag_intermittent_delay_mins".
|
||||
This means that your dag will be repeated for N times w/ a delay of Y mins between each round.
|
||||
|
||||
Also, ValidateDatasetNode can be configured in two ways. Either with "delete_input_data: true" set or not set.
|
||||
When "delete_input_data" is set for ValidateDatasetNode, once validation is complete, entire input data will be deleted.
|
||||
So, suggestion is to use this ValidateDatasetNode as the last node in the dag with "delete_input_data".
|
||||
Example dag:
|
||||
```
|
||||
Insert
|
||||
Upsert
|
||||
ValidateDatasetNode with delete_input_data = true
|
||||
```
|
||||
|
||||
If above dag is run with "dag_rounds" = 10 and "dag_intermittent_delay_mins" = 10, then this dag will run for 10 times
|
||||
with 10 mins delay between every run. At the end of every run, records written as part of this round will be validated.
|
||||
At the end of each validation, all contents of input are deleted.
|
||||
For eg: incase of above dag,
|
||||
```
|
||||
Round1:
|
||||
insert => inputPath/batch1
|
||||
upsert -> inputPath/batch2
|
||||
Validate with delete_input_data = true
|
||||
Validates contents from batch1 and batch2 are in hudi and ensures Row equality
|
||||
Since "delete_input_data" is set, deletes contents from batch1 and batch2.
|
||||
Round2:
|
||||
insert => inputPath/batch3
|
||||
upsert -> inputPath/batch4
|
||||
Validate with delete_input_data = true
|
||||
Validates contents from batch3 and batch4 are in hudi and ensures Row equality
|
||||
Since "delete_input_data" is set, deletes contents from batch3 and batch4.
|
||||
Round3:
|
||||
insert => inputPath/batch5
|
||||
upsert -> inputPath/batch6
|
||||
Validate with delete_input_data = true
|
||||
Validates contents from batch5 and batch6 are in hudi and ensures Row equality
|
||||
Since "delete_input_data" is set, deletes contents from batch5 and batch6.
|
||||
.
|
||||
.
|
||||
```
|
||||
If you wish to do a cumulative validation, do not set delete_input_data in ValidateDatasetNode. But remember that this
|
||||
may not scale beyond certain point since input data as well as hudi content's keeps occupying the disk and grows for
|
||||
every cycle.
|
||||
|
||||
Lets see an example where you don't set "delete_input_data" as part of Validation.
|
||||
```
|
||||
Round1:
|
||||
insert => inputPath/batch1
|
||||
upsert -> inputPath/batch2
|
||||
Validate: validates contents from batch1 and batch2 are in hudi and ensures Row equality
|
||||
Round2:
|
||||
insert => inputPath/batch3
|
||||
upsert -> inputPath/batch4
|
||||
Validate: validates contents from batch1 to batch4 are in hudi and ensures Row equality
|
||||
Round3:
|
||||
insert => inputPath/batch5
|
||||
upsert -> inputPath/batch6
|
||||
Validate: validates contents from batch1 and batch6 are in hudi and ensures Row equality
|
||||
.
|
||||
.
|
||||
```
|
||||
|
||||
You could also have validations in the middle of your dag and not set the "delete_input_data". But set it only in the
|
||||
last node in the dag.
|
||||
```
|
||||
Round1:
|
||||
insert => inputPath/batch1
|
||||
upsert -> inputPath/batch2
|
||||
Validate: validates contents from batch1 and batch2 are in hudi and ensures Row equality
|
||||
insert => inputPath/batch3
|
||||
upsert -> inputPath/batch4
|
||||
Validate with delete_input_data = true
|
||||
Validates contents from batch1 to batch4 are in hudi and ensures Row equality
|
||||
since "delete_input_data" is set to true, this node deletes contents from batch1 and batch4.
|
||||
Round2:
|
||||
insert => inputPath/batch5
|
||||
upsert -> inputPath/batch6
|
||||
Validate: validates contents from batch5 and batch6 are in hudi and ensures Row equality
|
||||
insert => inputPath/batch7
|
||||
upsert -> inputPath/batch8
|
||||
Validate: validates contents from batch5 to batch8 are in hudi and ensures Row equality
|
||||
since "delete_input_data" is set to true, this node deletes contents from batch5 to batch8.
|
||||
Round3:
|
||||
insert => inputPath/batch9
|
||||
upsert -> inputPath/batch10
|
||||
Validate: validates contents from batch9 and batch10 are in hudi and ensures Row equality
|
||||
insert => inputPath/batch11
|
||||
upsert -> inputPath/batch12
|
||||
Validate with delete_input_data = true
|
||||
Validates contents from batch9 to batch12 are in hudi and ensures Row equality
|
||||
Set "delete_input_data" to true. so this node deletes contents from batch9 to batch12.
|
||||
.
|
||||
.
|
||||
```
|
||||
Above dag was just an example for illustration purposes. But you can make it complex as per your needs.
|
||||
```
|
||||
Insert
|
||||
Upsert
|
||||
Delete
|
||||
Validate w/o deleting
|
||||
Insert
|
||||
Rollback
|
||||
Validate w/o deleting
|
||||
Upsert
|
||||
Validate w/ deletion
|
||||
```
|
||||
With this dag, you can set the two additional configs "dag_rounds" and "dag_intermittent_delay_mins" and have a long
|
||||
running test suite.
|
||||
|
||||
```
|
||||
dag_rounds: 1
|
||||
dag_intermittent_delay_mins: 10
|
||||
dag_content:
|
||||
Insert
|
||||
Upsert
|
||||
Delete
|
||||
Validate w/o deleting
|
||||
Insert
|
||||
Rollback
|
||||
Validate w/o deleting
|
||||
Upsert
|
||||
Validate w/ deletion
|
||||
|
||||
```
|
||||
|
||||
Sample COW command with repeated runs.
|
||||
```
|
||||
spark-submit \
|
||||
--packages org.apache.spark:spark-avro_2.11:2.4.0 \
|
||||
--conf spark.task.cpus=1 \
|
||||
--conf spark.executor.cores=1 \
|
||||
--conf spark.task.maxFailures=100 \
|
||||
--conf spark.memory.fraction=0.4 \
|
||||
--conf spark.rdd.compress=true \
|
||||
--conf spark.kryoserializer.buffer.max=2000m \
|
||||
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
|
||||
--conf spark.memory.storageFraction=0.1 \
|
||||
--conf spark.shuffle.service.enabled=true \
|
||||
--conf spark.sql.hive.convertMetastoreParquet=false \
|
||||
--conf spark.driver.maxResultSize=12g \
|
||||
--conf spark.executor.heartbeatInterval=120s \
|
||||
--conf spark.network.timeout=600s \
|
||||
--conf spark.yarn.max.executor.failures=10 \
|
||||
--conf spark.sql.catalogImplementation=hive \
|
||||
--conf spark.driver.extraClassPath=/var/demo/jars/* \
|
||||
--conf spark.executor.extraClassPath=/var/demo/jars/* \
|
||||
--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
|
||||
/opt/hudi-integ-test-bundle-0.6.1-SNAPSHOT.jar \
|
||||
--source-ordering-field test_suite_source_ordering_field \
|
||||
--use-deltastreamer \
|
||||
--target-base-path /user/hive/warehouse/hudi-integ-test-suite/output \
|
||||
--input-base-path /user/hive/warehouse/hudi-integ-test-suite/input \
|
||||
--target-table table1 \
|
||||
--props test.properties \
|
||||
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
|
||||
--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
|
||||
--input-file-size 125829120 \
|
||||
--workload-yaml-path file:/var/hoodie/ws/docker/demo/config/test-suite/complex-dag-cow.yaml \
|
||||
--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
|
||||
--table-type COPY_ON_WRITE \
|
||||
--compact-scheduling-minshare 1
|
||||
```
|
||||
|
||||
A ready to use dag is available under docker/demo/config/test-suite/ that could give you an idea for long running
|
||||
dags.
|
||||
cow-per-round-mixed-validate.yaml
|
||||
|
||||
As of now, "ValidateDatasetNode" uses spark data source and hive tables for comparison. Hence COW and real time view in
|
||||
MOR can be tested.
|
||||
|
||||
|
||||
@@ -25,13 +25,13 @@ import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.utilities.deltastreamer.DeltaSync;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
/**
|
||||
* Extends the {@link HoodieDeltaStreamer} to expose certain operations helpful in running the Test Suite.
|
||||
* This is done to achieve 2 things 1) Leverage some components of {@link HoodieDeltaStreamer} 2)
|
||||
* Piggyback on the suite to test {@link HoodieDeltaStreamer}
|
||||
* Extends the {@link HoodieDeltaStreamer} to expose certain operations helpful in running the Test Suite. This is done to achieve 2 things 1) Leverage some components of {@link HoodieDeltaStreamer}
|
||||
* 2) Piggyback on the suite to test {@link HoodieDeltaStreamer}
|
||||
*/
|
||||
public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
|
||||
|
||||
|
||||
@@ -18,13 +18,6 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite;
|
||||
|
||||
import com.beust.jcommander.JCommander;
|
||||
import com.beust.jcommander.Parameter;
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
import org.apache.hudi.DataSourceUtils;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
@@ -35,23 +28,30 @@ import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.integ.testsuite.dag.DagUtils;
|
||||
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
|
||||
import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
|
||||
import org.apache.hudi.integ.testsuite.dag.WriterContext;
|
||||
import org.apache.hudi.integ.testsuite.dag.scheduler.DagScheduler;
|
||||
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
|
||||
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
|
||||
import org.apache.hudi.integ.testsuite.dag.WriterContext;
|
||||
import org.apache.hudi.keygen.BuiltinKeyGenerator;
|
||||
import org.apache.hudi.utilities.UtilHelpers;
|
||||
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
|
||||
import com.beust.jcommander.JCommander;
|
||||
import com.beust.jcommander.Parameter;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hive.conf.HiveConf;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* This is the entry point for running a Hudi Test Suite. Although this class has similarities with
|
||||
* {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency on the changes in
|
||||
* DeltaStreamer.
|
||||
* This is the entry point for running a Hudi Test Suite. Although this class has similarities with {@link HoodieDeltaStreamer} this class does not extend it since do not want to create a dependency
|
||||
* on the changes in DeltaStreamer.
|
||||
*/
|
||||
public class HoodieTestSuiteJob {
|
||||
|
||||
@@ -133,10 +133,10 @@ public class HoodieTestSuiteJob {
|
||||
|
||||
public WorkflowDag createWorkflowDag() throws IOException {
|
||||
WorkflowDag workflowDag = this.cfg.workloadYamlPath == null ? ((WorkflowDagGenerator) ReflectionUtils
|
||||
.loadClass((this.cfg).workloadDagGenerator)).build()
|
||||
: DagUtils.convertYamlPathToDag(
|
||||
FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true),
|
||||
this.cfg.workloadYamlPath);
|
||||
.loadClass((this.cfg).workloadDagGenerator)).build()
|
||||
: DagUtils.convertYamlPathToDag(
|
||||
FSUtils.getFs(this.cfg.workloadYamlPath, jsc.hadoopConfiguration(), true),
|
||||
this.cfg.workloadYamlPath);
|
||||
return workflowDag;
|
||||
}
|
||||
|
||||
@@ -147,7 +147,7 @@ public class HoodieTestSuiteJob {
|
||||
long startTime = System.currentTimeMillis();
|
||||
WriterContext writerContext = new WriterContext(jsc, props, cfg, keyGenerator, sparkSession);
|
||||
writerContext.initContext(jsc);
|
||||
DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext);
|
||||
DagScheduler dagScheduler = new DagScheduler(workflowDag, writerContext, jsc);
|
||||
dagScheduler.schedule();
|
||||
log.info("Finished scheduling all tasks, Time taken {}", System.currentTimeMillis() - startTime);
|
||||
} catch (Exception e) {
|
||||
|
||||
@@ -87,6 +87,7 @@ public class DeltaConfig implements Serializable {
|
||||
private static String HIVE_LOCAL = "hive_local";
|
||||
private static String REINIT_CONTEXT = "reinitialize_context";
|
||||
private static String START_PARTITION = "start_partition";
|
||||
private static String DELETE_INPUT_DATA = "delete_input_data";
|
||||
|
||||
private Map<String, Object> configsMap;
|
||||
|
||||
@@ -154,6 +155,10 @@ public class DeltaConfig implements Serializable {
|
||||
return Boolean.valueOf(configsMap.getOrDefault(REINIT_CONTEXT, false).toString());
|
||||
}
|
||||
|
||||
public boolean isDeleteInputData() {
|
||||
return Boolean.valueOf(configsMap.getOrDefault(DELETE_INPUT_DATA, false).toString());
|
||||
}
|
||||
|
||||
public Map<String, Object> getOtherConfigs() {
|
||||
if (configsMap == null) {
|
||||
return new HashMap<>();
|
||||
|
||||
@@ -18,12 +18,14 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.converter;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hudi.integ.testsuite.generator.LazyRecordGeneratorIterator;
|
||||
import org.apache.hudi.integ.testsuite.generator.UpdateGeneratorIterator;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* This converter creates an update {@link GenericRecord} from an existing {@link GenericRecord}.
|
||||
*/
|
||||
@@ -36,7 +38,7 @@ public class UpdateConverter implements Converter<GenericRecord, GenericRecord>
|
||||
private final List<String> recordKeyFields;
|
||||
private final int minPayloadSize;
|
||||
|
||||
public UpdateConverter(String schemaStr, int minPayloadSize, List<String> partitionPathFields,
|
||||
public UpdateConverter(String schemaStr, int minPayloadSize, List<String> partitionPathFields,
|
||||
List<String> recordKeyFields) {
|
||||
this.schemaStr = schemaStr;
|
||||
this.partitionPathFields = partitionPathFields;
|
||||
|
||||
@@ -48,6 +48,15 @@ import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||
*/
|
||||
public class DagUtils {
|
||||
|
||||
public static final String DAG_NAME = "dag_name";
|
||||
public static final String DAG_ROUNDS = "dag_rounds";
|
||||
public static final String DAG_INTERMITTENT_DELAY_MINS = "dag_intermittent_delay_mins";
|
||||
public static final String DAG_CONTENT = "dag_content";
|
||||
|
||||
public static int DEFAULT_DAG_ROUNDS = 1;
|
||||
public static int DEFAULT_INTERMITTENT_DELAY_MINS = 10;
|
||||
public static String DEFAULT_DAG_NAME = "TestDagName";
|
||||
|
||||
static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
|
||||
/**
|
||||
@@ -62,15 +71,38 @@ public class DagUtils {
|
||||
* Converts a YAML representation to {@link WorkflowDag}.
|
||||
*/
|
||||
public static WorkflowDag convertYamlToDag(String yaml) throws IOException {
|
||||
int dagRounds = DEFAULT_DAG_ROUNDS;
|
||||
int intermittentDelayMins = DEFAULT_INTERMITTENT_DELAY_MINS;
|
||||
String dagName = DEFAULT_DAG_NAME;
|
||||
Map<String, DagNode> allNodes = new HashMap<>();
|
||||
final ObjectMapper yamlReader = new ObjectMapper(new YAMLFactory());
|
||||
final JsonNode jsonNode = yamlReader.readTree(yaml);
|
||||
Iterator<Entry<String, JsonNode>> itr = jsonNode.fields();
|
||||
while (itr.hasNext()) {
|
||||
Entry<String, JsonNode> dagNode = itr.next();
|
||||
allNodes.put(dagNode.getKey(), convertJsonToDagNode(allNodes, dagNode.getKey(), dagNode.getValue()));
|
||||
String key = dagNode.getKey();
|
||||
switch (key) {
|
||||
case DAG_NAME:
|
||||
dagName = dagNode.getValue().asText();
|
||||
break;
|
||||
case DAG_ROUNDS:
|
||||
dagRounds = dagNode.getValue().asInt();
|
||||
break;
|
||||
case DAG_INTERMITTENT_DELAY_MINS:
|
||||
intermittentDelayMins = dagNode.getValue().asInt();
|
||||
break;
|
||||
case DAG_CONTENT:
|
||||
JsonNode dagContent = dagNode.getValue();
|
||||
Iterator<Entry<String, JsonNode>> contentItr = dagContent.fields();
|
||||
while(contentItr.hasNext()) {
|
||||
Entry<String, JsonNode> dagContentNode = contentItr.next();
|
||||
allNodes.put(dagContentNode.getKey(), convertJsonToDagNode(allNodes, dagContentNode.getKey(), dagContentNode.getValue()));
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
return new WorkflowDag(findRootNodes(allNodes));
|
||||
return new WorkflowDag(dagName, dagRounds, intermittentDelayMins, findRootNodes(allNodes));
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -18,8 +18,6 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.dag;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||
@@ -27,9 +25,11 @@ import org.apache.hudi.integ.testsuite.dag.nodes.HiveQueryNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* An example of how to generate a workflow dag programmatically. This is also used as the default workflow dag if
|
||||
* none is provided.
|
||||
* An example of how to generate a workflow dag programmatically. This is also used as the default workflow dag if none is provided.
|
||||
*/
|
||||
public class SimpleWorkflowDagGenerator implements WorkflowDagGenerator {
|
||||
|
||||
|
||||
@@ -18,20 +18,47 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.dag;
|
||||
|
||||
import java.util.List;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_DAG_NAME;
|
||||
import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_DAG_ROUNDS;
|
||||
import static org.apache.hudi.integ.testsuite.dag.DagUtils.DEFAULT_INTERMITTENT_DELAY_MINS;
|
||||
|
||||
/**
|
||||
* Workflow dag that encapsulates all execute nodes.
|
||||
*/
|
||||
public class WorkflowDag<O> {
|
||||
|
||||
private String dagName;
|
||||
private int rounds;
|
||||
private int intermittentDelayMins;
|
||||
private List<DagNode<O>> nodeList;
|
||||
|
||||
public WorkflowDag(List<DagNode<O>> nodeList) {
|
||||
this(DEFAULT_DAG_NAME, DEFAULT_DAG_ROUNDS, DEFAULT_INTERMITTENT_DELAY_MINS, nodeList);
|
||||
}
|
||||
|
||||
public WorkflowDag(String dagName, int rounds, int intermittentDelayMins, List<DagNode<O>> nodeList) {
|
||||
this.dagName = dagName;
|
||||
this.rounds = rounds;
|
||||
this.intermittentDelayMins = intermittentDelayMins;
|
||||
this.nodeList = nodeList;
|
||||
}
|
||||
|
||||
public String getDagName() {
|
||||
return dagName;
|
||||
}
|
||||
|
||||
public int getRounds() {
|
||||
return rounds;
|
||||
}
|
||||
|
||||
public int getIntermittentDelayMins() {
|
||||
return intermittentDelayMins;
|
||||
}
|
||||
|
||||
public List<DagNode<O>> getNodeList() {
|
||||
return nodeList;
|
||||
}
|
||||
|
||||
@@ -21,15 +21,16 @@ package org.apache.hudi.integ.testsuite.dag;
|
||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
|
||||
import org.apache.hudi.integ.testsuite.HoodieTestSuiteWriter;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
|
||||
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
|
||||
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
|
||||
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
|
||||
import org.apache.hudi.keygen.BuiltinKeyGenerator;
|
||||
import org.apache.hudi.integ.testsuite.generator.DeltaGenerator;
|
||||
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob.HoodieTestSuiteConfig;
|
||||
import org.apache.hudi.utilities.UtilHelpers;
|
||||
import org.apache.hudi.utilities.schema.SchemaProvider;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
@@ -38,8 +39,7 @@ import org.apache.spark.sql.SparkSession;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* WriterContext wraps the delta writer/data generator related configuration needed
|
||||
* to init/reinit.
|
||||
* WriterContext wraps the delta writer/data generator related configuration needed to init/reinit.
|
||||
*/
|
||||
public class WriterContext {
|
||||
|
||||
@@ -53,8 +53,9 @@ public class WriterContext {
|
||||
private BuiltinKeyGenerator keyGenerator;
|
||||
private transient SparkSession sparkSession;
|
||||
private transient JavaSparkContext jsc;
|
||||
|
||||
public WriterContext(JavaSparkContext jsc, TypedProperties props, HoodieTestSuiteConfig cfg,
|
||||
BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) {
|
||||
BuiltinKeyGenerator keyGenerator, SparkSession sparkSession) {
|
||||
this.cfg = cfg;
|
||||
this.props = props;
|
||||
this.keyGenerator = keyGenerator;
|
||||
|
||||
@@ -41,6 +41,17 @@ public abstract class DagNode<O> implements Comparable<DagNode<O>> {
|
||||
protected Config config;
|
||||
private boolean isCompleted;
|
||||
|
||||
public DagNode clone() {
|
||||
List<DagNode<O>> tempChildNodes = new ArrayList<>();
|
||||
for(DagNode dagNode: childNodes) {
|
||||
tempChildNodes.add(dagNode.clone());
|
||||
}
|
||||
this.childNodes = tempChildNodes;
|
||||
this.result = null;
|
||||
this.isCompleted = false;
|
||||
return this;
|
||||
}
|
||||
|
||||
public DagNode<O> addChildNode(DagNode childNode) {
|
||||
childNode.getParentNodes().add(this);
|
||||
getChildNodes().add(childNode);
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.integ.testsuite.dag.nodes;
|
||||
|
||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Delay Node to add delays between each group of test runs.
|
||||
*/
|
||||
public class DelayNode extends DagNode<Boolean> {
|
||||
|
||||
private static Logger log = LoggerFactory.getLogger(ValidateDatasetNode.class);
|
||||
private int delayMins;
|
||||
|
||||
public DelayNode(int delayMins) {
|
||||
this.delayMins = delayMins;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(ExecutionContext context) throws Exception {
|
||||
log.warn("Waiting for "+ delayMins+" mins before going for next test run");
|
||||
Thread.sleep(delayMins * 60 * 1000);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,148 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.integ.testsuite.dag.nodes;
|
||||
|
||||
import org.apache.hudi.DataSourceWriteOptions;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
|
||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
|
||||
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.spark.api.java.function.MapFunction;
|
||||
import org.apache.spark.api.java.function.ReduceFunction;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Encoders;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
|
||||
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
|
||||
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
|
||||
import org.apache.spark.sql.catalyst.expressions.Attribute;
|
||||
import org.apache.spark.sql.types.StructType;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import scala.Tuple2;
|
||||
import scala.collection.JavaConversions;
|
||||
import scala.collection.JavaConverters;
|
||||
|
||||
/**
|
||||
* This nodes validates contents from input path are in tact with Hudi. This nodes uses spark datasource for comparison purposes. By default no configs are required for this node. But there is an
|
||||
* optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites.
|
||||
* README has more details under docker set up for usages of this node.
|
||||
*/
|
||||
public class ValidateDatasetNode extends DagNode<Boolean> {
|
||||
|
||||
private static Logger log = LoggerFactory.getLogger(ValidateDatasetNode.class);
|
||||
|
||||
public ValidateDatasetNode(Config config) {
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(ExecutionContext context) throws Exception {
|
||||
|
||||
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
|
||||
|
||||
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
|
||||
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
|
||||
String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*";
|
||||
log.warn("ValidateDataset Node: Input path " + inputPath + ", hudi path " + hudiPath);
|
||||
// listing batches to be validated
|
||||
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
|
||||
FileSystem fs = new Path(inputPathStr)
|
||||
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
|
||||
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
|
||||
for (FileStatus fileStatus : fileStatuses) {
|
||||
log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
|
||||
}
|
||||
|
||||
String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY());
|
||||
String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY());
|
||||
// todo: fix hard coded fields from configs.
|
||||
// read input and resolve insert, updates, etc.
|
||||
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
|
||||
ExpressionEncoder encoder = getEncoder(inputDf.schema());
|
||||
Dataset<Row> inputSnapshotDf = inputDf.groupByKey(
|
||||
(MapFunction<Row, String>) value -> partitionPathField + "+" + recordKeyField, Encoders.STRING())
|
||||
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
|
||||
int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
|
||||
int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
|
||||
if (ts1 > ts2) {
|
||||
return v1;
|
||||
} else {
|
||||
return v2;
|
||||
}
|
||||
})
|
||||
.map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, encoder)
|
||||
.filter("_hoodie_is_deleted is NULL");
|
||||
|
||||
// read from hudi and remove meta columns.
|
||||
Dataset<Row> hudiDf = session.read().format("hudi").load(hudiPath);
|
||||
Dataset<Row> trimmedDf = hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
|
||||
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
|
||||
|
||||
Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedDf);
|
||||
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
||||
if (inputSnapshotDf.except(intersectionDf).count() != 0) {
|
||||
log.error("Data set validation failed. Total count in hudi " + trimmedDf.count() + ", input df count " + inputSnapshotDf.count());
|
||||
throw new AssertionError("Hudi contents does not match contents input data. ");
|
||||
}
|
||||
|
||||
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY());
|
||||
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY());
|
||||
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
|
||||
Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
|
||||
Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
|
||||
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
|
||||
intersectionDf = inputSnapshotDf.intersect(trimmedDf);
|
||||
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
||||
if (inputSnapshotDf.except(intersectionDf).count() != 0) {
|
||||
log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
|
||||
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
|
||||
}
|
||||
|
||||
// if delete input data is enabled, erase input data.
|
||||
if (config.isDeleteInputData()) {
|
||||
// clean up input data for current group of writes.
|
||||
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
|
||||
fs = new Path(inputPathStr)
|
||||
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
|
||||
fileStatuses = fs.listStatus(new Path(inputPathStr));
|
||||
for (FileStatus fileStatus : fileStatuses) {
|
||||
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
|
||||
fs.delete(fileStatus.getPath(), true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private ExpressionEncoder getEncoder(StructType schema) {
|
||||
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
|
||||
.map(Attribute::toAttribute).collect(Collectors.toList());
|
||||
return RowEncoder.apply(schema)
|
||||
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
|
||||
SimpleAnalyzer$.MODULE$);
|
||||
}
|
||||
}
|
||||
@@ -23,8 +23,10 @@ import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
|
||||
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
|
||||
import org.apache.hudi.integ.testsuite.dag.WriterContext;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.DelayNode;
|
||||
import org.apache.hudi.metrics.Metrics;
|
||||
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -50,9 +52,9 @@ public class DagScheduler {
|
||||
private WorkflowDag workflowDag;
|
||||
private ExecutionContext executionContext;
|
||||
|
||||
public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext) {
|
||||
public DagScheduler(WorkflowDag workflowDag, WriterContext writerContext, JavaSparkContext jsc) {
|
||||
this.workflowDag = workflowDag;
|
||||
this.executionContext = new ExecutionContext(null, writerContext);
|
||||
this.executionContext = new ExecutionContext(jsc, writerContext);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -63,7 +65,7 @@ public class DagScheduler {
|
||||
public void schedule() throws Exception {
|
||||
ExecutorService service = Executors.newFixedThreadPool(2);
|
||||
try {
|
||||
execute(service, workflowDag.getNodeList());
|
||||
execute(service, workflowDag);
|
||||
service.shutdown();
|
||||
} finally {
|
||||
if (!service.isShutdown()) {
|
||||
@@ -77,33 +79,47 @@ public class DagScheduler {
|
||||
* Method to start executing the nodes in workflow DAGs.
|
||||
*
|
||||
* @param service ExecutorService
|
||||
* @param nodes Nodes to be executed
|
||||
* @param workflowDag instance of workflow dag that needs to be executed
|
||||
* @throws Exception will be thrown if ant error occurred
|
||||
*/
|
||||
private void execute(ExecutorService service, List<DagNode> nodes) throws Exception {
|
||||
private void execute(ExecutorService service, WorkflowDag workflowDag) throws Exception {
|
||||
// Nodes at the same level are executed in parallel
|
||||
Queue<DagNode> queue = new PriorityQueue<>(nodes);
|
||||
log.info("Running workloads");
|
||||
List<DagNode> nodes = workflowDag.getNodeList();
|
||||
int curRound = 1;
|
||||
do {
|
||||
List<Future> futures = new ArrayList<>();
|
||||
Set<DagNode> childNodes = new HashSet<>();
|
||||
while (queue.size() > 0) {
|
||||
DagNode nodeToExecute = queue.poll();
|
||||
log.info("Node to execute in dag scheduler " + nodeToExecute.getConfig().toString());
|
||||
futures.add(service.submit(() -> executeNode(nodeToExecute)));
|
||||
if (nodeToExecute.getChildNodes().size() > 0) {
|
||||
childNodes.addAll(nodeToExecute.getChildNodes());
|
||||
}
|
||||
log.warn("===================================================================");
|
||||
log.warn("Running workloads for round num " + curRound);
|
||||
log.warn("===================================================================");
|
||||
Queue<DagNode> queue = new PriorityQueue<>();
|
||||
for (DagNode dagNode : nodes) {
|
||||
queue.add(dagNode.clone());
|
||||
}
|
||||
queue.addAll(childNodes);
|
||||
childNodes.clear();
|
||||
for (Future future : futures) {
|
||||
future.get(1, TimeUnit.HOURS);
|
||||
do {
|
||||
List<Future> futures = new ArrayList<>();
|
||||
Set<DagNode> childNodes = new HashSet<>();
|
||||
while (queue.size() > 0) {
|
||||
DagNode nodeToExecute = queue.poll();
|
||||
log.warn("Executing node \"" + nodeToExecute.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" :: " + nodeToExecute.getConfig());
|
||||
futures.add(service.submit(() -> executeNode(nodeToExecute)));
|
||||
if (nodeToExecute.getChildNodes().size() > 0) {
|
||||
childNodes.addAll(nodeToExecute.getChildNodes());
|
||||
}
|
||||
}
|
||||
queue.addAll(childNodes);
|
||||
childNodes.clear();
|
||||
for (Future future : futures) {
|
||||
future.get(1, TimeUnit.HOURS);
|
||||
}
|
||||
} while (queue.size() > 0);
|
||||
log.info("Finished workloads for round num " + curRound);
|
||||
if (curRound < workflowDag.getRounds()) {
|
||||
new DelayNode(workflowDag.getIntermittentDelayMins()).execute(executionContext);
|
||||
}
|
||||
|
||||
// After each level, report and flush the metrics
|
||||
Metrics.flush();
|
||||
} while (queue.size() > 0);
|
||||
} while (curRound++ < workflowDag.getRounds());
|
||||
log.info("Finished workloads");
|
||||
}
|
||||
|
||||
@@ -119,7 +135,6 @@ public class DagScheduler {
|
||||
try {
|
||||
int repeatCount = node.getConfig().getRepeatCount();
|
||||
while (repeatCount > 0) {
|
||||
log.warn("executing node: \"" + node.getConfig().getOtherConfigs().get(CONFIG_NAME) + "\" of type: " + node.getClass() + " :: " + node.getConfig().toString());
|
||||
node.execute(executionContext);
|
||||
log.info("Finished executing {}", node.getName());
|
||||
repeatCount--;
|
||||
|
||||
@@ -41,6 +41,10 @@ import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
|
||||
import org.apache.hudi.integ.testsuite.converter.Converter;
|
||||
import org.apache.hudi.integ.testsuite.converter.DeleteConverter;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
|
||||
import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
|
||||
import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
|
||||
import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
|
||||
@@ -51,6 +55,7 @@ import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter;
|
||||
import org.apache.hudi.integ.testsuite.writer.DeltaWriterFactory;
|
||||
import org.apache.hudi.keygen.BuiltinKeyGenerator;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
@@ -58,6 +63,17 @@ import org.apache.spark.storage.StorageLevel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
||||
@@ -77,7 +93,7 @@ public class DeltaGenerator implements Serializable {
|
||||
private int batchId;
|
||||
|
||||
public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession,
|
||||
String schemaStr, BuiltinKeyGenerator keyGenerator) {
|
||||
String schemaStr, BuiltinKeyGenerator keyGenerator) {
|
||||
this.deltaOutputConfig = deltaOutputConfig;
|
||||
this.jsc = jsc;
|
||||
this.sparkSession = sparkSession;
|
||||
@@ -167,7 +183,6 @@ public class DeltaGenerator implements Serializable {
|
||||
log.info("Repartitioning records into " + numPartition + " partitions for updates");
|
||||
adjustedRDD = adjustedRDD.repartition(numPartition);
|
||||
log.info("Repartitioning records done for updates");
|
||||
|
||||
UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
|
||||
partitionPathFieldNames, recordRowKeyFieldNames);
|
||||
JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
|
||||
|
||||
@@ -20,6 +20,11 @@ package org.apache.hudi.integ.testsuite.generator;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
@@ -67,7 +72,7 @@ public class FlexibleSchemaRecordGenerationIterator implements Iterator<GenericR
|
||||
lastRecord = record;
|
||||
return record;
|
||||
} else {
|
||||
return this.generator.randomize(lastRecord, partitionPathFieldNames);
|
||||
return this.generator.randomize(lastRecord, this.partitionPathFieldNames);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -134,12 +134,16 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
||||
protected GenericRecord create(Schema schema, Set<String> partitionPathFieldNames) {
|
||||
GenericRecord result = new GenericData.Record(schema);
|
||||
for (Schema.Field f : schema.getFields()) {
|
||||
if (isPartialLongField(f, partitionPathFieldNames)) {
|
||||
// This is a long field used as partition field. Set it to seconds since epoch.
|
||||
long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS);
|
||||
result.put(f.name(), (long) value);
|
||||
if (f.name().equals(DEFAULT_HOODIE_IS_DELETED_COL)) {
|
||||
result.put(f.name(), false);
|
||||
} else {
|
||||
result.put(f.name(), typeConvert(f));
|
||||
if (isPartialLongField(f, partitionPathFieldNames)) {
|
||||
// This is a long field used as partition field. Set it to seconds since epoch.
|
||||
long value = TimeUnit.SECONDS.convert(partitionIndex, TimeUnit.DAYS);
|
||||
result.put(f.name(), (long) value);
|
||||
} else {
|
||||
result.put(f.name(), typeConvert(f));
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
||||
@@ -18,16 +18,16 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.generator;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A lazy update payload generator to generate {@link GenericRecord}s lazily.
|
||||
*/
|
||||
|
||||
@@ -18,20 +18,6 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.helpers;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FsStatus;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.ImmutablePair;
|
||||
@@ -40,13 +26,26 @@ import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.integ.testsuite.HoodieTestSuiteJob;
|
||||
import org.apache.hudi.utilities.sources.helpers.DFSPathSelector;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A custom dfs path selector used only for the hudi test suite. To be used only if workload is not run inline.
|
||||
*/
|
||||
public class DFSTestSuitePathSelector extends DFSPathSelector {
|
||||
|
||||
private static volatile Logger log = LoggerFactory.getLogger(HoodieTestSuiteJob.class);
|
||||
|
||||
public DFSTestSuitePathSelector(TypedProperties props, Configuration hadoopConf) {
|
||||
@@ -67,6 +66,7 @@ public class DFSTestSuitePathSelector extends DFSPathSelector {
|
||||
lastBatchId = 0;
|
||||
nextBatchId = 1;
|
||||
}
|
||||
|
||||
// obtain all eligible files for the batch
|
||||
List<FileStatus> eligibleFiles = new ArrayList<>();
|
||||
FileStatus[] fileStatuses = fs.globStatus(
|
||||
@@ -87,7 +87,8 @@ public class DFSTestSuitePathSelector extends DFSPathSelector {
|
||||
if (!fileStatus.isDirectory() || IGNORE_FILEPREFIX_LIST.stream()
|
||||
.anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
|
||||
continue;
|
||||
} else if (fileStatus.getPath().getName().compareTo(lastBatchId.toString()) > 0) {
|
||||
} else if (Integer.parseInt(fileStatus.getPath().getName()) > lastBatchId && Integer.parseInt(fileStatus.getPath()
|
||||
.getName()) <= nextBatchId) {
|
||||
RemoteIterator<LocatedFileStatus> files = fs.listFiles(fileStatus.getPath(), true);
|
||||
while (files.hasNext()) {
|
||||
eligibleFiles.add(files.next());
|
||||
@@ -95,7 +96,6 @@ public class DFSTestSuitePathSelector extends DFSPathSelector {
|
||||
}
|
||||
}
|
||||
|
||||
log.info("Reading " + eligibleFiles.size() + " files. ");
|
||||
// no data to readAvro
|
||||
if (eligibleFiles.size() == 0) {
|
||||
return new ImmutablePair<>(Option.empty(),
|
||||
|
||||
@@ -18,26 +18,6 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.reader;
|
||||
|
||||
import static java.util.Map.Entry.comparingByValue;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.avro.HoodieAvroUtils;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.model.FileSlice;
|
||||
@@ -51,6 +31,12 @@ import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.ParquetReaderIterator;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.config.HoodieMemoryConfig;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.parquet.avro.AvroParquetReader;
|
||||
import org.apache.parquet.avro.AvroReadSupport;
|
||||
import org.apache.spark.api.java.JavaPairRDD;
|
||||
@@ -59,11 +45,27 @@ import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.sql.SparkSession;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.StreamSupport;
|
||||
|
||||
import scala.Tuple2;
|
||||
|
||||
import static java.util.Map.Entry.comparingByValue;
|
||||
import static java.util.stream.Collectors.toMap;
|
||||
|
||||
/**
|
||||
* This class helps to generate updates from an already existing hoodie dataset. It supports generating updates in
|
||||
* across partitions, files and records.
|
||||
* This class helps to generate updates from an already existing hoodie dataset. It supports generating updates in across partitions, files and records.
|
||||
*/
|
||||
public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
|
||||
|
||||
@@ -148,16 +150,22 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
|
||||
long recordsInSingleFile = iteratorSize(readParquetOrLogFiles(getSingleSliceFromRDD(partitionToFileSlice)));
|
||||
int numFilesToUpdate;
|
||||
long numRecordsToUpdatePerFile;
|
||||
if (!numFiles.isPresent() || numFiles.get() == 0) {
|
||||
if (!numFiles.isPresent() || numFiles.get() <= 0) {
|
||||
// If num files are not passed, find the number of files to update based on total records to update and records
|
||||
// per file
|
||||
numFilesToUpdate = (int)Math.ceil((double)numRecordsToUpdate.get() / recordsInSingleFile);
|
||||
// recordsInSingleFile is not average so we still need to account for bias is records distribution
|
||||
// in the files. Limit to the maximum number of files available.
|
||||
int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
|
||||
numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
|
||||
log.info("Files to update {}", numFilesToUpdate);
|
||||
numRecordsToUpdatePerFile = recordsInSingleFile;
|
||||
numFilesToUpdate = (int) Math.floor((double) numRecordsToUpdate.get() / recordsInSingleFile);
|
||||
if (numFilesToUpdate > 0) {
|
||||
// recordsInSingleFile is not average so we still need to account for bias is records distribution
|
||||
// in the files. Limit to the maximum number of files available.
|
||||
int totalExistingFilesCount = partitionToFileIdCountMap.values().stream().reduce((a, b) -> a + b).get();
|
||||
numFilesToUpdate = Math.min(numFilesToUpdate, totalExistingFilesCount);
|
||||
log.info("Files to update {}, records to update per file {}", numFilesToUpdate, recordsInSingleFile);
|
||||
numRecordsToUpdatePerFile = recordsInSingleFile;
|
||||
} else {
|
||||
numFilesToUpdate = 1;
|
||||
numRecordsToUpdatePerFile = numRecordsToUpdate.get();
|
||||
log.info("Total records passed in < records in single file. Hence setting numFilesToUpdate to 1 and numRecordsToUpdate to {} ", numRecordsToUpdatePerFile);
|
||||
}
|
||||
} else {
|
||||
// If num files is passed, find the number of records per file based on either percentage or total records to
|
||||
// update and num files passed
|
||||
@@ -171,6 +179,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
|
||||
partitionPaths.size(), numFilesToUpdate, partitionToFileIdCountMap);
|
||||
JavaRDD<GenericRecord> updates = projectSchema(generateUpdates(adjustedPartitionToFileIdCountMap,
|
||||
partitionToFileSlice, numFilesToUpdate, (int) numRecordsToUpdatePerFile));
|
||||
|
||||
if (numRecordsToUpdate.isPresent() && numFiles.isPresent() && numFiles.get() != 0 && numRecordsToUpdate.get()
|
||||
!= numRecordsToUpdatePerFile * numFiles.get()) {
|
||||
long remainingRecordsToAdd = (numRecordsToUpdate.get() - (numRecordsToUpdatePerFile * numFiles.get()));
|
||||
@@ -215,7 +224,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
|
||||
LinkedHashMap::new));
|
||||
|
||||
// Limit files to be read per partition
|
||||
int numFilesPerPartition = (int) Math.ceil((double)numFiles / numPartitions);
|
||||
int numFilesPerPartition = (int) Math.ceil((double) numFiles / numPartitions);
|
||||
Map<String, Integer> adjustedPartitionToFileIdCountMap = new HashMap<>();
|
||||
partitionToFileIdCountSortedMap.entrySet().stream().forEach(e -> {
|
||||
if (e.getValue() <= numFilesPerPartition) {
|
||||
@@ -283,9 +292,7 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of elements remaining in {@code iterator}. The iterator
|
||||
* will be left exhausted: its {@code hasNext()} method will return
|
||||
* {@code false}.
|
||||
* Returns the number of elements remaining in {@code iterator}. The iterator will be left exhausted: its {@code hasNext()} method will return {@code false}.
|
||||
*/
|
||||
private static int iteratorSize(Iterator<?> iterator) {
|
||||
int count = 0;
|
||||
@@ -297,11 +304,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an iterator returning the first {@code limitSize} elements of the
|
||||
* given iterator. If the original iterator does not contain that many
|
||||
* elements, the returned iterator will have the same behavior as the original
|
||||
* iterator. The returned iterator supports {@code remove()} if the original
|
||||
* iterator does.
|
||||
* Creates an iterator returning the first {@code limitSize} elements of the given iterator. If the original iterator does not contain that many elements, the returned iterator will have the same
|
||||
* behavior as the original iterator. The returned iterator supports {@code remove()} if the original iterator does.
|
||||
*
|
||||
* @param iterator the iterator to limit
|
||||
* @param limitSize the maximum number of elements in the returned iterator
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.integ.testsuite.schema;
|
||||
|
||||
public class SchemaUtils {
|
||||
|
||||
public static final String SOURCE_ORDERING_FIELD = "test_suite_source_ordering_field";
|
||||
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.integ.testsuite.schema;
|
||||
|
||||
import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.integ.testsuite.dag.WriterContext;
|
||||
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
|
||||
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.avro.Schema.Field;
|
||||
import org.apache.avro.Schema.Type;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Appends source ordering field to both source and target schemas. This is required to assist in validation to differentiate records written in different batches.
|
||||
*/
|
||||
public class TestSuiteFileBasedSchemaProvider extends FilebasedSchemaProvider {
|
||||
|
||||
protected static Logger log = LogManager.getLogger(WriterContext.class);
|
||||
|
||||
public TestSuiteFileBasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
|
||||
super(props, jssc);
|
||||
this.sourceSchema = addSourceOrderingFieldToSchema(sourceSchema);
|
||||
this.targetSchema = addSourceOrderingFieldToSchema(targetSchema);
|
||||
}
|
||||
|
||||
private Schema addSourceOrderingFieldToSchema(Schema schema) {
|
||||
List<Field> fields = new ArrayList<>();
|
||||
for (Schema.Field field : schema.getFields()) {
|
||||
Schema.Field newField = new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal());
|
||||
for (Map.Entry<String, Object> prop : field.getObjectProps().entrySet()) {
|
||||
newField.addProp(prop.getKey(), prop.getValue());
|
||||
}
|
||||
fields.add(newField);
|
||||
}
|
||||
Schema.Field sourceOrderingField =
|
||||
new Schema.Field(SchemaUtils.SOURCE_ORDERING_FIELD, Schema.create(Type.INT), "", 0);
|
||||
fields.add(sourceOrderingField);
|
||||
Schema mergedSchema = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), false);
|
||||
mergedSchema.setFields(fields);
|
||||
return mergedSchema;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.writer;
|
||||
|
||||
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
|
||||
import java.io.IOException;
|
||||
@@ -30,22 +32,29 @@ import java.util.List;
|
||||
*/
|
||||
public class DFSDeltaWriterAdapter implements DeltaWriterAdapter<GenericRecord> {
|
||||
|
||||
private DeltaInputWriter deltaInputGenerator;
|
||||
private DeltaInputWriter deltaInputWriter;
|
||||
private List<DeltaWriteStats> metrics = new ArrayList<>();
|
||||
private int preCombineFieldVal = 0;
|
||||
|
||||
public DFSDeltaWriterAdapter(DeltaInputWriter<GenericRecord> deltaInputGenerator) {
|
||||
this.deltaInputGenerator = deltaInputGenerator;
|
||||
public DFSDeltaWriterAdapter(DeltaInputWriter<GenericRecord> deltaInputWriter, int preCombineFieldVal) {
|
||||
this.deltaInputWriter = deltaInputWriter;
|
||||
this.preCombineFieldVal = preCombineFieldVal;
|
||||
}
|
||||
|
||||
public DFSDeltaWriterAdapter(DeltaInputWriter<GenericRecord> deltaInputWriter) {
|
||||
this.deltaInputWriter = deltaInputWriter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DeltaWriteStats> write(Iterator<GenericRecord> input) throws IOException {
|
||||
while (input.hasNext()) {
|
||||
GenericRecord next = input.next();
|
||||
if (this.deltaInputGenerator.canWrite()) {
|
||||
this.deltaInputGenerator.writeData(next);
|
||||
} else if (input.hasNext()) {
|
||||
next.put(SchemaUtils.SOURCE_ORDERING_FIELD, preCombineFieldVal);
|
||||
if (this.deltaInputWriter.canWrite()) {
|
||||
this.deltaInputWriter.writeData(next);
|
||||
} else {
|
||||
rollOver();
|
||||
this.deltaInputGenerator.writeData(next);
|
||||
this.deltaInputWriter.writeData(next);
|
||||
}
|
||||
}
|
||||
close();
|
||||
@@ -54,11 +63,11 @@ public class DFSDeltaWriterAdapter implements DeltaWriterAdapter<GenericRecord>
|
||||
|
||||
public void rollOver() throws IOException {
|
||||
close();
|
||||
this.deltaInputGenerator = this.deltaInputGenerator.getNewWriter();
|
||||
this.deltaInputWriter = this.deltaInputWriter.getNewWriter();
|
||||
}
|
||||
|
||||
private void close() throws IOException {
|
||||
this.deltaInputGenerator.close();
|
||||
this.metrics.add(this.deltaInputGenerator.getDeltaWriteStats());
|
||||
this.deltaInputWriter.close();
|
||||
this.metrics.add(this.deltaInputWriter.getDeltaWriteStats());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,16 +18,17 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.writer;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
import org.apache.hudi.common.util.StringUtils;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DFSDeltaConfig;
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
|
||||
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
|
||||
|
||||
import org.apache.avro.generic.GenericRecord;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* A factory to help instantiate different {@link DeltaWriterAdapter}s depending on the {@link DeltaOutputMode} and
|
||||
* {@link DeltaInputType}.
|
||||
* A factory to help instantiate different {@link DeltaWriterAdapter}s depending on the {@link DeltaOutputMode} and {@link DeltaInputType}.
|
||||
*/
|
||||
public class DeltaWriterFactory {
|
||||
|
||||
@@ -44,9 +45,9 @@ public class DeltaWriterFactory {
|
||||
DeltaInputWriter<GenericRecord> fileDeltaInputGenerator = new AvroFileDeltaInputWriter(
|
||||
dfsDeltaConfig.getConfiguration(),
|
||||
StringUtils
|
||||
.join(new String[]{dfsDeltaConfig.getDeltaBasePath(), dfsDeltaConfig.getBatchId().toString()},
|
||||
.join(new String[] {dfsDeltaConfig.getDeltaBasePath(), dfsDeltaConfig.getBatchId().toString()},
|
||||
"/"), dfsDeltaConfig.getSchemaStr(), dfsDeltaConfig.getMaxFileSize());
|
||||
return new DFSDeltaWriterAdapter(fileDeltaInputGenerator);
|
||||
return new DFSDeltaWriterAdapter(fileDeltaInputGenerator, batchId);
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid delta input format " + config.getDeltaInputType());
|
||||
}
|
||||
|
||||
@@ -18,17 +18,18 @@
|
||||
|
||||
package org.apache.hudi.integ.testsuite.configuration;
|
||||
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.InsertNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.nodes.UpsertNode;
|
||||
import org.apache.hudi.integ.testsuite.dag.WorkflowDag;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import static junit.framework.Assert.assertTrue;
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
|
||||
/**
|
||||
* Unit test for the build process of {@link DagNode} and {@link WorkflowDag}.
|
||||
|
||||
@@ -47,6 +47,9 @@ public class TestDagUtils {
|
||||
public void testConvertYamlToDag() throws Exception {
|
||||
WorkflowDag dag = DagUtils.convertYamlToDag(UtilitiesTestBase.Helpers
|
||||
.readFileFromAbsolutePath((System.getProperty("user.dir") + "/.." + COW_DAG_DOCKER_DEMO_RELATIVE_PATH)));
|
||||
assertEquals(dag.getDagName(), "unit-test-cow-dag");
|
||||
assertEquals(dag.getRounds(), 1);
|
||||
assertEquals(dag.getIntermittentDelayMins(), 10);
|
||||
assertEquals(dag.getNodeList().size(), 1);
|
||||
Assertions.assertEquals(((DagNode) dag.getNodeList().get(0)).getParentNodes().size(), 0);
|
||||
assertEquals(((DagNode) dag.getNodeList().get(0)).getChildNodes().size(), 1);
|
||||
|
||||
@@ -13,58 +13,62 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
first_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 2
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: none
|
||||
second_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: first_insert
|
||||
first_rollback:
|
||||
config:
|
||||
deps: second_insert
|
||||
type: RollbackNode
|
||||
third_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: first_rollback
|
||||
first_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_upsert: 1
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
type: UpsertNode
|
||||
deps: third_insert
|
||||
first_hive_sync:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveSyncNode
|
||||
deps: first_upsert
|
||||
first_hive_query:
|
||||
config:
|
||||
hive_props:
|
||||
prop2: "set spark.yarn.queue="
|
||||
prop3: "set hive.strict.checks.large.query=false"
|
||||
prop4: "set hive.stats.autogather=false"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb1.table1"
|
||||
result1: 300
|
||||
query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1"
|
||||
result2: 0
|
||||
type: HiveQueryNode
|
||||
deps: first_hive_sync
|
||||
dag_name: unit-test-cow-dag
|
||||
dag_rounds: 1
|
||||
dag_intermittent_delay_mins: 10
|
||||
dag_content:
|
||||
first_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 2
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: none
|
||||
second_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: first_insert
|
||||
first_rollback:
|
||||
config:
|
||||
deps: second_insert
|
||||
type: RollbackNode
|
||||
third_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: first_rollback
|
||||
first_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_upsert: 1
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
type: UpsertNode
|
||||
deps: third_insert
|
||||
first_hive_sync:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveSyncNode
|
||||
deps: first_upsert
|
||||
first_hive_query:
|
||||
config:
|
||||
hive_props:
|
||||
prop2: "set spark.yarn.queue="
|
||||
prop3: "set hive.strict.checks.large.query=false"
|
||||
prop4: "set hive.stats.autogather=false"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb1.table1"
|
||||
result1: 300
|
||||
query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1"
|
||||
result2: 0
|
||||
type: HiveQueryNode
|
||||
deps: first_hive_sync
|
||||
@@ -13,58 +13,62 @@
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
first_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 2
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: none
|
||||
second_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: first_insert
|
||||
first_rollback:
|
||||
config:
|
||||
deps: second_insert
|
||||
type: RollbackNode
|
||||
third_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: first_rollback
|
||||
first_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_upsert: 1
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
type: UpsertNode
|
||||
deps: third_insert
|
||||
first_hive_sync:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveSyncNode
|
||||
deps: first_upsert
|
||||
first_hive_query:
|
||||
config:
|
||||
hive_props:
|
||||
prop2: "set spark.yarn.queue="
|
||||
prop3: "set hive.strict.checks.large.query=false"
|
||||
prop4: "set hive.stats.autogather=false"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb1.table1"
|
||||
result1: 300
|
||||
query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1"
|
||||
result2: 0
|
||||
type: HiveQueryNode
|
||||
deps: first_hive_sync
|
||||
dag_name: unit-test-mor-dag
|
||||
dag_rounds: 1
|
||||
dag_intermittent_delay_mins: 10
|
||||
dag_content:
|
||||
first_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 2
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: none
|
||||
second_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: first_insert
|
||||
first_rollback:
|
||||
config:
|
||||
deps: second_insert
|
||||
type: RollbackNode
|
||||
third_insert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_insert: 1
|
||||
repeat_count: 1
|
||||
num_records_insert: 100
|
||||
type: InsertNode
|
||||
deps: first_rollback
|
||||
first_upsert:
|
||||
config:
|
||||
record_size: 70000
|
||||
num_partitions_upsert: 1
|
||||
repeat_count: 1
|
||||
num_records_upsert: 100
|
||||
type: UpsertNode
|
||||
deps: third_insert
|
||||
first_hive_sync:
|
||||
config:
|
||||
queue_name: "adhoc"
|
||||
engine: "mr"
|
||||
type: HiveSyncNode
|
||||
deps: first_upsert
|
||||
first_hive_query:
|
||||
config:
|
||||
hive_props:
|
||||
prop2: "set spark.yarn.queue="
|
||||
prop3: "set hive.strict.checks.large.query=false"
|
||||
prop4: "set hive.stats.autogather=false"
|
||||
hive_queries:
|
||||
query1: "select count(*) from testdb1.table1"
|
||||
result1: 300
|
||||
query2: "select count(*) from testdb1.table1 group by `_row_key` having count(*) > 1"
|
||||
result2: 0
|
||||
type: HiveQueryNode
|
||||
deps: first_hive_sync
|
||||
@@ -46,9 +46,9 @@ public class FilebasedSchemaProvider extends SchemaProvider {
|
||||
|
||||
private final FileSystem fs;
|
||||
|
||||
private final Schema sourceSchema;
|
||||
protected Schema sourceSchema;
|
||||
|
||||
private Schema targetSchema;
|
||||
protected Schema targetSchema;
|
||||
|
||||
public FilebasedSchemaProvider(TypedProperties props, JavaSparkContext jssc) {
|
||||
super(props, jssc);
|
||||
|
||||
Reference in New Issue
Block a user