1
0

[HUDI-3312] Fixing spark yaml and adding hive validation to integ test suite (#4731)

This commit is contained in:
Sivabalan Narayanan
2022-02-08 00:40:36 -05:00
committed by GitHub
parent 8ab6f17149
commit 0ab1a8ec80
14 changed files with 295 additions and 38 deletions

View File

@@ -95,6 +95,7 @@ public class DeltaConfig implements Serializable {
private static String SCHEMA_VERSION = "schema_version";
private static String NUM_ROLLBACKS = "num_rollbacks";
private static String ENABLE_ROW_WRITING = "enable_row_writing";
private static String ENABLE_METADATA_VALIDATE = "enable_metadata_validate";
// Spark SQL Create Table
private static String TABLE_TYPE = "table_type";
@@ -149,6 +150,10 @@ public class DeltaConfig implements Serializable {
return Integer.valueOf(configsMap.getOrDefault(RECORD_SIZE, 1024).toString());
}
public boolean isEnableMetadataValidate() {
return Boolean.valueOf(configsMap.getOrDefault(ENABLE_METADATA_VALIDATE, false).toString());
}
public int getNumInsertPartitions() {
return Integer.valueOf(configsMap.getOrDefault(NUM_PARTITIONS_INSERT, 1).toString());
}

View File

@@ -111,14 +111,17 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
Dataset<Row> cowDf = session.sql("SELECT * FROM " + database + "." + tableName);
Dataset<Row> trimmedCowDf = cowDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
intersectionDf = inputSnapshotDf.intersect(trimmedCowDf);
session.sql("REFRESH TABLE " + database + "." + tableName);
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
"test_suite_source_ordering_field FROM " + database + "." + tableName);
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key","rider","driver","begin_lat","begin_lon","end_lat","end_lon","fare",
"_hoodie_is_deleted","test_suite_source_ordering_field");
Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
outputCount = trimmedHudiDf.count();
log.warn("Input count: " + inputCount + "; output count: " + outputCount);
// the intersected df should be same as inputDf. if not, there is some mismatch.
if (outputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.integ.testsuite.dag.nodes;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.integ.testsuite.configuration.DeltaConfig.Config;
import org.apache.hudi.integ.testsuite.dag.ExecutionContext;
@@ -49,7 +50,8 @@ public class ValidateDatasetNode extends BaseValidateDatasetNode {
StructType inputSchema) {
String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + "/*/*/*";
log.info("Validate data in target hudi path " + hudiPath);
Dataset<Row> hudiDf = session.read().format("hudi").load(hudiPath);
Dataset<Row> hudiDf = session.read().option(HoodieMetadataConfig.ENABLE.key(), String.valueOf(config.isEnableMetadataValidate()))
.format("hudi").load(hudiPath);
return hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
.drop(HoodieRecord.PARTITION_PATH_METADATA_FIELD).drop(HoodieRecord.FILENAME_METADATA_FIELD);
}

View File

@@ -60,7 +60,7 @@ class SparkInsertNode(dagNodeConfig: Config) extends DagNode[RDD[WriteStatus]] {
.option(DataSourceWriteOptions.COMMIT_METADATA_KEYPREFIX.key, "deltastreamer.checkpoint.key")
.option("deltastreamer.checkpoint.key", context.getWriterContext.getHoodieTestSuiteWriter.getLastCheckpoint.orElse(""))
.option(HoodieWriteConfig.TBL_NAME.key, context.getHoodieTestSuiteWriter.getCfg.targetTableName)
.mode(SaveMode.Overwrite)
.mode(SaveMode.Append)
.save(context.getHoodieTestSuiteWriter.getWriteConfig.getBasePath)
}
}