1
0

[HUDI-2388] Add DAG nodes for Spark SQL in integration test suite (#3583)

- Fixed validation in integ test suite for both deltastreamer and write client path. 

Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
This commit is contained in:
Y Ethan Guo
2021-09-13 08:53:13 -07:00
committed by GitHub
parent 35a04c43a5
commit 5d60491f5b
29 changed files with 1865 additions and 153 deletions

View File

@@ -101,7 +101,7 @@ public class HoodieTestSuiteJob {
this.cfg = cfg;
this.jsc = jsc;
cfg.propsFilePath = FSUtils.addSchemeIfLocalPath(cfg.propsFilePath).toString();
this.sparkSession = SparkSession.builder().config(jsc.getConf()).getOrCreate();
this.sparkSession = SparkSession.builder().config(jsc.getConf()).enableHiveSupport().getOrCreate();
this.fs = FSUtils.getFs(cfg.inputBasePath, jsc.hadoopConfiguration());
this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
log.info("Creating workload generator with configs : {}", props.toString());

View File

@@ -18,14 +18,14 @@
package org.apache.hudi.integ.testsuite.configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
@@ -42,7 +42,7 @@ public class DeltaConfig implements Serializable {
private final SerializableConfiguration configuration;
public DeltaConfig(DeltaOutputMode deltaOutputMode, DeltaInputType deltaInputType,
SerializableConfiguration configuration) {
SerializableConfiguration configuration) {
this.deltaOutputMode = deltaOutputMode;
this.deltaInputType = deltaInputType;
this.configuration = configuration;
@@ -96,6 +96,33 @@ public class DeltaConfig implements Serializable {
// Config-map keys (and their defaults) recognized by this node config.
// Declared final: they are constants and are never reassigned.
private static final String NUM_ROLLBACKS = "num_rollbacks";
private static final String ENABLE_ROW_WRITING = "enable_row_writing";
// Spark SQL Create Table
private static final String TABLE_TYPE = "table_type";
private static final String IS_EXTERNAL = "is_external";
private static final String USE_CTAS = "use_ctas";
private static final String PRIMARY_KEY = "primary_key";
private static final String PRE_COMBINE_FIELD = "pre_combine_field";
private static final String PARTITION_FIELD = "partition_field";
// Spark SQL Merge
private static final String MERGE_CONDITION = "merge_condition";
private static final String DEFAULT_MERGE_CONDITION = "target._row_key = source._row_key";
private static final String MERGE_MATCHED_ACTION = "matched_action";
private static final String DEFAULT_MERGE_MATCHED_ACTION = "update set *";
private static final String MERGE_NOT_MATCHED_ACTION = "not_matched_action";
private static final String DEFAULT_MERGE_NOT_MATCHED_ACTION = "insert *";
// Spark SQL Update
// Column to update. The update expression is currently fixed (i.e. "fare = fare * 1.6");
// TODO: make the update expression configurable.
private static final String UPDATE_COLUMN = "update_column";
private static final String DEFAULT_UPDATE_COLUMN = "fare";
private static final String WHERE_CONDITION_COLUMN = "condition_column";
// The where condition expression looks like "begin_lon between 0.1 and 0.2".
// The value range is determined by the ratio of records to update or delete.
// Only numeric type columns are supported for now.
private static final String DEFAULT_WHERE_CONDITION_COLUMN = "begin_lon";
// The ratio range is between 0.01 and 1.0; the ratio is approximate to the actual ratio achieved.
private static final String RATIO_RECORDS_CHANGE = "ratio_records_change";
private static final double DEFAULT_RATIO_RECORDS_CHANGE = 0.5;
// Raw key/value configs for this node, set via the constructor.
private Map<String, Object> configsMap;
public Config(Map<String, Object> configsMap) {
@@ -194,6 +221,58 @@ public class DeltaConfig implements Serializable {
return Boolean.valueOf(configsMap.getOrDefault(ENABLE_ROW_WRITING, false).toString());
}
/**
 * @return the configured Spark SQL table type, if present; {@code Option.empty()} otherwise.
 */
public Option<String> getTableType() {
  return !configsMap.containsKey(TABLE_TYPE) ? Option.empty()
      : Option.of(configsMap.get(TABLE_TYPE).toString());
}
/**
 * @return whether CTAS (CREATE TABLE AS SELECT) should be used; defaults to {@code false}.
 */
public boolean shouldUseCtas() {
  // parseBoolean yields the primitive directly; Boolean.valueOf would box and then auto-unbox.
  return Boolean.parseBoolean(configsMap.getOrDefault(USE_CTAS, false).toString());
}
/**
 * @return whether the table should be created as an external table; defaults to {@code false}.
 */
public boolean isTableExternal() {
  // parseBoolean yields the primitive directly; Boolean.valueOf would box and then auto-unbox.
  return Boolean.parseBoolean(configsMap.getOrDefault(IS_EXTERNAL, false).toString());
}
/**
 * @return the configured primary key field, if present; {@code Option.empty()} otherwise.
 */
public Option<String> getPrimaryKey() {
  return !configsMap.containsKey(PRIMARY_KEY) ? Option.empty()
      : Option.of(configsMap.get(PRIMARY_KEY).toString());
}
/**
 * @return the configured pre-combine field, if present; {@code Option.empty()} otherwise.
 */
public Option<String> getPreCombineField() {
  return !configsMap.containsKey(PRE_COMBINE_FIELD) ? Option.empty()
      : Option.of(configsMap.get(PRE_COMBINE_FIELD).toString());
}
/**
 * @return the configured partition field, if present; {@code Option.empty()} otherwise.
 */
public Option<String> getPartitionField() {
  return !configsMap.containsKey(PARTITION_FIELD) ? Option.empty()
      : Option.of(configsMap.get(PARTITION_FIELD).toString());
}
/**
 * @return the merge ON condition for Spark SQL MERGE; defaults to
 *     {@code DEFAULT_MERGE_CONDITION} when not configured.
 */
public String getMergeCondition() {
  return configsMap.getOrDefault(MERGE_CONDITION, DEFAULT_MERGE_CONDITION).toString();
}
/**
 * @return the WHEN MATCHED action for Spark SQL MERGE; defaults to
 *     {@code DEFAULT_MERGE_MATCHED_ACTION} when not configured.
 */
public String getMatchedAction() {
  return configsMap.getOrDefault(MERGE_MATCHED_ACTION, DEFAULT_MERGE_MATCHED_ACTION).toString();
}
/**
 * @return the WHEN NOT MATCHED action for Spark SQL MERGE; defaults to
 *     {@code DEFAULT_MERGE_NOT_MATCHED_ACTION} when not configured.
 */
public String getNotMatchedAction() {
  return configsMap.getOrDefault(MERGE_NOT_MATCHED_ACTION, DEFAULT_MERGE_NOT_MATCHED_ACTION).toString();
}
/**
 * @return the column to update in Spark SQL UPDATE; defaults to
 *     {@code DEFAULT_UPDATE_COLUMN} when not configured.
 */
public String getUpdateColumn() {
  return configsMap.getOrDefault(UPDATE_COLUMN, DEFAULT_UPDATE_COLUMN).toString();
}
/**
 * @return the column used to build the WHERE condition; defaults to
 *     {@code DEFAULT_WHERE_CONDITION_COLUMN} when not configured.
 */
public String getWhereConditionColumn() {
  return configsMap.getOrDefault(WHERE_CONDITION_COLUMN, DEFAULT_WHERE_CONDITION_COLUMN).toString();
}
/**
 * @return the approximate ratio of records to update or delete; defaults to
 *     {@code DEFAULT_RATIO_RECORDS_CHANGE} when not configured.
 */
public double getRatioRecordsChange() {
  // parseDouble yields the primitive directly; Double.valueOf would box and then auto-unbox.
  return Double.parseDouble(configsMap.getOrDefault(RATIO_RECORDS_CHANGE, DEFAULT_RATIO_RECORDS_CHANGE).toString());
}
public Map<String, Object> getOtherConfigs() {
if (configsMap == null) {
return new HashMap<>();

View File

@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
import java.util.List;
import java.util.stream.Collectors;
/**
* This nodes validates contents from input path are in tact with Hudi. By default no configs are required for this node. But there is an
* optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites.
* README has more details under docker set up for usages of this node.
*/
public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
public BaseValidateDatasetNode(DeltaConfig.Config config) {
this.config = config;
}
/**
* @return {@link Logger} instance to use.
*/
public abstract Logger getLogger();
/**
* @param session {@link SparkSession} instance to use.
* @param context {@link ExecutionContext} instance to use.
* @param inputSchema input schema in {@link StructType}
* @return data in {@link Dataset<Row>} to validate.
*/
public abstract Dataset<Row> getDatasetToValidate(SparkSession session, ExecutionContext context,
StructType inputSchema);
@Override
public void execute(ExecutionContext context, int curItrCount) throws Exception {
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
log.warn("Validation using data from input path " + inputPath);
// listing batches to be validated
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
if (log.isDebugEnabled()) {
FileSystem fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
log.info("fileStatuses length: " + fileStatuses.length);
for (FileStatus fileStatus : fileStatuses) {
log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
}
}
Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
// read from hudi and remove meta columns.
Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
long inputCount = inputSnapshotDf.count();
long outputCount = trimmedHudiDf.count();
log.debug("Input count: " + inputCount + "; output count: " + outputCount);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
throw new AssertionError("Hudi contents does not match contents input data. ");
}
if (config.isValidateHive()) {
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
intersectionDf = inputSnapshotDf.intersect(trimmedCowDf);
outputCount = trimmedHudiDf.count();
log.warn("Input count: " + inputCount + "; output count: " + outputCount);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (outputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
}
}
// if delete input data is enabled, erase input data.
if (config.isDeleteInputData()) {
// clean up input data for current group of writes.
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
FileSystem fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
for (FileStatus fileStatus : fileStatuses) {
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
fs.delete(fileStatus.getPath(), true);
}
}
}
private Dataset<Row> getInputDf(ExecutionContext context, SparkSession session, String inputPath) {
String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
// todo: fix hard coded fields from configs.
// read input and resolve insert, updates, etc.
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
ExpressionEncoder encoder = getEncoder(inputDf.schema());
return inputDf.groupByKey(
(MapFunction<Row, String>) value ->
value.getAs(partitionPathField) + "+" + value.getAs(recordKeyField), Encoders.STRING())
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
if (ts1 > ts2) {
return v1;
} else {
return v2;
}
})
.map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, encoder)
.filter("_hoodie_is_deleted != true");
}
private ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
SimpleAnalyzer$.MODULE$);
}
}

View File

@@ -60,6 +60,7 @@ public class InsertNode extends DagNode<JavaRDD<WriteStatus>> {
if (!config.isDisableGenerate()) {
log.info("Generating input data for node {}", this.getName());
this.deltaWriteStatsRDD = deltaGenerator.writeRecords(deltaGenerator.generateInserts(config));
this.deltaWriteStatsRDD.cache();
this.deltaWriteStatsRDD.count();
}
}

View File

@@ -18,133 +18,39 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.ReduceFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.SimpleAnalyzer$;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.stream.Collectors;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.JavaConverters;
/**
* This node validates that contents from the input path are intact with Hudi. This node uses spark datasource for comparison purposes. By default no configs are required for this node. But there is an
* optional config "delete_input_data" that you can set for this node. If set, once validation completes, contents from inputPath are deleted. This will come in handy for long running test suites.
* README has more details under docker set up for usages of this node.
* This validation node uses spark datasource for comparison purposes.
*/
public class ValidateDatasetNode extends DagNode<Boolean> {
public class ValidateDatasetNode extends BaseValidateDatasetNode {
private static Logger log = LoggerFactory.getLogger(ValidateDatasetNode.class);
public ValidateDatasetNode(Config config) {
this.config = config;
super(config);
}
@Override
public void execute(ExecutionContext context, int curItrCount) throws Exception {
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*";
log.warn("ValidateDataset Node: Input path " + inputPath + ", hudi path " + hudiPath);
// listing batches to be validated
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
FileSystem fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
for (FileStatus fileStatus : fileStatuses) {
log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
}
String recordKeyField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.RECORDKEY_FIELD().key());
String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
// todo: fix hard coded fields from configs.
// read input and resolve insert, updates, etc.
Dataset<Row> inputDf = session.read().format("avro").load(inputPath);
ExpressionEncoder encoder = getEncoder(inputDf.schema());
Dataset<Row> inputSnapshotDf = inputDf.groupByKey(
(MapFunction<Row, String>) value -> partitionPathField + "+" + recordKeyField, Encoders.STRING())
.reduceGroups((ReduceFunction<Row>) (v1, v2) -> {
int ts1 = v1.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
int ts2 = v2.getAs(SchemaUtils.SOURCE_ORDERING_FIELD);
if (ts1 > ts2) {
return v1;
} else {
return v2;
}
})
.map((MapFunction<Tuple2<String, Row>, Row>) value -> value._2, encoder)
.filter("_hoodie_is_deleted is NULL");
// read from hudi and remove meta columns.
Dataset<Row> hudiDf = session.read().format("hudi").load(hudiPath);
Dataset<Row> trimmedDf = hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedDf);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (inputSnapshotDf.except(intersectionDf).count() != 0) {
log.error("Data set validation failed. Total count in hudi " + trimmedDf.count() + ", input df count " + inputSnapshotDf.count());
throw new AssertionError("Hudi contents does not match contents input data. ");
}
if (config.isValidateHive()) {
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
intersectionDf = inputSnapshotDf.intersect(trimmedDf);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (inputSnapshotDf.except(intersectionDf).count() != 0) {
log.error("Data set validation failed for COW hive table. Total count in hudi " + trimmedCowDf.count() + ", input df count " + inputSnapshotDf.count());
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
}
}
// if delete input data is enabled, erase input data.
if (config.isDeleteInputData()) {
// clean up input data for current group of writes.
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
fs = new Path(inputPathStr)
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
fileStatuses = fs.listStatus(new Path(inputPathStr));
for (FileStatus fileStatus : fileStatuses) {
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
fs.delete(fileStatus.getPath(), true);
}
}
/**
 * @return the {@link Logger} instance used by this validation node.
 */
public Logger getLogger() {
  return log;
}
private ExpressionEncoder getEncoder(StructType schema) {
List<Attribute> attributes = JavaConversions.asJavaCollection(schema.toAttributes()).stream()
.map(Attribute::toAttribute).collect(Collectors.toList());
return RowEncoder.apply(schema)
.resolveAndBind(JavaConverters.asScalaBufferConverter(attributes).asScala().toSeq(),
SimpleAnalyzer$.MODULE$);
/**
 * Reads the target Hudi table via the Spark datasource and strips the Hudi metadata columns
 * so the schema lines up with the raw input data.
 *
 * @param session     {@link SparkSession} instance to use.
 * @param context     {@link ExecutionContext} instance to use.
 * @param inputSchema input schema (unused here; the schema comes from the Hudi read).
 * @return trimmed target-table data to validate.
 */
@Override
public Dataset<Row> getDatasetToValidate(SparkSession session, ExecutionContext context,
                                         StructType inputSchema) {
  // NOTE(review): the "/*/*/*" glob assumes a fixed partition depth — confirm against the
  // partitioning scheme used by the suite.
  String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*";
  log.info("Validate data in target hudi path " + hudiPath);
  Dataset<Row> hudiDf = session.read().format("hudi").load(hudiPath);
  return hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
      .drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
}
}

View File

@@ -44,6 +44,7 @@ import org.apache.hudi.integ.testsuite.converter.UpdateConverter;
import org.apache.hudi.integ.testsuite.reader.DFSAvroDeltaInputReader;
import org.apache.hudi.integ.testsuite.reader.DFSHoodieDatasetInputReader;
import org.apache.hudi.integ.testsuite.reader.DeltaInputReader;
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.integ.testsuite.writer.DeltaWriteStats;
import org.apache.hudi.integ.testsuite.writer.DeltaWriterAdapter;
@@ -75,7 +76,7 @@ public class DeltaGenerator implements Serializable {
private int batchId;
public DeltaGenerator(DFSDeltaConfig deltaOutputConfig, JavaSparkContext jsc, SparkSession sparkSession,
String schemaStr, BuiltinKeyGenerator keyGenerator) {
String schemaStr, BuiltinKeyGenerator keyGenerator) {
this.deltaOutputConfig = deltaOutputConfig;
this.jsc = jsc;
this.sparkSession = sparkSession;
@@ -123,7 +124,11 @@ public class DeltaGenerator implements Serializable {
.mapPartitionsWithIndex((index, p) -> {
return new LazyRecordGeneratorIterator(new FlexibleSchemaRecordGenerationIterator(recordsPerPartition,
minPayloadSize, schemaStr, partitionPathFieldNames, numPartitions, startPartition));
}, true);
}, true)
.map(record -> {
record.put(SchemaUtils.SOURCE_ORDERING_FIELD, batchId);
return record;
});
if (deltaOutputConfig.getInputParallelism() < numPartitions) {
inputBatch = inputBatch.coalesce(deltaOutputConfig.getInputParallelism());
@@ -167,7 +172,11 @@ public class DeltaGenerator implements Serializable {
log.info("Repartitioning records done for updates");
UpdateConverter converter = new UpdateConverter(schemaStr, config.getRecordSize(),
partitionPathFieldNames, recordRowKeyFieldNames);
JavaRDD<GenericRecord> updates = converter.convert(adjustedRDD);
JavaRDD<GenericRecord> convertedRecords = converter.convert(adjustedRDD);
JavaRDD<GenericRecord> updates = convertedRecords.map(record -> {
record.put(SchemaUtils.SOURCE_ORDERING_FIELD, batchId);
return record;
});
updates.persist(StorageLevel.DISK_ONLY());
if (inserts == null) {
inserts = updates;
@@ -205,11 +214,16 @@ public class DeltaGenerator implements Serializable {
.getNumRecordsDelete());
}
}
log.info("Repartitioning records for delete");
// persist this since we will make multiple passes over this
adjustedRDD = adjustedRDD.repartition(jsc.defaultParallelism());
Converter converter = new DeleteConverter(schemaStr, config.getRecordSize());
JavaRDD<GenericRecord> deletes = converter.convert(adjustedRDD);
JavaRDD<GenericRecord> convertedRecords = converter.convert(adjustedRDD);
JavaRDD<GenericRecord> deletes = convertedRecords.map(record -> {
record.put(SchemaUtils.SOURCE_ORDERING_FIELD, batchId);
return record;
});
deletes.persist(StorageLevel.DISK_ONLY());
return deletes;
} else {

View File

@@ -30,11 +30,12 @@ import scala.collection.JavaConverters._
/**
* Spark datasource based bulk insert node
* @param config1
*
* @param dagNodeConfig DAG node configurations.
*/
class SparkBulkInsertNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
class SparkBulkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
config = config1
config = dagNodeConfig
/**
* Execute the {@link DagNode}.

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.integ.testsuite.dag.nodes
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
@@ -32,12 +33,13 @@ import scala.collection.JavaConverters._
/**
* Spark datasource based upsert node
* @param config1
*
* @param dagNodeConfig DAG node configurations.
*/
class SparkDeleteNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
class SparkDeleteNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
private val log = LogManager.getLogger(getClass)
config = config1
config = dagNodeConfig
/**
* Execute the {@link DagNode}.
@@ -47,20 +49,9 @@ class SparkDeleteNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
* @throws Exception Thrown if the execution failed.
*/
override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
if (!config.isDisableGenerate) {
println("Generating input data for node {}", this.getName)
context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config)).count()
}
// Deletes can't be fetched using getNextBatch() bcoz, getInsert(schema) from payload will return empty for delete
// records
context.getWriterContext.getHoodieTestSuiteWriter.getNextBatchForDeletes()
val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")
val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
val genRecsRDD = HoodieSparkUtils.createRdd(avroDf, "testStructName","testNamespace", false,
org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
val genRecsRDD = generateRecordsForDelete(config, context)
val inputDF = AvroConversionUtils.createDataFrame(genRecsRDD,
context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
context.getWriterContext.getSparkSession)
@@ -75,4 +66,24 @@ class SparkDeleteNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
.mode(SaveMode.Append)
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
}
/**
 * Generates records for delete operations in Spark.
 *
 * @param config  Node configs.
 * @param context The context needed for an execution of a node.
 * @return Records in {@link RDD}.
 */
private def generateRecordsForDelete(config: Config, context: ExecutionContext): RDD[GenericRecord] = {
  if (!config.isDisableGenerate) {
    // count() forces materialization so the generated batch is fully written before reading back.
    context.getDeltaGenerator().writeRecords(context.getDeltaGenerator().generateDeletes(config)).count()
  }
  // Deletes can't be fetched using getNextBatch() because getInsert(schema) from the payload
  // returns empty for delete records, hence the dedicated fetch below.
  context.getWriterContext.getHoodieTestSuiteWriter.getNextBatchForDeletes()
  // Read back the just-written batch from the input base path at the last checkpoint.
  val pathToRead = context.getWriterContext.getCfg.inputBasePath + "/" + context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse("")
  val avroDf = context.getWriterContext.getSparkSession.read.format("avro").load(pathToRead)
  HoodieSparkUtils.createRdd(avroDf, "testStructName", "testNamespace", false,
    org.apache.hudi.common.util.Option.of(new Schema.Parser().parse(context.getWriterContext.getHoodieTestSuiteWriter.getSchema)))
}
}

View File

@@ -30,11 +30,12 @@ import scala.collection.JavaConverters._
/**
* Spark datasource based insert node
* @param config1
*
* @param dagNodeConfig DAG node configurations.
*/
class SparkInsertNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
config = config1
config = dagNodeConfig
/**
* Execute the {@link DagNode}.

View File

@@ -30,11 +30,12 @@ import scala.collection.JavaConverters._
/**
* Spark datasource based upsert node
* @param config1
*
* @param dagNodeConfig DAG node configurations.
*/
class SparkUpsertNode(config1: Config) extends DagNode[RDD[WriteStatus]] {
class SparkUpsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
config = config1
config = dagNodeConfig
/**
* Execute the {@link DagNode}.

View File

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
import org.apache.spark.rdd.RDD
import org.slf4j.{Logger, LoggerFactory}
/**
 * Abstract class for DAG nodes that run a Spark SQL statement. Subclasses provide the SQL
 * text via {@link queryToRun}; this class prepares the input data, exposes it as a temp view,
 * and executes the statement.
 *
 * @param dagNodeConfig DAG node configurations.
 */
abstract class BaseSparkSqlNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {

  val LOG: Logger = LoggerFactory.getLogger(this.getClass)
  val TEMP_TABLE_NAME = "_spark_sql_temp_table"

  config = dagNodeConfig

  /**
   * Returns the Spark SQL query to execute for this {@link DagNode}.
   *
   * @param config  DAG node configurations.
   * @param context The context needed for an execution of a node.
   * @return the query String.
   */
  def queryToRun(config: Config, context: ExecutionContext): String

  /**
   * Prepares the data for the Spark write operation: generates a fresh batch of inserts
   * (unless generation is disabled) and fetches the next batch from the writer.
   *
   * @param context The context needed for an execution of a node.
   * @return Records in {@link RDD}.
   */
  def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
    val deltaGenerator = context.getDeltaGenerator()
    if (!config.isDisableGenerate) {
      // count() materializes the write so the batch is persisted before it is read back.
      deltaGenerator.writeRecords(deltaGenerator.generateInserts(config)).count()
    }
    context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch
  }

  /**
   * @return Name of the temp table containing the input data.
   */
  def getTempTableName(): String = TEMP_TABLE_NAME

  /**
   * Execute the {@link DagNode}: registers the prepared input as a temp view, then runs the
   * subclass-provided SQL statement.
   *
   * @param context     The context needed for an execution of a node.
   * @param curItrCount iteration count for executing the node.
   * @throws Exception Thrown if the execution failed.
   */
  override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
    LOG.info("Run query in Spark SQL ...")
    val inputRecords = prepareData(context)
    val session = context.getWriterContext.getSparkSession
    val inputFrame = AvroConversionUtils.createDataFrame(inputRecords,
      context.getWriterContext.getHoodieTestSuiteWriter.getSchema, session)
    inputFrame.createOrReplaceTempView(TEMP_TABLE_NAME)
    val sqlStatement = queryToRun(config, context)
    SparkSqlUtils.logQuery(LOG, sqlStatement)
    session.sql(sqlStatement)
    LOG.info("Finish run query in Spark SQL.")
  }
}

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
import org.apache.hadoop.fs.Path
import org.apache.hudi.AvroConversionUtils
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
import org.apache.spark.rdd.RDD
import org.slf4j.{Logger, LoggerFactory}
/**
 * DAG node of create table using Spark SQL.
 *
 * @param dagNodeConfig DAG node configurations.
 */
class SparkSqlCreateTableNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {

  val LOG: Logger = LoggerFactory.getLogger(classOf[SparkSqlCreateTableNode])
  val TEMP_TABLE_NAME: String = "_spark_sql_temp_table"

  config = dagNodeConfig

  /**
   * Execute the {@link DagNode}.
   *
   * Drops any existing table with the target name (and for external tables
   * cleans up the target base path) before issuing the create table query,
   * then reads the table back to sanity-check it and log the row count.
   *
   * @param context     The context needed for an execution of a node.
   * @param curItrCount iteration count for executing the node.
   * @throws Exception Thrown if the execution failed.
   */
  override def execute(context: ExecutionContext, curItrCount: Int): Unit = {
    LOG.info("Creating table in Spark SQL ...")
    val sparkSession = context.getWriterContext.getSparkSession
    val targetTableName = context.getWriterContext.getCfg.targetTableName
    // Separate base path so the SQL-created table does not clash with the
    // table written through the write-client path.
    val targetBasePath = context.getWriterContext.getCfg.targetBasePath + "_sql"
    if (config.shouldUseCtas) {
      // Prepares data for the CTAS query and stages it as a temp view
      if (!config.isDisableGenerate) {
        context.getDeltaGenerator.writeRecords(context.getDeltaGenerator.generateInserts(config)).count()
      }
      val nextBatch = context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch
      // Fix: reuse the session defined above instead of shadowing it with a
      // second local of the same name.
      val inputDF = AvroConversionUtils.createDataFrame(nextBatch,
        context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
        sparkSession)
      inputDF.createOrReplaceTempView(TEMP_TABLE_NAME)
    }
    // Cleans up the table
    sparkSession.sql("drop table if exists " + targetTableName)
    if (config.isTableExternal) {
      // External tables keep their data under targetBasePath; remove leftovers
      // from a previous run so the new table starts clean.
      LOG.info("Clean up " + targetBasePath)
      val fs = FSUtils.getFs(targetBasePath, context.getJsc.hadoopConfiguration())
      val targetPath = new Path(targetBasePath)
      if (fs.exists(targetPath)) {
        fs.delete(targetPath, true)
      }
    }
    // Executes the create table query
    val createTableQuery = SparkSqlUtils.constructCreateTableQuery(
      config, targetTableName, targetBasePath,
      context.getWriterContext.getHoodieTestSuiteWriter.getSchema, TEMP_TABLE_NAME)
    SparkSqlUtils.logQuery(LOG, createTableQuery)
    sparkSession.sql(createTableQuery)
    // Renamed from "targetTableCount": this is the DataFrame, not a count;
    // count() below both verifies the table is readable and logs the size.
    val targetTableDF = sparkSession.sql("select * from " + targetTableName)
    LOG.info("Target table count: " + targetTableDF.count())
    LOG.info("Finish create table in Spark SQL.")
  }
}

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
import org.apache.spark.rdd.RDD
/**
 * DAG node of delete using Spark SQL.
 *
 * @param dagNodeConfig DAG node configurations.
 */
class SparkSqlDeleteNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNodeConfig) {
config = dagNodeConfig
/**
 * Prepares the data for the write operation.
 *
 * Generates the records that the delete SQL is expected to remove and writes
 * them through the delta generator so downstream validation nodes can compare
 * against the expected state.
 *
 * @param context The context needed for an execution of a node.
 * @return Records in {@link RDD}.
 */
override def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
val sparkSession = context.getWriterContext.getSparkSession
val recordsToDelete = SparkSqlUtils.generateDeleteRecords(
config, sparkSession, context.getWriterContext.getHoodieTestSuiteWriter.getSchema,
context.getWriterContext.getCfg.targetTableName, sparkSession.sparkContext.defaultParallelism)
LOG.info("Number of records to delete: " + recordsToDelete.count())
// The delete records corresponding to the SQL are only used for data validation
context.getDeltaGenerator().writeRecords(recordsToDelete).count()
recordsToDelete
}
/**
 * Returns the Spark SQL query to execute for this {@link DagNode}.
 *
 * @param config DAG node configurations.
 * @param context The context needed for an execution of a node.
 * @return the query String.
 */
override def queryToRun(config: Config, context: ExecutionContext): String = {
SparkSqlUtils.constructDeleteQuery(config, context.getWriterContext.getSparkSession,
context.getWriterContext.getCfg.targetTableName)
}
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
/**
 * DAG node of insert using Spark SQL.
 *
 * @param dagNodeConfig DAG node configurations.
 */
class SparkSqlInsertNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNodeConfig) {

  config = dagNodeConfig

  /**
   * Returns the Spark SQL query to execute for this {@link DagNode}.
   *
   * Builds an "insert into" statement copying the staged batch from the temp
   * table into the target table.
   *
   * @param config  DAG node configurations.
   * @param context The context needed for an execution of a node.
   * @return the query String.
   */
  override def queryToRun(config: Config, context: ExecutionContext): String = {
    val writerContext = context.getWriterContext
    val tableName = writerContext.getCfg.targetTableName
    val tableSchema = SparkSqlUtils.getTableSchema(writerContext.getSparkSession, tableName)
    SparkSqlUtils.constructInsertQuery("into", tableName, tableSchema, getTempTableName())
  }
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
/**
 * DAG node of insert overwrite using Spark SQL.
 *
 * @param dagNodeConfig DAG node configurations.
 */
class SparkSqlInsertOverwriteNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNodeConfig) {

  config = dagNodeConfig

  /**
   * Returns the Spark SQL query to execute for this {@link DagNode}.
   *
   * Builds an "insert overwrite" statement replacing the target table's
   * contents with the staged batch from the temp table.
   *
   * @param config  DAG node configurations.
   * @param context The context needed for an execution of a node.
   * @return the query String.
   */
  override def queryToRun(config: Config, context: ExecutionContext): String = {
    val writerContext = context.getWriterContext
    val tableName = writerContext.getCfg.targetTableName
    val tableSchema = SparkSqlUtils.getTableSchema(writerContext.getSparkSession, tableName)
    SparkSqlUtils.constructInsertQuery("overwrite", tableName, tableSchema, getTempTableName())
  }
}

