1
0

[HUDI-2349] Adding spark delete node to integ test suite (#3528)

This commit is contained in:
Sivabalan Narayanan
2021-08-24 10:58:47 -04:00
committed by GitHub
parent de94787a85
commit 15bf01dcb7
5 changed files with 225 additions and 1 deletions

View File

@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: cow-spark-long-running-multi-partitions.yaml
# Long-running COW test: 50 rounds, 1 minute pause between rounds.
dag_rounds: 50
dag_intermittent_delay_mins: 1
dag_content:
  # Seed the table: 10k records of ~1000 bytes spread over 50 partitions.
  first_insert:
    config:
      record_size: 1000
      num_partitions_insert: 50
      repeat_count: 1
      num_records_insert: 10000
    type: SparkInsertNode
    deps: none
  # Sync the freshly written table to hive (mr engine, adhoc queue).
  first_hive_sync:
    config:
      queue_name: "adhoc"
      engine: "mr"
    type: HiveSyncNode
    deps: first_insert
  # Validate the dataset contents, including the hive-synced view.
  first_validate:
    config:
      validate_hive: true
    type: ValidateDatasetNode
    deps: first_hive_sync
  # Upsert 3k records across 50 partitions (plus a small batch of new inserts).
  first_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 50
      num_records_insert: 300
      repeat_count: 1
      num_records_upsert: 3000
      num_partitions_upsert: 50
    type: SparkUpsertNode
    deps: first_validate
  # Delete 8k records across 50 partitions via the spark datasource delete path.
  first_delete:
    config:
      num_partitions_delete: 50
      num_records_delete: 8000
    type: SparkDeleteNode
    deps: first_upsert
  # Re-sync hive after deletes so the validated view reflects them.
  second_hive_sync:
    config:
      queue_name: "adhoc"
      engine: "mr"
    type: HiveSyncNode
    deps: first_delete
  # Validate again post-delete; also clean up consumed input data.
  second_validate:
    config:
      validate_hive: true
      delete_input_data: true
    type: ValidateDatasetNode
    deps: second_hive_sync
  # On the final (50th) iteration, verify async clean and archival took effect.
  last_validate:
    config:
      execute_itr_count: 50
      validate_clean: true
      validate_archival: true
    type: ValidateAsyncOperations
    deps: second_validate

View File

@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: cow-spark-simple.yaml
# Smoke-test sized COW dag: 2 rounds, 1 minute pause between rounds.
dag_rounds: 2
dag_intermittent_delay_mins: 1
dag_content:
  # Seed a single partition with 100 small records.
  first_insert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      repeat_count: 1
      num_records_insert: 100
    type: SparkInsertNode
    deps: none
  # Sync the table to hive (mr engine, adhoc queue).
  first_hive_sync:
    config:
      queue_name: "adhoc"
      engine: "mr"
    type: HiveSyncNode
    deps: first_insert
  # Validate dataset contents, including the hive-synced view.
  first_validate:
    config:
      validate_hive: true
    type: ValidateDatasetNode
    deps: first_hive_sync
  # Upsert 100 records (plus 50 new inserts) in the single partition.
  first_upsert:
    config:
      record_size: 1000
      num_partitions_insert: 1
      num_records_insert: 50
      repeat_count: 1
      num_records_upsert: 100
      num_partitions_upsert: 1
    type: SparkUpsertNode
    deps: first_validate
  # Delete 30 records via the spark datasource delete path.
  first_delete:
    config:
      num_partitions_delete: 1
      num_records_delete: 30
    type: SparkDeleteNode
    deps: first_upsert
  # Re-sync hive after deletes.
  second_hive_sync:
    config:
      queue_name: "adhoc"
      engine: "mr"
    type: HiveSyncNode
    deps: first_delete
  # Final validation post-delete; keep the input data around.
  second_validate:
    config:
      validate_hive: true
      delete_input_data: false
    type: ValidateDatasetNode
    deps: second_hive_sync

View File

@@ -442,7 +442,7 @@ spark-submit \
--workload-generator-classname org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator \
--table-type COPY_ON_WRITE \
--compact-scheduling-minshare 1 \
--clean-input
--clean-input \
--clean-output
```

View File

@@ -131,6 +131,13 @@ public class HoodieTestSuiteWriter implements Serializable {
.getInsertValue(new Schema.Parser().parse(schema)).get()).rdd();
}
/**
 * Fetches the next batch from the source and materializes it, recording the batch's
 * checkpoint in {@code lastCheckpoint}. The collected records themselves are discarded;
 * the call exists to advance the source past the delete batch.
 */
public void getNextBatchForDeletes() throws Exception {
  Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> sourceBatch = fetchSource();
  // Right of the outer pair is (checkpoint, records); remember the checkpoint.
  lastCheckpoint = Option.of(sourceBatch.getRight().getLeft());
  // Force evaluation of the batch RDD.
  sourceBatch.getRight().getRight().collect();
}
/**
 * Delegates to the wrapped delta streamer to pull the next
 * (schema, (checkpoint, records)) batch from the source.
 */
public Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchSource() throws Exception {
  return deltaStreamerWrapper.fetchSource();
}

View File

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes
import org.apache.avro.Schema
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkUtils}
import org.apache.log4j.LogManager
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SaveMode
import scala.collection.JavaConverters._
/**
 * Spark datasource based delete node.
 *
 * @param config1 node configuration (delete record/partition counts, generation flags).
 */
class SparkDeleteNode(config1: Config) extends DagNode[RDD[WriteStatus]] {

  private val log = LogManager.getLogger(getClass)
  config = config1

  /**
   * Execute the {@link DagNode}.
   *
   * Optionally generates delete records, advances the test-suite writer past the
   * generated batch, reads that batch back from the input path as avro, and issues
   * a spark-datasource DELETE write against the target table.
   *
   * @param context The context needed for an execution of a node.
   * @param curItrCount iteration count for executing the node.
   * @throws Exception Thrown if the execution failed.
   */
  override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
    if (!config.isDisableGenerate) {
      // Fix: original used println with an SLF4J-style "{}" placeholder, which println
      // never interpolates (it just printed a tuple). Use the node's logger instead.
      log.info(s"Generating input data for node ${this.getName}")
      context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config)).count()
    }
    // Deletes can't be fetched using getNextBatch() because getInsert(schema) from the payload
    // returns empty for delete records; this call only advances the source checkpoint.
    context.getWriterContext.getHoodieTestSuiteWriter.getNextBatchForDeletes()
    // Read the freshly generated delete batch straight from the input path as avro.
    val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")
    val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
    val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false,
      org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
    val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD,
      context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
      context.getWriterContext.getSparkSession)
    // Delete through the datasource write path; the checkpoint is passed as a commit-metadata
    // key (via COMMIT_METADATA_KEYPREFIX) — presumably so downstream nodes can resume from it.
    inputDF.write.format("hudi")
      .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
      .option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
      .option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_OPERATION_OPT_VAL)
      .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key")
      .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
      .option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
      .mode(SaveMode.Append)
      .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
  }
}
}