[HUDI-4018][HUDI-4027] Adding integ test yamls for immutable use-cases. Added delete partition support to integ tests (#5501)
- Added pure immutable test yamls to integ test framework. Added SparkBulkInsertNode as part of it. - Added delete_partition support to integ test framework using spark-datasource. - Added a single yaml to test all non core write operations (insert overwrite, insert overwrite table and delete partitions) - Added tests for 4 concurrent spark datasource writers (multi-writer tests). - Fixed readme w/ sample commands for multi-writer.
This commit is contained in:
committed by
GitHub
parent
ecd47e7aae
commit
0cec955fa2
@@ -30,6 +30,7 @@ import org.apache.spark.sql.SparkSession;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
@@ -116,6 +117,7 @@ public class HoodieMultiWriterTestSuiteJob {
|
||||
}
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(inputPaths.length);
|
||||
Random random = new Random();
|
||||
|
||||
List<HoodieTestSuiteJob.HoodieTestSuiteConfig> testSuiteConfigList = new ArrayList<>();
|
||||
int jobIndex = 0;
|
||||
@@ -131,11 +133,20 @@ public class HoodieMultiWriterTestSuiteJob {
|
||||
|
||||
AtomicBoolean jobFailed = new AtomicBoolean(false);
|
||||
AtomicInteger counter = new AtomicInteger(0);
|
||||
List<Long> waitTimes = new ArrayList<>();
|
||||
for (int i = 0;i < jobIndex ;i++) {
|
||||
if (i == 0) {
|
||||
waitTimes.add(0L);
|
||||
} else {
|
||||
// every job after 1st, will start after 1 min + some delta.
|
||||
waitTimes.add(60000L + random.nextInt(10000));
|
||||
}
|
||||
}
|
||||
List<CompletableFuture<Boolean>> completableFutureList = new ArrayList<>();
|
||||
testSuiteConfigList.forEach(hoodieTestSuiteConfig -> {
|
||||
try {
|
||||
// start each job at 20 seconds interval so that metaClient instantiation does not overstep
|
||||
Thread.sleep(counter.get() * 20000);
|
||||
Thread.sleep(waitTimes.get(counter.get()));
|
||||
LOG.info("Starting job " + hoodieTestSuiteConfig.toString());
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
|
||||
@@ -101,6 +101,8 @@ public class DeltaConfig implements Serializable {
|
||||
private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate";
|
||||
private static String VALIDATE_FULL_DATA = "validate_full_data";
|
||||
private static String DELETE_INPUT_DATA_EXCEPT_LATEST = "delete_input_data_except_latest";
|
||||
private static String PARTITIONS_TO_DELETE = "partitions_to_delete";
|
||||
private static String INPUT_PARTITIONS_TO_SKIP_VALIDATE = "input_partitions_to_skip_validate";
|
||||
|
||||
// Spark SQL Create Table
|
||||
private static String TABLE_TYPE = "table_type";
|
||||
@@ -203,6 +205,10 @@ public class DeltaConfig implements Serializable {
|
||||
return Boolean.valueOf(configsMap.getOrDefault(DISABLE_INGEST, false).toString());
|
||||
}
|
||||
|
||||
public String getPartitionsToDelete() {
|
||||
return configsMap.getOrDefault(PARTITIONS_TO_DELETE, "").toString();
|
||||
}
|
||||
|
||||
public boolean getReinitContext() {
|
||||
return Boolean.valueOf(configsMap.getOrDefault(REINIT_CONTEXT, false).toString());
|
||||
}
|
||||
@@ -223,6 +229,10 @@ public class DeltaConfig implements Serializable {
|
||||
return Integer.valueOf(configsMap.getOrDefault(VALIDATE_ONCE_EVERY_ITR, 1).toString());
|
||||
}
|
||||
|
||||
public String inputPartitonsToSkipWithValidate() {
|
||||
return configsMap.getOrDefault(INPUT_PARTITIONS_TO_SKIP_VALIDATE, "").toString();
|
||||
}
|
||||
|
||||
public boolean isValidateFullData() {
|
||||
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
|
||||
}
|
||||
|
||||
@@ -163,8 +163,13 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
|
||||
// todo: fix hard coded fields from configs.
|
||||
// read input and resolve insert, updates, etc.
|
||||
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
|
||||
Dataset<Row> trimmedDf = inputDf;
|
||||
if (!config.inputPartitonsToSkipWithValidate().isEmpty()) {
|
||||
trimmedDf = inputDf.filter("instr("+partitionPathField+", \'"+ config.inputPartitonsToSkipWithValidate() +"\') != 1");
|
||||
}
|
||||
|
||||
ExpressionEncoder encoder = getEncoder(inputDf.schema());
|
||||
return inputDf.groupByKey(
|
||||
return trimmedDf.groupByKey(
|
||||
(MapFunction<Row, String>) value ->
|
||||
(partitionPathField.isEmpty() ? value.getAs(recordKeyField) : (value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField))), Encoders.STRING())
|
||||
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
|
||||
|
||||
@@ -123,7 +123,7 @@ public class DeltaGenerator implements Serializable {
|
||||
int startPartition = operation.getStartPartition();
|
||||
|
||||
// Each spark partition below will generate records for a single partition given by the integer index.
|
||||
List<Integer> partitionIndexes = IntStream.rangeClosed(0 + startPartition, numPartitions + startPartition)
|
||||
List<Integer> partitionIndexes = IntStream.rangeClosed(0 + startPartition, numPartitions + startPartition - 1)
|
||||
.boxed().collect(Collectors.toList());
|
||||
|
||||
JavaRDD<GenericRecord> inputBatch = jsc.parallelize(partitionIndexes, numPartitions)
|
||||
|
||||
@@ -338,7 +338,7 @@ public class GenericRecordFullPayloadGenerator implements Serializable {
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public GenericRecord updateTimestamp(GenericRecord record, String fieldName) {
|
||||
long delta = TimeUnit.SECONDS.convert((++partitionIndex % numDatePartitions) + startPartition, TimeUnit.DAYS);
|
||||
long delta = TimeUnit.SECONDS.convert((partitionIndex++ % numDatePartitions) + startPartition, TimeUnit.DAYS);
|
||||
record.put(fieldName, delta);
|
||||
return record;
|
||||
}
|
||||
|
||||
@@ -19,49 +19,18 @@
|
||||
package org.apache.hudi.integ.testsuite.dag.nodes
|
||||
|
||||
import org.apache.hudi.client.WriteStatus
|
||||
import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
|
||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
|
||||
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions}
|
||||
import org.apache.hudi.DataSourceWriteOptions
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.SaveMode
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
/**
|
||||
* Spark datasource based bulk insert node
|
||||
*
|
||||
* @param dagNodeConfig DAG node configurations.
|
||||
*/
|
||||
class SparkBulkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
|
||||
class SparkBulkInsertNode(dagNodeConfig: Config) extends SparkInsertNode(dagNodeConfig) {
|
||||
|
||||
config = dagNodeConfig
|
||||
|
||||
/**
|
||||
* Execute the {@link DagNode}.
|
||||
*
|
||||
* @param context The context needed for an execution of a node.
|
||||
* @param curItrCount iteration count for executing the node.
|
||||
* @throws Exception Thrown if the execution failed.
|
||||
*/
|
||||
override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
|
||||
if (!config.isDisableGenerate) {
|
||||
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateInserts(config)).getValue().count()
|
||||
}
|
||||
val inputDF = AvroConversionUtils.createDataFrame(context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch,
|
||||
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
|
||||
context.getWriterContext.getSparkSession)
|
||||
val saveMode = if(curItrCount == 0) SaveMode.Overwrite else SaveMode.Append
|
||||
inputDF.write.format("hudi")
|
||||
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
|
||||
.option(DataSourceWriteOptions.TABLE_NAME.key(), context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key(), context.getHoodieTestSuiteWriter.getCfg.tableType)
|
||||
.option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
|
||||
.option(DataSourceWriteOptions.ENABLE_ROW_WRITER.key(), String.valueOf(config.enableRowWriting()))
|
||||
.option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key(), "deltastreamer.checkpoint.key")
|
||||
.option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
|
||||
.option(HoodieWriteConfig.TBL_NAME.key(), context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
||||
.mode(saveMode)
|
||||
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
|
||||
override def getOperation(): String = {
|
||||
DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hudi.integ.testsuite.dag.nodes
|
||||
|
||||
|
||||
import org.apache.avro.Schema
|
||||
import org.apache.hudi.client.WriteStatus
|
||||
import org.apache.hudi.common.util.collection.Pair
|
||||
import org.apache.hudi.config.HoodieWriteConfig
|
||||
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
|
||||
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
|
||||
import org.apache.hudi.integ.testsuite.schema.SchemaUtils
|
||||
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats
|
||||
import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, HoodieSparkUtils}
|
||||
import org.apache.log4j.LogManager
|
||||
import org.apache.spark.api.java.JavaRDD
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.SaveMode
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
/**
|
||||
* Spark datasource based insert node
|
||||
*
|
||||
* @param dagNodeConfig DAG node configurations.
|
||||
*/
|
||||
class SparkDeletePartitionNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
|
||||
|
||||
private val log = LogManager.getLogger(getClass)
|
||||
config = dagNodeConfig
|
||||
|
||||
/**
|
||||
* Execute the {@link DagNode}.
|
||||
*
|
||||
* @param context The context needed for an execution of a node.
|
||||
* @param curItrCount iteration count for executing the node.
|
||||
* @throws Exception Thrown if the execution failed.
|
||||
*/
|
||||
override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
|
||||
println("Generating input data for node {}", this.getName)
|
||||
|
||||
context.getWriterContext.getSparkSession.emptyDataFrame.write.format("hudi")
|
||||
.options(DataSourceWriteOptions.translateSqlOptions(context.getWriterContext.getProps.asScala.toMap))
|
||||
.option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), SchemaUtils.SOURCE_ORDERING_FIELD)
|
||||
.option(DataSourceWriteOptions.TABLE_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
||||
.option(DataSourceWriteOptions.TABLE_TYPE.key, context.getHoodieTestSuiteWriter.getCfg.tableType)
|
||||
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.DELETE_PARTITION_OPERATION_OPT_VAL)
|
||||
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
|
||||
.option(DataSourceWriteOptions.PARTITIONS_TO_DELETE.key, config.getPartitionsToDelete)
|
||||
.mode(SaveMode.Append)
|
||||
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user