View File

@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
import org.apache.spark.rdd.RDD
/**
 * DAG node of merge using Spark SQL.
 *
 * @param dagNodeConfig DAG node configurations.
 */
class SparkSqlMergeNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNodeConfig) {

  config = dagNodeConfig

  /**
   * Prepares the data for the Spark write operation.
   *
   * Unless generation is disabled, generates a batch of update records and
   * writes them through the delta generator, then returns the writer's next
   * batch as the merge source.
   *
   * @param context The context needed for an execution of a node.
   * @return Records in {@link RDD}.
   */
  override def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
    val deltaGenerator = context.getDeltaGenerator()
    if (!config.isDisableGenerate) {
      deltaGenerator.writeRecords(deltaGenerator.generateUpdates(config)).count()
    }
    context.getWriterContext.getHoodieTestSuiteWriter.getNextBatch
  }

  /**
   * Returns the Spark SQL query to execute for this {@link DagNode}.
   *
   * @param config  DAG node configurations.
   * @param context The context needed for an execution of a node.
   * @return the query String.
   */
  override def queryToRun(config: Config, context: ExecutionContext): String = {
    val writerContext = context.getWriterContext
    val tableName = writerContext.getCfg.targetTableName
    val tableSchema = SparkSqlUtils.getTableSchema(writerContext.getSparkSession, tableName)
    SparkSqlUtils.constructMergeQuery(config, tableName, tableSchema, getTempTableName())
  }
}

View File

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.dag.nodes.DagNode
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
import org.apache.spark.rdd.RDD
/**
 * DAG node of update using Spark SQL.
 *
 * @param dagNodeConfig DAG node configurations.
 */
class SparkSqlUpdateNode(dagNodeConfig: Config) extends BaseSparkSqlNode(dagNodeConfig) {

  config = dagNodeConfig

  /**
   * Prepares the data for the Spark write operation.
   *
   * Generates the records the update SQL is expected to modify and writes
   * them through the delta generator for use by validation nodes.
   *
   * @param context The context needed for an execution of a node.
   * @return Records in {@link RDD}.
   */
  override def prepareData(context: ExecutionContext): RDD[GenericRecord] = {
    val session = context.getWriterContext.getSparkSession
    val writerSchema = context.getWriterContext.getHoodieTestSuiteWriter.getSchema
    val tableName = context.getWriterContext.getCfg.targetTableName
    val recordsToUpdate = SparkSqlUtils.generateUpdateRecords(
      config, session, writerSchema, tableName, session.sparkContext.defaultParallelism)
    LOG.info("Number of records to update: " + recordsToUpdate.count())
    // The update records corresponding to the SQL are only used for data validation
    context.getDeltaGenerator().writeRecords(recordsToUpdate).count()
    recordsToUpdate
  }

  /**
   * Returns the Spark SQL query to execute for this {@link DagNode}.
   *
   * @param config  DAG node configurations.
   * @param context The context needed for an execution of a node.
   * @return the query String.
   */
  override def queryToRun(config: Config, context: ExecutionContext): String = {
    val writerContext = context.getWriterContext
    SparkSqlUtils.constructUpdateQuery(config, writerContext.getSparkSession,
      writerContext.getCfg.targetTableName)
  }
}

View File

@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.dag.nodes.spark.sql
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.dag.ExecutionContext
import org.apache.hudi.integ.testsuite.dag.nodes.BaseValidateDatasetNode
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.slf4j.{Logger, LoggerFactory}
/**
 * This validation node uses Spark SQL to get data for comparison purposes.
 *
 * @param dagNodeConfig DAG node configurations.
 */
class SparkSqlValidateDatasetNode(dagNodeConfig: Config) extends BaseValidateDatasetNode(dagNodeConfig) {
val LOG: Logger = LoggerFactory.getLogger(classOf[SparkSqlValidateDatasetNode])
config = dagNodeConfig
/**
 * @return {@link Logger} instance to use.
 */
override def getLogger: Logger = LOG
/**
 * Reads the target table through Spark SQL and returns its rows with columns
 * ordered to match the input schema, for comparison by the base class.
 *
 * @param session {@link SparkSession} instance to use.
 * @param context {@link ExecutionContext} instance to use.
 * @param inputSchema input schema in {@link StructType}
 * @return data in {@link Dataset< Row >} to validate.
 */
override def getDatasetToValidate(session: SparkSession, context: ExecutionContext,
inputSchema: StructType): Dataset[Row] = {
val tableName = context.getWriterContext.getCfg.targetTableName
LOG.info("Validate data in table " + tableName)
// Compare field names only (sorted, Hudi meta columns excluded); a mismatch
// means the table schema diverged from the input schema.
val sortedInputFieldNames = inputSchema.fieldNames.sorted
val tableSchema = session.table(tableName).schema
val sortedTableFieldNames = tableSchema.fieldNames
.filter(field => !HoodieRecord.HOODIE_META_COLUMNS.contains(field)).sorted
if (!(sortedInputFieldNames sameElements sortedTableFieldNames)) {
LOG.error("Input schema: ")
inputSchema.printTreeString()
LOG.error("Table schema: ")
tableSchema.printTreeString()
throw new AssertionError("Data set validation failed. The schema does not match.")
}
// Select the input-schema columns in order so both datasets align column-for-column
session.sql(SparkSqlUtils.constructSelectQuery(inputSchema, tableName))
}
}

View File

