diff --git a/docker/demo/config/test-suite/cow-spark-long-running.yaml b/docker/demo/config/test-suite/cow-spark-long-running.yaml new file mode 100644 index 000000000..493ad7a55 --- /dev/null +++ b/docker/demo/config/test-suite/cow-spark-long-running.yaml @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+dag_name: cow-spark-long-running.yaml +dag_rounds: 50 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 50 + repeat_count: 1 + num_records_insert: 10000 + type: SparkInsertNode + deps: none + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_insert + first_validate: + config: + validate_hive: true + type: ValidateDatasetNode + deps: first_hive_sync + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 50 + num_records_insert: 300 + repeat_count: 1 + num_records_upsert: 3000 + num_partitions_upsert: 50 + type: SparkUpsertNode + deps: first_validate + first_delete: + config: + num_partitions_delete: 50 + num_records_delete: 8000 + type: SparkDeleteNode + deps: first_upsert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete + second_validate: + config: + validate_hive: true + delete_input_data: true + type: ValidateDatasetNode + deps: second_hive_sync + last_validate: + config: + execute_itr_count: 50 + validate_clean: true + validate_archival: true + type: ValidateAsyncOperations + deps: second_validate diff --git a/docker/demo/config/test-suite/cow-spark-simple.yaml b/docker/demo/config/test-suite/cow-spark-simple.yaml new file mode 100644 index 000000000..21e7e6bbe --- /dev/null +++ b/docker/demo/config/test-suite/cow-spark-simple.yaml @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +dag_name: cow-spark-simple.yaml +dag_rounds: 2 +dag_intermittent_delay_mins: 1 +dag_content: + first_insert: + config: + record_size: 1000 + num_partitions_insert: 1 + repeat_count: 1 + num_records_insert: 100 + type: SparkInsertNode + deps: none + first_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_insert + first_validate: + config: + validate_hive: true + type: ValidateDatasetNode + deps: first_hive_sync + first_upsert: + config: + record_size: 1000 + num_partitions_insert: 1 + num_records_insert: 50 + repeat_count: 1 + num_records_upsert: 100 + num_partitions_upsert: 1 + type: SparkUpsertNode + deps: first_validate + first_delete: + config: + num_partitions_delete: 1 + num_records_delete: 30 + type: SparkDeleteNode + deps: first_upsert + second_hive_sync: + config: + queue_name: "adhoc" + engine: "mr" + type: HiveSyncNode + deps: first_delete + second_validate: + config: + validate_hive: true + delete_input_data: false + type: ValidateDatasetNode + deps: second_hive_sync \ No newline at end of file diff --git a/hudi-integ-test/README.md b/hudi-integ-test/README.md index 68db715a3..4a9e9bc67 100644 --- a/hudi-integ-test/README.md +++ b/hudi-integ-test/README.md @@ -442,7 +442,7 @@ spark-submit \ --workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \ --table-type COPY_ON_WRITE \ --compact-scheduling-minshare 1 \ ---clean-input +--clean-input \ --clean-output ``` diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java index fee76ac82..b8229b474 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieTestSuiteWriter.java @@ -131,6 +131,13 @@ public class HoodieTestSuiteWriter implements Serializable { .getInsertValue(new Schema.Parser().parse(schema)).get()).rdd(); } + + public void getNextBatchForDeletes() throws Exception { + Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> nextBatch = fetchSource(); + lastCheckpoint = Option.of(nextBatch.getValue().getLeft()); + JavaRDD<HoodieRecord> inputRDD = nextBatch.getRight().getRight(); + inputRDD.collect(); + } + public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception { return this.deltaStreamerWrapper.fetchSource(); } diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala new file mode 100644 index 000000000..4ebd59d8f --- /dev/null +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkDeleteNode.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.integ.testsuite.dag.nodes + +import org.apache.avro.Schema +import org.apache.hudi.client.WriteStatus +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config +import org.apache.hudi.integ.testsuite.dag.ExecutionContext +import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkUtils} +import org.apache.log4j.LogManager +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SaveMode + +import scala.collection.JavaConverters._ + +/** + * Spark datasource based delete node + * @param config1 node configuration for this DAG node + */ +class SparkDeleteNode(config1: Config) extends DagNode[RDD[WriteStatus]] { + + private val log = LogManager.getLogger(getClass) + config = config1 + + /** + * Execute the {@link DagNode}. + * + * @param context The context needed for an execution of a node. + * @param curItrCount iteration count for executing the node. + * @throws Exception Thrown if the execution failed. 
+ */ + override def execute(context: ExecutionContext, curItrCount: Int): Unit = { + if (!config.isDisableGenerate) { + println("Generating input data for node {}", this.getName) + context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config)).count() + } + + // Deletes can't be fetched using getNextBatch() because getInsert(schema) from payload will return empty for delete + // records + context.getWriterContext.getHoodieTestSuiteWriter.getNextBatchForDeletes() + val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("") + + val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead) + val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName","testNamespace", false, + org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema))) + + val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD, + context.getWriterContext.getHoodieTestSuiteWriter.getSchema, + context.getWriterContext.getSparkSession) + inputDF.write.format("hudi") + .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap)) + .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) + .option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType) + .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key") + .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")) + .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName) + .mode(SaveMode.Append) + .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
 + } +}