diff --git a/docker/demo/config/test-suite/templates/clustering.yaml.template b/docker/demo/config/test-suite/templates/clustering.yaml.template
new file mode 100644
index 000000000..7b33423e7
--- /dev/null
+++ b/docker/demo/config/test-suite/templates/clustering.yaml.template
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: NAME-clustering.yaml
+dag_rounds: clustering_num_iterations
+dag_intermittent_delay_mins: clustering_delay_in_mins
+dag_content:
+  first_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 1000
+    type: InsertNode
+    deps: none
+  second_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 10000
+    deps: first_insert
+    type: InsertNode
+  third_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 1
+      repeat_count: 1
+      num_records_insert: 300
+    deps: second_insert
+    type: InsertNode
+  first_delete:
+    config:
+      num_partitions_delete: 1
+      num_records_delete: 9000
+    type: DeleteNode
+    deps: third_insert
+  first_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_delete
+  first_validate:
+    config:
+      validate_hive: true
+    type: ValidateDatasetNode
+    deps: first_hive_sync
+  first_cluster:
+    config:
+      execute_itr_count: clustering_itr_count
+    type: ClusteringNode
+    deps: first_validate
+  second_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_cluster
+  second_validate:
+    config:
+      validate_hive: true
+    type: ValidateDatasetNode
+    deps: second_hive_sync
diff --git a/docker/demo/config/test-suite/templates/long-running.yaml.template b/docker/demo/config/test-suite/templates/long-running.yaml.template
new file mode 100644
index 000000000..b6392967e
--- /dev/null
+++ b/docker/demo/config/test-suite/templates/long-running.yaml.template
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+dag_name: NAME-long-running-multi-partitions.yaml
+dag_rounds: num_iterations
+dag_intermittent_delay_mins: delay_in_mins
+dag_content:
+  first_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 5
+      repeat_count: 1
+      num_records_insert: 1000
+    type: InsertNode
+    deps: none
+  second_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 50
+      repeat_count: 1
+      num_records_insert: 10000
+    deps: first_insert
+    type: InsertNode
+  third_insert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 2
+      repeat_count: 1
+      num_records_insert: 300
+    deps: second_insert
+    type: InsertNode
+  first_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: third_insert
+  first_validate:
+    config:
+      validate_hive: true
+    type: ValidateDatasetNode
+    deps: first_hive_sync
+  first_upsert:
+    config:
+      record_size: 1000
+      num_partitions_insert: 2
+      num_records_insert: 300
+      repeat_count: 1
+      num_records_upsert: 100
+      num_partitions_upsert: 1
+    type: UpsertNode
+    deps: first_validate
+  first_delete:
+    config:
+      num_partitions_delete: 50
+      num_records_delete: 8000
+    type: DeleteNode
+    deps: first_upsert
+  second_hive_sync:
+    config:
+      queue_name: "adhoc"
+      engine: "mr"
+    type: HiveSyncNode
+    deps: first_delete
+  second_validate:
+    config:
+      validate_hive: true
+      delete_input_data: true
+    type: ValidateDatasetNode
+    deps: second_hive_sync
+  last_validate:
+    config:
+      execute_itr_count: 50
+      validate_clean: true
+      validate_archival: true
+    type: ValidateAsyncOperations
+    deps: second_validate
diff --git a/docker/demo/config/test-suite/templates/spark_command.txt.template b/docker/demo/config/test-suite/templates/spark_command.txt.template
new file mode 100644
index 000000000..2310a7e0b
--- /dev/null
+++ b/docker/demo/config/test-suite/templates/spark_command.txt.template
@@ -0,0 +1,34 @@
+spark-submit \
+--packages org.apache.spark:spark-avro_2.11:2.4.0 \
+--conf spark.task.cpus=1 \
+--conf spark.executor.cores=1 \
+--conf spark.task.maxFailures=100 \
+--conf spark.memory.fraction=0.4 \
+--conf spark.rdd.compress=true \
+--conf spark.kryoserializer.buffer.max=2000m \
+--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
+--conf spark.memory.storageFraction=0.1 \
+--conf spark.shuffle.service.enabled=true \
+--conf spark.sql.hive.convertMetastoreParquet=false \
+--conf spark.driver.maxResultSize=12g \
+--conf spark.executor.heartbeatInterval=120s \
+--conf spark.network.timeout=600s \
+--conf spark.yarn.max.executor.failures=10 \
+--conf spark.sql.catalogImplementation=hive \
+--class org.apache.hudi.integ.testsuite.HoodieTestSuiteJob \
+/opt/JAR_NAME \
+--source-ordering-field test_suite_source_ordering_field \
+--use-deltastreamer \
+--target-base-path OUTPUT_PATH \
+--input-base-path INPUT_PATH \
+--target-table table1 \
+--props file:/opt/staging/test.properties \
+--schemaprovider-class org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider \
+--source-class org.apache.hudi.utilities.sources.AvroDFSSource \
+--input-file-size 125829120 \
+--workload-yaml-path file:/opt/staging/input_yaml \
+--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
+--table-type TABLE_TYPE \
+--compact-scheduling-minshare 1 \
+--clean-input \
+--clean-output
\ No newline at end of file
diff --git a/docker/demo/config/test-suite/templates/test.properties.template b/docker/demo/config/test-suite/templates/test.properties.template
new file mode 100644
index 000000000..e1b65fb73
--- /dev/null
+++ b/docker/demo/config/test-suite/templates/test.properties.template
@@ -0,0 +1,50 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+hoodie.insert.shuffle.parallelism=100
+hoodie.upsert.shuffle.parallelism=100
+hoodie.bulkinsert.shuffle.parallelism=100
+
+hoodie.deltastreamer.source.test.num_partitions=100
+hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys=false
+hoodie.deltastreamer.source.test.max_unique_records=100000000
+hoodie.embed.timeline.server=false
+hoodie.deltastreamer.source.input.selector=org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector
+
+hoodie.datasource.hive_sync.skip_ro_suffix=true
+
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator
+hoodie.datasource.write.partitionpath.field=timestamp
+
+hoodie.clustering.plan.strategy.sort.columns=_row_key
+hoodie.clustering.plan.strategy.daybased.lookback.partitions=0
+hoodie.clustering.inline.max.commits=1
+
+hoodie.deltastreamer.source.dfs.root=INPUT_PATH
+hoodie.deltastreamer.schemaprovider.target.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.schemaprovider.source.schema.file=file:/var/hoodie/ws/docker/demo/config/test-suite/source.avsc
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
+
+hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://hiveserver:10000/
+hoodie.datasource.hive_sync.database=testdb
+hoodie.datasource.hive_sync.table=table1
+hoodie.datasource.hive_sync.assume_date_partitioning=false
+hoodie.datasource.hive_sync.partition_fields=_hoodie_partition_path
+hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor
+
diff --git a/docker/generate_test_suite.sh b/docker/generate_test_suite.sh
new file mode 100755
index 000000000..60655a7a5
--- /dev/null
+++ b/docker/generate_test_suite.sh
@@ -0,0 +1,172 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Generates Hudi integ-test-suite artifacts (workload yamls, test.properties,
+# spark-submit scripts) from the templates under demo/config/test-suite/templates,
+# stages them under demo/config/test-suite/staging, and executes them inside the
+# adhoc-2 demo container. All defaults below can be overridden via CLI flags.
+NUM_ITR=1
+DELAY_MINS=1
+TABLE_TYPE=COPY_ON_WRITE
+INCLUDE_CLUSTER_YAML=false
+CLUSTER_NUM_ITR=2
+CLUSTER_DELAY_MINS=1
+CLUSTER_ITR_COUNT=1
+JAR_NAME=hudi-integ-test-bundle-0.9.0-SNAPSHOT.jar
+INPUT_PATH="/user/hive/warehouse/hudi-integ-test-suite/input/"
+OUTPUT_PATH="/user/hive/warehouse/hudi-integ-test-suite/output/"
+
+CUR_DIR=$(pwd)
+
+POSITIONAL=()
+while [[ $# -gt 0 ]]
+do
+key="$1"
+
+case $key in
+  --num_iterations)
+    NUM_ITR="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --intermittent_delay_mins)
+    DELAY_MINS="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --table_type)
+    TABLE_TYPE="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --include_cluster_yaml)
+    INCLUDE_CLUSTER_YAML="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --cluster_num_itr)
+    CLUSTER_NUM_ITR="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --cluster_delay_mins)
+    CLUSTER_DELAY_MINS="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --cluster_exec_itr_count)
+    CLUSTER_ITR_COUNT="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --integ_test_jar_name)
+    JAR_NAME="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --input_path)
+    INPUT_PATH="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --output_path)
+    OUTPUT_PATH="$2"
+    shift # past argument
+    shift # past value
+    ;;
+  --default)
+    DEFAULT=YES
+    shift # past argument
+    ;;
+  *) # unknown option
+    POSITIONAL+=("$1") # save it in an array for later
+    shift # past argument
+    ;;
+esac
+done
+set -- "${POSITIONAL[@]}" # restore positional parameters
+
+echo "Num Iterations = ${NUM_ITR}"
+echo "Intermittent delay in mins = ${DELAY_MINS}"
+echo "Table type = ${TABLE_TYPE}"
+echo "Include cluster yaml $INCLUDE_CLUSTER_YAML"
+echo "Cluster total itr count $CLUSTER_NUM_ITR"
+echo "Cluster delay mins $CLUSTER_DELAY_MINS"
+echo "Cluster exec itr count $CLUSTER_ITR_COUNT"
+echo "Jar name $JAR_NAME"
+# Escape the forward slashes so the paths can be spliced into sed "s/../../"
+# replacement text below.
+INPUT_PATH=$(echo "$INPUT_PATH" | sed "s|\/|\\\/|g")
+echo "Input path $INPUT_PATH"
+OUTPUT_PATH=$(echo "$OUTPUT_PATH" | sed "s|\/|\\\/|g")
+echo "Output path $OUTPUT_PATH"
+
+if [ ! -f "$CUR_DIR/../packaging/hudi-integ-test-bundle/target/$JAR_NAME" ]; then
+  echo "Integ test bundle not found at $CUR_DIR/../packaging/hudi-integ-test-bundle/target/$JAR_NAME"
+  exit 1
+fi
+
+if [ -d "demo/config/test-suite/staging" ]; then
+  echo "Cleaning up staging dir"
+  rm -rf demo/config/test-suite/staging*
+fi
+
+if [ ! -d "demo/config/test-suite/staging" ]; then
+  echo "Creating staging dir"
+  mkdir demo/config/test-suite/staging
+fi
+
+cp demo/config/test-suite/templates/long-running.yaml.template demo/config/test-suite/staging/long-running.yaml
+
+# NOTE(review): `sed -i ''` is BSD/macOS syntax; GNU sed on Linux needs plain
+# `sed -i` (it would treat the empty string as the sed script). Confirm target OS.
+sed -i '' "s/NAME/$TABLE_TYPE/" demo/config/test-suite/staging/long-running.yaml
+sed -i '' "s/num_iterations/$NUM_ITR/" demo/config/test-suite/staging/long-running.yaml
+sed -i '' "s/delay_in_mins/$DELAY_MINS/" demo/config/test-suite/staging/long-running.yaml
+
+cp demo/config/test-suite/templates/test.properties.template demo/config/test-suite/staging/test.properties
+sed -i '' "s/INPUT_PATH/$INPUT_PATH/" demo/config/test-suite/staging/test.properties
+
+cp demo/config/test-suite/templates/spark_command.txt.template demo/config/test-suite/staging/long_running_spark_command.sh
+
+sed -i '' "s/JAR_NAME/$JAR_NAME/" demo/config/test-suite/staging/long_running_spark_command.sh
+sed -i '' "s/INPUT_PATH/$INPUT_PATH/" demo/config/test-suite/staging/long_running_spark_command.sh
+sed -i '' "s/OUTPUT_PATH/$OUTPUT_PATH/" demo/config/test-suite/staging/long_running_spark_command.sh
+sed -i '' "s/input_yaml/long-running.yaml/" demo/config/test-suite/staging/long_running_spark_command.sh
+sed -i '' "s/TABLE_TYPE/$TABLE_TYPE/" demo/config/test-suite/staging/long_running_spark_command.sh
+
+# String comparison instead of executing the variable's value as a command.
+if [ "$INCLUDE_CLUSTER_YAML" = "true" ]; then
+
+  cp demo/config/test-suite/templates/clustering.yaml.template demo/config/test-suite/staging/clustering.yaml
+
+  sed -i '' "s/NAME/$TABLE_TYPE/" demo/config/test-suite/staging/clustering.yaml
+  sed -i '' "s/clustering_num_iterations/$CLUSTER_NUM_ITR/" demo/config/test-suite/staging/clustering.yaml
+  sed -i '' "s/clustering_delay_in_mins/$CLUSTER_DELAY_MINS/" demo/config/test-suite/staging/clustering.yaml
+  sed -i '' "s/clustering_itr_count/$CLUSTER_ITR_COUNT/" demo/config/test-suite/staging/clustering.yaml
+
+  cp demo/config/test-suite/templates/spark_command.txt.template demo/config/test-suite/staging/clustering_spark_command.sh
+
+  sed -i '' "s/JAR_NAME/$JAR_NAME/" demo/config/test-suite/staging/clustering_spark_command.sh
+  sed -i '' "s/INPUT_PATH/$INPUT_PATH/" demo/config/test-suite/staging/clustering_spark_command.sh
+  sed -i '' "s/OUTPUT_PATH/$OUTPUT_PATH/" demo/config/test-suite/staging/clustering_spark_command.sh
+  sed -i '' "s/input_yaml/clustering.yaml/" demo/config/test-suite/staging/clustering_spark_command.sh
+  sed -i '' "s/TABLE_TYPE/$TABLE_TYPE/" demo/config/test-suite/staging/clustering_spark_command.sh
+  sed -i '' "/use-deltastreamer/d" demo/config/test-suite/staging/clustering_spark_command.sh
+
+fi
+
+docker cp "$CUR_DIR/../packaging/hudi-integ-test-bundle/target/$JAR_NAME" adhoc-2:/opt/
+# Clean any previously staged files inside the container. The glob must be
+# expanded by a shell *inside* the container, hence `bash -c`; the original
+# `/bin/bash rm -rf ...` tried to execute a script file named "rm".
+docker exec adhoc-2 /bin/bash -c "rm -rf /opt/staging*"
+docker cp demo/config/test-suite/staging/ adhoc-2:/opt/
+docker exec -it adhoc-2 /bin/bash /opt/staging/long_running_spark_command.sh
+
+if [ -f demo/config/test-suite/staging/clustering_spark_command.sh ]; then
+  docker exec -it adhoc-2 /bin/bash /opt/staging/clustering_spark_command.sh
+fi