@@ -0,0 +1,526 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hudi.integ.testsuite.utils
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.hudi.HoodieSparkUtils
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.util.Option
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config
import org.apache.hudi.integ.testsuite.generator.GenericRecordFullPayloadGenerator
import org.apache.hudi.integ.testsuite.utils.SparkSqlUtils.getFieldNamesAndTypes
import org.apache.hudi.utilities.schema.RowBasedSchemaProvider
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.storage.StorageLevel
import org.slf4j.Logger
import scala.math.BigDecimal.RoundingMode.RoundingMode
/**
* Utils for test nodes in Spark SQL
*/
object SparkSqlUtils {
/**
 * Looks up a table's schema via the catalog, dropping Hudi meta columns.
 *
 * @param sparkSession spark session to use
 * @param tableName    table name
 * @return table schema excluding meta columns in `StructType`
 */
def getTableSchema(sparkSession: SparkSession, tableName: String): StructType = {
  val nonMetaFields = sparkSession.table(tableName).schema.fields
    .filterNot(field => HoodieRecord.HOODIE_META_COLUMNS.contains(field.name))
  new StructType(nonMetaFields)
}
/**
 * Converts an Avro schema String to a SQL schema expression ("name type"
 * pairs, comma/newline separated), with partition fields moved to the end.
 *
 * For example, given the Avro schema below:
 * """
 * {"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},
 * {"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},
 * {"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},
 * {"name":"end_lon","type":"double"},{"name":"fare","type":"double"},
 * {"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
 * """
 * and the partition columns Set("rider"), the SQL schema expression is:
 * """
 * timestamp bigint,
 * _row_key string,
 * driver string,
 * begin_lat double,
 * begin_lon double,
 * end_lat double,
 * end_lon double,
 * fare double,
 * _hoodie_is_deleted boolean,
 * rider string
 * """
 *
 * @param avroSchemaString Avro schema String
 * @param partitionColumns partition columns
 * @return corresponding SQL schema expression
 */
def convertAvroToSqlSchemaExpression(avroSchemaString: String, partitionColumns: Set[String]): String = {
  val allFields: Array[(String, String)] = getFieldNamesAndTypes(avroSchemaString)
  // Stable partition keeps the original relative order within each group
  val (partitionFields, dataFields) = allFields.partition(field => partitionColumns.contains(field._1))
  (dataFields ++ partitionFields).map { case (name, sqlType) => s"$name $sqlType" }.mkString(",\n")
}
/**
 * Converts an Avro schema String to an array of field names, in schema order.
 *
 * For example, given the Avro schema below:
 * """
 * {"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},
 * {"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},
 * {"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},
 * {"name":"end_lon","type":"double"},{"name":"fare","type":"double"},
 * {"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
 * """
 * the output is
 * ["timestamp", "_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon",
 * "fare", "_hoodie_is_deleted"]
 *
 * @param avroSchemaString Avro schema String
 * @return an array of field names.
 */
def convertAvroToFieldNames(avroSchemaString: String): Array[String] =
  getFieldNamesAndTypes(avroSchemaString).map(_._1)
/**
 * Parses an Avro schema String and returns (field name, SQL type name) pairs
 * using Spark's Avro-to-SQL schema conversion.
 *
 * For example, given the Avro schema below:
 * """
 * {"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},
 * {"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},
 * {"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},
 * {"name":"end_lon","type":"double"},{"name":"fare","type":"double"},
 * {"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
 * """
 * the output is
 * [("timestamp", "bigint"), ("_row_key", "string"), ("rider", "string"),
 * ("driver", "string"), ("begin_lat", "double"), ("begin_lon", "double"),
 * ("end_lat", "double"), ("end_lon", "double"), ("fare", "double"),
 * ("_hoodie_is_deleted", "boolean")]
 *
 * @param avroSchemaString Avro schema String
 * @return an array of field names and types
 */
def getFieldNamesAndTypes(avroSchemaString: String): Array[(String, String)] = {
  val avroSchema = new Schema.Parser().parse(avroSchemaString)
  val sqlStructType = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
  sqlStructType.fields.map(f => (f.name, f.dataType.simpleString))
}
/**
 * Logs the Spark SQL query to run, framed by separator lines.
 *
 * @param log   {@link Logger} instance to use.
 * @param query query String.
 */
def logQuery(log: Logger, query: String): Unit = {
  val footer = "-" * 50
  log.warn("----- Running the following Spark SQL query -----")
  log.warn(query)
  log.warn(footer)
}
/**
 * Constructs the select query from an Avro schema, with partition columns
 * moved to the end of the select list.
 *
 * For example, given the Avro schema below:
 * """
 * {"type":"record","name":"triprec","fields":[{"name":"timestamp","type":"long"},
 * {"name":"_row_key","type":"string"},{"name":"rider","type":"string"},{"name":"driver","type":"string"},
 * {"name":"begin_lat","type":"double"},{"name":"begin_lon","type":"double"},{"name":"end_lat","type":"double"},
 * {"name":"end_lon","type":"double"},{"name":"fare","type":"double"},
 * {"name":"_hoodie_is_deleted","type":"boolean","default":false}]}
 * """
 * and the partition columns Set("rider"), the output is
 * """
 * select timestamp, _row_key, driver, begin_lat, begin_lon, end_lat, end_lon, fare,
 * _hoodie_is_deleted, rider from _temp_table
 * """
 *
 * @param inputSchema      input Avro schema String.
 * @param partitionColumns partition columns
 * @param tableName        table name.
 * @return select query String.
 */
def constructSelectQuery(inputSchema: String, partitionColumns: Set[String], tableName: String): String = {
  val allNames = SparkSqlUtils.convertAvroToFieldNames(inputSchema)
  // Stable partition preserves relative order within each group
  val (partitionNames, dataNames) = allNames.partition(partitionColumns.contains)
  constructSelectQuery(dataNames ++ partitionNames, tableName)
}
/**
 * Constructs the select query with {@link StructType} columns in the select,
 * with no special handling of partition columns.
 *
 * @param structType {@link StructType} instance.
 * @param tableName  table name.
 * @return select query String.
 */
def constructSelectQuery(structType: StructType, tableName: String): String =
  constructSelectQuery(structType, Set.empty[String], tableName)
/**
 * Constructs the select query with {@link StructType} columns in the select
 * and the partition columns at the end.
 *
 * @param structType       {@link StructType} instance.
 * @param partitionColumns partition columns in a {@link Set}
 * @param tableName        table name.
 * @return select query String.
 */
def constructSelectQuery(structType: StructType, partitionColumns: Set[String], tableName: String): String = {
  val allNames = structType.fields.map(_.name)
  // Stable partition preserves relative order within each group
  val (partitionNames, dataNames) = allNames.partition(partitionColumns.contains)
  constructSelectQuery(dataNames ++ partitionNames, tableName)
}
/**
 * Constructs the select query from an {@link Array} of field name Strings.
 *
 * @param fieldNames field names in String.
 * @param tableName  table name.
 * @return select query String, e.g. "select a, b from t".
 */
def constructSelectQuery(fieldNames: Array[String], tableName: String): String =
  s"select ${fieldNames.mkString(", ")} from $tableName"
/**
 * Constructs the Spark SQL create table query based on the configs.
 *
 * Clause order emitted: name, optional schema (non-CTAS), "using hudi",
 * optional location (external), options, partitioned by, optional CTAS select.
 *
 * @param config DAG node configurations.
 * @param targetTableName target table name.
 * @param targetBasePath target base path for external table.
 * @param inputSchema input Avro schema String.
 * @param inputTableName name of the table containing input data.
 * @return create table query.
 */
def constructCreateTableQuery(config: Config, targetTableName: String, targetBasePath: String,
inputSchema: String, inputTableName: String): String = {
// Constructs create table statement
val createTableQueryBuilder = new StringBuilder("create table ")
createTableQueryBuilder.append(targetTableName)
// Partition fields are excluded from the main column list and appended last
val partitionColumns: Set[String] =
if (config.getPartitionField.isPresent) Set(config.getPartitionField.get) else Set.empty
if (!config.shouldUseCtas) {
// Adds the schema statement if not using CTAS (CTAS infers it from the select)
createTableQueryBuilder.append(" (")
createTableQueryBuilder.append(SparkSqlUtils.convertAvroToSqlSchemaExpression(inputSchema, partitionColumns))
createTableQueryBuilder.append("\n)")
}
createTableQueryBuilder.append(" using hudi")
val tableTypeOption = config.getTableType
val primaryKeyOption = config.getPrimaryKey
val preCombineFieldOption = config.getPreCombineField
// Adds location for external table
// NOTE(review): location is emitted before the options clause — presumably
// accepted by the Spark SQL parser in the tested versions; confirm if the
// parser version changes.
if (config.isTableExternal) {
createTableQueryBuilder.append("\nlocation '" + targetBasePath + "'")
}
// Adds options if set
var options = Array[String]()
if (tableTypeOption.isPresent) {
options :+= ("type = '" + tableTypeOption.get() + "'")
}
if (primaryKeyOption.isPresent) {
options :+= ("primaryKey = '" + primaryKeyOption.get() + "'")
}
if (preCombineFieldOption.isPresent) {
options :+= ("preCombineField = '" + preCombineFieldOption.get() + "'")
}
if (options.length > 0) {
createTableQueryBuilder.append(options.mkString("\noptions ( \n", ",\n", "\n)"))
}
// Adds partition fields if set
val partitionFieldOption = config.getPartitionField
if (partitionFieldOption.isPresent) {
createTableQueryBuilder.append("\npartitioned by (" + partitionFieldOption.get() + ")")
}
if (config.shouldUseCtas()) {
// Adds as select query sourcing from the staged input temp table
createTableQueryBuilder.append("\nas\n");
createTableQueryBuilder.append(constructSelectQuery(inputSchema, partitionColumns, inputTableName))
}
createTableQueryBuilder.toString()
}
/**
 * Constructs the Spark SQL insert query based on the configs.
 *
 * @param insertType      the insert type, in one of two types: "into" or "overwrite".
 * @param targetTableName target table name.
 * @param schema          table schema to use
 * @param inputTableName  name of the table containing input data.
 * @return insert query.
 */
def constructInsertQuery(insertType: String, targetTableName: String, schema: StructType,
                         inputTableName: String): String = {
  val sourceSelect = constructSelectQuery(schema, inputTableName)
  s"insert $insertType $targetTableName $sourceSelect"
}
/**
 * Constructs the Spark SQL merge query based on the configs.
 *
 * The merge condition and the matched/not-matched actions come verbatim from
 * the node config.
 *
 * @param config          DAG node configurations.
 * @param targetTableName target table name.
 * @param schema          table schema to use
 * @param inputTableName  name of the table containing input data.
 * @return merge query.
 */
def constructMergeQuery(config: Config, targetTableName: String, schema: StructType,
                        inputTableName: String): String = {
  val sourceSelect = constructSelectQuery(schema, inputTableName)
  s"merge into $targetTableName as target using (\n" +
    s"$sourceSelect\n) source\non ${config.getMergeCondition}\n" +
    s"when matched then ${config.getMatchedAction}\n" +
    s"when not matched then ${config.getNotMatchedAction}"
}
/**
 * Constructs the Spark SQL update query based on the configs.
 *
 * Scales the update column by 1.6 for rows whose where-condition column falls
 * between the percentile-derived bounds.
 *
 * @param config          DAG node configurations.
 * @param sparkSession    Spark session.
 * @param targetTableName target table name.
 * @return update query.
 */
def constructUpdateQuery(config: Config, sparkSession: SparkSession,
                         targetTableName: String): String = {
  val (lowerBound, upperBound) = getLowerUpperBoundsFromPercentiles(config, sparkSession, targetTableName)
  val updateColumn = config.getUpdateColumn
  // The doubled space before "where" reproduces the original query text exactly
  s"update $targetTableName set $updateColumn = $updateColumn * 1.6 " +
    s" where ${config.getWhereConditionColumn} between $lowerBound and $upperBound"
}
/**
 * Constructs the Spark SQL delete query based on the configs.
 *
 * Deletes rows whose where-condition column falls between the
 * percentile-derived bounds.
 *
 * @param config          DAG node configurations.
 * @param sparkSession    Spark session.
 * @param targetTableName target table name.
 * @return delete query.
 */
def constructDeleteQuery(config: Config, sparkSession: SparkSession,
                         targetTableName: String): String = {
  val (lowerBound, upperBound) = getLowerUpperBoundsFromPercentiles(config, sparkSession, targetTableName)
  s"delete from $targetTableName where ${config.getWhereConditionColumn} " +
    s"between $lowerBound and $upperBound"
}
/**
 * Generates the pair of percentile levels based on the ratio in the config.
 *
 * The window is centered on the median and clamped to [0, 1]; e.g. a ratio
 * of 0.4 yields (0.3, 0.7).
 *
 * @param config DAG node configurations.
 * @return the lower bound and upper bound percentiles.
 */
def generatePercentiles(config: Config): (Double, Double) = {
  val halfWindow = config.getRatioRecordsChange / 2.0
  val lower = Math.max(0.5 - halfWindow, 0.0)
  val upper = Math.min(0.5 + halfWindow, 1.0)
  (lower, upper)
}
/**
 * Rounds a double to four decimal places using the given rounding mode.
 *
 * @param number input double number
 * @param mode   rounding mode
 * @return rounded double
 */
def roundDouble(number: Double, mode: RoundingMode): Double =
  BigDecimal(number).setScale(4, mode).doubleValue
/**
 * Computes the value bounds of the where-condition column by running a
 * percentile query against the target table.
 *
 * @param config          DAG node configurations.
 * @param sparkSession    Spark session.
 * @param targetTableName target table name.
 * @return lower and upper bound values based on the percentiles.
 */
def getLowerUpperBoundsFromPercentiles(config: Config, sparkSession: SparkSession,
                                       targetTableName: String): (Double, Double) = {
  val percentiles = generatePercentiles(config)
  val firstRow = sparkSession
    .sql(constructPercentileQuery(config, targetTableName, percentiles))
    .collect()(0)
  // Round to 4 decimal places; lower bound uses HALF_DOWN, upper HALF_UP.
  val lower = roundDouble(firstRow.get(0).asInstanceOf[Double], BigDecimal.RoundingMode.HALF_DOWN)
  val upper = roundDouble(firstRow.get(1).asInstanceOf[Double], BigDecimal.RoundingMode.HALF_UP)
  (lower, upper)
}
/**
 * Constructs the query to get percentiles for the where condition.
 *
 * @param config          DAG node configurations.
 * @param targetTableName target table name.
 * @param percentiles     lower and upper percentiles.
 * @return percentile query in String.
 */
def constructPercentileQuery(config: Config, targetTableName: String,
                             percentiles: (Double, Double)): String = {
  val column = config.getWhereConditionColumn
  val (lowerPercentile, upperPercentile) = percentiles
  s"select percentile($column, $lowerPercentile)," +
    s" percentile($column, $upperPercentile) from $targetTableName"
}
/**
 * Constructs the Spark SQL query to get update or delete records.
 *
 * @param config           DAG node configurations.
 * @param targetTableName  target table name.
 * @param avroSchemaString input Avro schema String.
 * @param lowerBound       lower bound value for the where condition.
 * @param upperBound       upper bound value for the where condition.
 * @return select query over the records in the bounded range.
 */
def constructChangedRecordQuery(config: Config, targetTableName: String, avroSchemaString: String,
                                lowerBound: Double, upperBound: Double): String = {
  val selectQuery = constructSelectQuery(avroSchemaString, Set.empty[String], targetTableName)
  s"$selectQuery where ${config.getWhereConditionColumn} between $lowerBound and $upperBound"
}
/**
 * Generates the exact same records to update based on the SQL derived from the
 * configs for data validation.
 *
 * @param config           DAG node configurations.
 * @param sparkSession     Spark session.
 * @param avroSchemaString input Avro schema String.
 * @param targetTableName  target table name.
 * @param parallelism      parallelism for RDD
 * @return records in `JavaRDD[GenericRecord]`, persisted to disk.
 */
def generateUpdateRecords(config: Config, sparkSession: SparkSession, avroSchemaString: String,
                          targetTableName: String, parallelism: Int): JavaRDD[GenericRecord] = {
  // Select the same percentile-bounded slice of the table that the SQL
  // update statement targets.
  val bounds = getLowerUpperBoundsFromPercentiles(config, sparkSession, targetTableName)
  val rows = sparkSession.sql(
    constructChangedRecordQuery(config, targetTableName, avroSchemaString, bounds._1, bounds._2))
  // Convert Rows to Avro GenericRecords and apply the update in memory.
  val rdd = HoodieSparkUtils
    .createRdd(rows, RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
      RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE, reconcileToLatestSchema = false, Option.empty())
    .map(record => {
      // NOTE(review): the 1.6 multiplier must stay in sync with the
      // "* 1.6" literal in constructUpdateQuery; update both together.
      record.put(config.getUpdateColumn, record.get(config.getUpdateColumn).toString.toDouble * 1.6)
      record
    })
    .toJavaRDD()
  val repartitionedRdd = rdd.repartition(parallelism)
  // Persisted so repeated actions reuse the same records — presumably to
  // avoid recomputing the query after the table is mutated; confirm intent.
  repartitionedRdd.persist(StorageLevel.DISK_ONLY)
  repartitionedRdd
}
/**
 * Generates the exact same records to delete based on the SQL derived from the
 * configs for data validation.
 *
 * @param config           DAG node configurations.
 * @param sparkSession     Spark session.
 * @param avroSchemaString input Avro schema String.
 * @param targetTableName  target table name.
 * @param parallelism      parallelism for RDD
 * @return records in `JavaRDD[GenericRecord]`, persisted to disk.
 */
def generateDeleteRecords(config: Config, sparkSession: SparkSession, avroSchemaString: String,
                          targetTableName: String, parallelism: Int): JavaRDD[GenericRecord] = {
  // Select the same percentile-bounded slice of the table that the SQL
  // delete statement targets.
  val bounds = getLowerUpperBoundsFromPercentiles(config, sparkSession, targetTableName)
  val rows = sparkSession.sql(
    constructChangedRecordQuery(config, targetTableName, avroSchemaString, bounds._1, bounds._2))
  val rdd = HoodieSparkUtils
    .createRdd(rows, RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
      RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE, reconcileToLatestSchema = false, Option.empty())
    .map(record => {
      // Soft delete: mark the record via the _hoodie_is_deleted column
      // rather than emitting a tombstone payload.
      record.put(GenericRecordFullPayloadGenerator.DEFAULT_HOODIE_IS_DELETED_COL, true)
      record
    })
    .toJavaRDD()
  val repartitionedRdd = rdd.repartition(parallelism)
  // Persisted so repeated actions reuse the same records — presumably to
  // avoid recomputing the query after the table is mutated; confirm intent.
  repartitionedRdd.persist(StorageLevel.DISK_ONLY)
  repartitionedRdd
}
}

