From d58a8348dc729c83c60025fc8dbed3ecec28578b Mon Sep 17 00:00:00 2001 From: Sivabalan Narayanan Date: Wed, 21 Jul 2021 00:11:01 -0400 Subject: [PATCH] [HUDI-2007] Fixing hudi_test_suite for spark nodes and adding spark bulk_insert node (#3074) --- hudi-integ-test/pom.xml | 72 +++++++++++++++++++ .../testsuite/configuration/DeltaConfig.java | 5 ++ .../dag/nodes/SparkBulkInsertNode.scala | 66 +++++++++++++++++ 3 files changed, 143 insertions(+) create mode 100644 hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala diff --git a/hudi-integ-test/pom.xml b/hudi-integ-test/pom.xml index b6b37c78d..e92beee7f 100644 --- a/hudi-integ-test/pom.xml +++ b/hudi-integ-test/pom.xml @@ -396,6 +396,13 @@ test + + org.scalatest + scalatest_${scala.binary.version} + ${scalatest.version} + test + + @@ -407,7 +414,49 @@ + + + + net.alchim31.maven + scala-maven-plugin + ${scala-maven-plugin.version} + + + -nobootcp + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + org.codehaus.mojo exec-maven-plugin @@ -430,6 +479,29 @@ + + org.scalatest + scalatest-maven-plugin + 1.0 + + ${skipUTs} + ${project.build.directory}/surefire-reports + . + TestSuite.txt + + + + test + + test + + + + + + org.scalastyle + scalastyle-maven-plugin + org.jacoco jacoco-maven-plugin diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java index 9e29c9b10..d8ea946fe 100644 --- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java +++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/configuration/DeltaConfig.java @@ -94,6 +94,7 @@ public class DeltaConfig implements Serializable { private static String VALIDATE_CLEAN = "validate_clean"; private static String SCHEMA_VERSION = "schema_version"; private static String NUM_ROLLBACKS = "num_rollbacks"; + private static String ENABLE_ROW_WRITING = "enable_row_writing"; private Map configsMap; @@ -189,6 +190,10 @@ public class DeltaConfig implements Serializable { return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_CLEAN, false).toString()); } + public boolean enableRowWriting() { + return Boolean.valueOf(configsMap.getOrDefault(ENABLE_ROW_WRITING, false).toString()); + } + public Map getOtherConfigs() { if (configsMap == null) { return new HashMap<>(); diff --git a/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala new file mode 100644 index 000000000..af3df6f70 --- /dev/null +++ b/hudi-integ-test/src/main/scala/org/apache/hudi/integ/testsuite/dag/nodes/SparkBulkInsertNode.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.integ.testsuite.dag.nodes + +import org.apache.hudi.client.WriteStatus +import org.apache.hudi.config.HoodieWriteConfig +import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config +import org.apache.hudi.integ.testsuite.dag.ExecutionContext +import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SaveMode + +import scala.collection.JavaConverters._ + +/** + * Spark datasource based bulk insert node + * @param config1 + */ +class SparkBulkInsertNode(config1: Config) extends DagNode[RDD[WriteStatus]] { + + config = config1 + + /** + * Execute the {@link DagNode}. + * + * @param context The context needed for an execution of a node. + * @param curItrCount iteration count for executing the node. + * @throws Exception Thrown if the execution failed. + */ + override def execute(context: ExecutionContext, curItrCount: Int): Unit = { + if (!config.isDisableGenerate) { + context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).count() + } + val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch, + context.getWriterContext.getHoodieTestSuiteWriter.getSchema, + context.getWriterContext.getSparkSession) + val saveMode = if(curItrCount == 0) SaveMode.Overwrite else SaveMode.Append + inputDF.write.format("hudi") + .options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap)) + .option(DataSourceWriteOptions.TABLE_NAME_OPT_KEY.key(), context.getHoodieTestSuiteWriter.getCfg.targetTableName) + .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key(), context.getHoodieTestSuiteWriter.getCfg.tableType) + .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL) + .option(DataSourceWriteOptions.ENABLE_ROW_WRITER_OPT_KEY.key(), String.valueOf(config.enableRowWriting())) + .option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX_OPT_KEY.key(), "deltastreamer.checkpoint.key") + .option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")) + .option(HoodieWriteConfig.TABLE_NAME.key(), context.getHoodieTestSuiteWriter.getCfg.targetTableName) + .mode(saveMode) + .save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath) + } +}