View File

@@ -18,12 +18,6 @@
package org.apache.hudi.integ.testsuite.job;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.UUID;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieTableType;
@@ -37,21 +31,31 @@ import org.apache.hudi.integ.testsuite.dag.HiveSyncDagGeneratorMOR;
import org.apache.hudi.integ.testsuite.dag.WorkflowDagGenerator;
import org.apache.hudi.integ.testsuite.helpers.DFSTestSuitePathSelector;
import org.apache.hudi.integ.testsuite.reader.DeltaInputType;
import org.apache.hudi.integ.testsuite.schema.SchemaUtils;
import org.apache.hudi.integ.testsuite.schema.TestSuiteFileBasedSchemaProvider;
import org.apache.hudi.integ.testsuite.writer.DeltaOutputMode;
import org.apache.hudi.keygen.TimestampBasedKeyGenerator;
import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
import org.apache.hudi.utilities.sources.AvroDFSSource;
import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.util.UUID;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* Unit test against {@link HoodieTestSuiteJob}.
*/
@@ -72,6 +76,9 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
private static final String COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES = "unit-test-cow-dag-spark-datasource.yaml";
private static final String COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH = "/hudi-integ-test/src/test/resources/unit-test-cow-dag-spark-datasource.yaml";
private static final String SPARK_SQL_DAG_FILE_NAME = "unit-test-spark-sql-dag.yaml";
private static final String SPARK_SQL_DAG_SOURCE_PATH = "/hudi-integ-test/src/test/resources/" + SPARK_SQL_DAG_FILE_NAME;
public static Stream<Arguments> configParams() {
Object[][] data =
new Object[][] {{false, "COPY_ON_WRITE"}};
@@ -102,6 +109,8 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
+ COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, dfs, dfsBasePath + "/" + COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), dfs, dfsBasePath + "/test-source"
+ ".properties");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
+ SPARK_SQL_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + SPARK_SQL_DAG_FILE_NAME);
// Properties used for the delta-streamer which incrementally pulls from upstream DFS Avro source and
// writes to downstream hudi table
@@ -269,22 +278,35 @@ public class TestHoodieTestSuiteJob extends UtilitiesTestBase {
assertEquals(metaClient.getActiveTimeline().getCommitsTimeline().getInstants().count(), 3);
}
@Test
public void testSparkSqlDag() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
cfg.workloadYamlPath = dfsBasePath + "/" + SPARK_SQL_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
}
protected HoodieTestSuiteConfig makeConfig(String inputBasePath, String outputBasePath, boolean useDeltaStream,
String tableType) {
String tableType) {
HoodieTestSuiteConfig cfg = new HoodieTestSuiteConfig();
cfg.targetBasePath = outputBasePath;
cfg.inputBasePath = inputBasePath;
cfg.targetTableName = "table1";
cfg.tableType = tableType;
cfg.sourceClassName = AvroDFSSource.class.getName();
cfg.sourceOrderingField = "timestamp";
cfg.sourceOrderingField = SchemaUtils.SOURCE_ORDERING_FIELD;
cfg.propsFilePath = dfsBasePath + "/test-source.properties";
cfg.outputTypeName = DeltaOutputMode.DFS.name();
cfg.inputFormatName = DeltaInputType.AVRO.name();
cfg.limitFileSize = 1024 * 1024L;
cfg.sourceLimit = 20000000;
cfg.workloadDagGenerator = WorkflowDagGenerator.class.getName();
cfg.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
cfg.schemaProviderClassName = TestSuiteFileBasedSchemaProvider.class.getName();
cfg.useDeltaStreamer = useDeltaStream;
return cfg;
}

View File

@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
dag_name: unit-test-spark-sql-dag.yaml
dag_rounds: 1
dag_intermittent_delay_mins: 1
dag_content:
create_table:
config:
table_type: cow
primary_key: _row_key
pre_combine_field: test_suite_source_ordering_field
partition_field: rider
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 1000
type: spark.sql.SparkSqlCreateTableNode
deps: none
insert_records:
config:
record_size: 1000
num_partitions_insert: 1
repeat_count: 1
num_records_insert: 1000
type: spark.sql.SparkSqlInsertNode
deps: create_table
#merge_records:
# config:
# merge_condition: target._row_key = source._row_key
# matched_action: update set *
# not_matched_action: insert *
# record_size: 1000
# num_partitions_insert: 10
# repeat_count: 1
# num_records_upsert: 100
# num_records_insert: 1000
# type: spark.sql.SparkSqlMergeNode
# deps: insert_records
#delete_records:
# config:
# condition_column: begin_lat
# record_size: 1000
# repeat_count: 1
# ratio_records_change: 0.2
# type: spark.sql.SparkSqlDeleteNode
# deps: insert_records
#validate:
# config:
# delete_input_data: true
# type: spark.sql.SparkSqlValidateDatasetNode
# deps: delete_records