[HUDI-3659] Reducing the validation frequency with integ tests (#5067)
This commit is contained in:
committed by
GitHub
parent
2551c26183
commit
316e38c71e
@@ -25,11 +25,6 @@ dag_content:
|
|||||||
num_records_insert: 10000
|
num_records_insert: 10000
|
||||||
type: SparkInsertNode
|
type: SparkInsertNode
|
||||||
deps: none
|
deps: none
|
||||||
first_validate:
|
|
||||||
config:
|
|
||||||
validate_hive: false
|
|
||||||
type: ValidateDatasetNode
|
|
||||||
deps: first_insert
|
|
||||||
first_upsert:
|
first_upsert:
|
||||||
config:
|
config:
|
||||||
record_size: 200
|
record_size: 200
|
||||||
@@ -39,7 +34,7 @@ dag_content:
|
|||||||
num_records_upsert: 3000
|
num_records_upsert: 3000
|
||||||
num_partitions_upsert: 50
|
num_partitions_upsert: 50
|
||||||
type: SparkUpsertNode
|
type: SparkUpsertNode
|
||||||
deps: first_validate
|
deps: first_insert
|
||||||
first_delete:
|
first_delete:
|
||||||
config:
|
config:
|
||||||
num_partitions_delete: 50
|
num_partitions_delete: 50
|
||||||
@@ -48,6 +43,7 @@ dag_content:
|
|||||||
deps: first_upsert
|
deps: first_upsert
|
||||||
second_validate:
|
second_validate:
|
||||||
config:
|
config:
|
||||||
|
validate_once_every_itr : 5
|
||||||
validate_hive: false
|
validate_hive: false
|
||||||
delete_input_data: true
|
delete_input_data: true
|
||||||
type: ValidateDatasetNode
|
type: ValidateDatasetNode
|
||||||
|
|||||||
@@ -47,11 +47,6 @@ dag_content:
|
|||||||
engine: "mr"
|
engine: "mr"
|
||||||
type: HiveSyncNode
|
type: HiveSyncNode
|
||||||
deps: third_insert
|
deps: third_insert
|
||||||
first_validate:
|
|
||||||
config:
|
|
||||||
validate_hive: false
|
|
||||||
type: ValidateDatasetNode
|
|
||||||
deps: first_hive_sync
|
|
||||||
first_upsert:
|
first_upsert:
|
||||||
config:
|
config:
|
||||||
record_size: 1000
|
record_size: 1000
|
||||||
@@ -61,7 +56,7 @@ dag_content:
|
|||||||
num_records_upsert: 100
|
num_records_upsert: 100
|
||||||
num_partitions_upsert: 1
|
num_partitions_upsert: 1
|
||||||
type: UpsertNode
|
type: UpsertNode
|
||||||
deps: first_validate
|
deps: first_hive_sync
|
||||||
first_delete:
|
first_delete:
|
||||||
config:
|
config:
|
||||||
num_partitions_delete: 50
|
num_partitions_delete: 50
|
||||||
@@ -76,6 +71,7 @@ dag_content:
|
|||||||
deps: first_delete
|
deps: first_delete
|
||||||
second_validate:
|
second_validate:
|
||||||
config:
|
config:
|
||||||
|
validate_once_every_itr : 5
|
||||||
validate_hive: true
|
validate_hive: true
|
||||||
delete_input_data: true
|
delete_input_data: true
|
||||||
type: ValidateDatasetNode
|
type: ValidateDatasetNode
|
||||||
|
|||||||
@@ -59,6 +59,7 @@ dag_content:
|
|||||||
deps: first_upsert
|
deps: first_upsert
|
||||||
second_validate:
|
second_validate:
|
||||||
config:
|
config:
|
||||||
|
validate_once_every_itr : 5
|
||||||
validate_hive: false
|
validate_hive: false
|
||||||
delete_input_data: true
|
delete_input_data: true
|
||||||
type: ValidateDatasetNode
|
type: ValidateDatasetNode
|
||||||
|
|||||||
@@ -59,6 +59,7 @@ dag_content:
|
|||||||
deps: first_upsert
|
deps: first_upsert
|
||||||
second_validate:
|
second_validate:
|
||||||
config:
|
config:
|
||||||
|
validate_once_every_itr : 5
|
||||||
validate_hive: false
|
validate_hive: false
|
||||||
delete_input_data: true
|
delete_input_data: true
|
||||||
type: ValidateDatasetNode
|
type: ValidateDatasetNode
|
||||||
|
|||||||
@@ -62,6 +62,7 @@ dag_content:
|
|||||||
deps: first_upsert
|
deps: first_upsert
|
||||||
second_validate:
|
second_validate:
|
||||||
config:
|
config:
|
||||||
|
validate_once_every_itr : 5
|
||||||
validate_hive: false
|
validate_hive: false
|
||||||
delete_input_data: false
|
delete_input_data: false
|
||||||
type: ValidateDatasetNode
|
type: ValidateDatasetNode
|
||||||
|
|||||||
@@ -41,11 +41,6 @@ dag_content:
|
|||||||
num_records_insert: 300
|
num_records_insert: 300
|
||||||
deps: second_insert
|
deps: second_insert
|
||||||
type: InsertNode
|
type: InsertNode
|
||||||
first_validate:
|
|
||||||
config:
|
|
||||||
validate_hive: false
|
|
||||||
type: ValidateDatasetNode
|
|
||||||
deps: third_insert
|
|
||||||
first_upsert:
|
first_upsert:
|
||||||
config:
|
config:
|
||||||
record_size: 1000
|
record_size: 1000
|
||||||
@@ -55,7 +50,7 @@ dag_content:
|
|||||||
num_records_upsert: 100
|
num_records_upsert: 100
|
||||||
num_partitions_upsert: 1
|
num_partitions_upsert: 1
|
||||||
type: UpsertNode
|
type: UpsertNode
|
||||||
deps: first_validate
|
deps: third_insert
|
||||||
first_delete:
|
first_delete:
|
||||||
config:
|
config:
|
||||||
num_partitions_delete: 1
|
num_partitions_delete: 1
|
||||||
@@ -64,6 +59,7 @@ dag_content:
|
|||||||
deps: first_upsert
|
deps: first_upsert
|
||||||
second_validate:
|
second_validate:
|
||||||
config:
|
config:
|
||||||
|
validate_once_every_itr : 5
|
||||||
validate_hive: false
|
validate_hive: false
|
||||||
delete_input_data: true
|
delete_input_data: true
|
||||||
type: ValidateDatasetNode
|
type: ValidateDatasetNode
|
||||||
|
|||||||
@@ -89,6 +89,7 @@ public class DeltaConfig implements Serializable {
|
|||||||
private static String START_PARTITION = "start_partition";
|
private static String START_PARTITION = "start_partition";
|
||||||
private static String DELETE_INPUT_DATA = "delete_input_data";
|
private static String DELETE_INPUT_DATA = "delete_input_data";
|
||||||
private static String VALIDATE_HIVE = "validate_hive";
|
private static String VALIDATE_HIVE = "validate_hive";
|
||||||
|
private static String VALIDATE_ONCE_EVERY_ITR = "validate_once_every_itr";
|
||||||
private static String EXECUTE_ITR_COUNT = "execute_itr_count";
|
private static String EXECUTE_ITR_COUNT = "execute_itr_count";
|
||||||
private static String VALIDATE_ARCHIVAL = "validate_archival";
|
private static String VALIDATE_ARCHIVAL = "validate_archival";
|
||||||
private static String VALIDATE_CLEAN = "validate_clean";
|
private static String VALIDATE_CLEAN = "validate_clean";
|
||||||
@@ -216,6 +217,10 @@ public class DeltaConfig implements Serializable {
|
|||||||
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
|
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_HIVE, false).toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int validateOnceEveryIteration() {
|
||||||
|
return Integer.valueOf(configsMap.getOrDefault(VALIDATE_ONCE_EVERY_ITR, 1).toString());
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isValidateFullData() {
|
public boolean isValidateFullData() {
|
||||||
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
|
return Boolean.valueOf(configsMap.getOrDefault(VALIDATE_FULL_DATA, false).toString());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,81 +74,84 @@ public abstract class BaseValidateDatasetNode extends DagNode<Boolean> {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void execute(ExecutionContext context, int curItrCount) throws Exception {
|
public void execute(ExecutionContext context, int curItrCount) throws Exception {
|
||||||
|
int validateOnceEveryItr = config.validateOnceEveryIteration();
|
||||||
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
|
int itrCountToExecute = config.getIterationCountToExecute();
|
||||||
|
if ((itrCountToExecute != -1 && itrCountToExecute == curItrCount) ||
|
||||||
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
|
(itrCountToExecute == -1 && ((curItrCount % validateOnceEveryItr) == 0))) {
|
||||||
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
|
SparkSession session = SparkSession.builder().sparkContext(context.getJsc().sc()).getOrCreate();
|
||||||
log.warn("Validation using data from input path " + inputPath);
|
// todo: Fix partitioning schemes. For now, assumes data based partitioning.
|
||||||
// listing batches to be validated
|
String inputPath = context.getHoodieTestSuiteWriter().getCfg().inputBasePath + "/*/*";
|
||||||
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
|
log.warn("Validation using data from input path " + inputPath);
|
||||||
if (log.isDebugEnabled()) {
|
// listing batches to be validated
|
||||||
FileSystem fs = new Path(inputPathStr)
|
String inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
|
||||||
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
|
if (log.isDebugEnabled()) {
|
||||||
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
|
|
||||||
log.info("fileStatuses length: " + fileStatuses.length);
|
|
||||||
for (FileStatus fileStatus : fileStatuses) {
|
|
||||||
log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
|
|
||||||
|
|
||||||
// read from hudi and remove meta columns.
|
|
||||||
Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
|
|
||||||
if (config.isValidateFullData()) {
|
|
||||||
log.debug("Validating full dataset");
|
|
||||||
Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
|
|
||||||
Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
|
|
||||||
long exceptInputCount = exceptInputDf.count();
|
|
||||||
long exceptHudiCount = exceptHudiDf.count();
|
|
||||||
log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
|
|
||||||
if (exceptInputCount != 0 || exceptHudiCount != 0) {
|
|
||||||
log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
|
|
||||||
+ ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
|
|
||||||
throw new AssertionError("Hudi contents does not match contents input data. ");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
|
|
||||||
long inputCount = inputSnapshotDf.count();
|
|
||||||
long outputCount = trimmedHudiDf.count();
|
|
||||||
log.debug("Input count: " + inputCount + "; output count: " + outputCount);
|
|
||||||
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
|
||||||
if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
|
|
||||||
log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
|
|
||||||
throw new AssertionError("Hudi contents does not match contents input data. ");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (config.isValidateHive()) {
|
|
||||||
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
|
|
||||||
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
|
|
||||||
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
|
|
||||||
session.sql("REFRESH TABLE " + database + "." + tableName);
|
|
||||||
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
|
|
||||||
"test_suite_source_ordering_field FROM " + database + "." + tableName);
|
|
||||||
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
|
|
||||||
"_hoodie_is_deleted", "test_suite_source_ordering_field");
|
|
||||||
|
|
||||||
Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
|
|
||||||
outputCount = trimmedHudiDf.count();
|
|
||||||
log.warn("Input count: " + inputCount + "; output count: " + outputCount);
|
|
||||||
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
|
||||||
if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
|
|
||||||
log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
|
|
||||||
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// if delete input data is enabled, erase input data.
|
|
||||||
if (config.isDeleteInputData()) {
|
|
||||||
// clean up input data for current group of writes.
|
|
||||||
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
|
|
||||||
FileSystem fs = new Path(inputPathStr)
|
FileSystem fs = new Path(inputPathStr)
|
||||||
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
|
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
|
||||||
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
|
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
|
||||||
|
log.info("fileStatuses length: " + fileStatuses.length);
|
||||||
for (FileStatus fileStatus : fileStatuses) {
|
for (FileStatus fileStatus : fileStatuses) {
|
||||||
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
|
log.debug("Listing all Micro batches to be validated :: " + fileStatus.getPath().toString());
|
||||||
fs.delete(fileStatus.getPath(), true);
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Dataset<Row> inputSnapshotDf = getInputDf(context, session, inputPath);
|
||||||
|
|
||||||
|
// read from hudi and remove meta columns.
|
||||||
|
Dataset<Row> trimmedHudiDf = getDatasetToValidate(session, context, inputSnapshotDf.schema());
|
||||||
|
if (config.isValidateFullData()) {
|
||||||
|
log.debug("Validating full dataset");
|
||||||
|
Dataset<Row> exceptInputDf = inputSnapshotDf.except(trimmedHudiDf);
|
||||||
|
Dataset<Row> exceptHudiDf = trimmedHudiDf.except(inputSnapshotDf);
|
||||||
|
long exceptInputCount = exceptInputDf.count();
|
||||||
|
long exceptHudiCount = exceptHudiDf.count();
|
||||||
|
log.debug("Except input df count " + exceptInputDf + ", except hudi count " + exceptHudiCount);
|
||||||
|
if (exceptInputCount != 0 || exceptHudiCount != 0) {
|
||||||
|
log.error("Data set validation failed. Total count in hudi " + trimmedHudiDf.count() + ", input df count " + inputSnapshotDf.count()
|
||||||
|
+ ". InputDf except hudi df = " + exceptInputCount + ", Hudi df except Input df " + exceptHudiCount);
|
||||||
|
throw new AssertionError("Hudi contents does not match contents input data. ");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Dataset<Row> intersectionDf = inputSnapshotDf.intersect(trimmedHudiDf);
|
||||||
|
long inputCount = inputSnapshotDf.count();
|
||||||
|
long outputCount = trimmedHudiDf.count();
|
||||||
|
log.debug("Input count: " + inputCount + "; output count: " + outputCount);
|
||||||
|
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
||||||
|
if (outputCount == 0 || inputCount == 0 || inputSnapshotDf.except(intersectionDf).count() != 0) {
|
||||||
|
log.error("Data set validation failed. Total count in hudi " + outputCount + ", input df count " + inputCount);
|
||||||
|
throw new AssertionError("Hudi contents does not match contents input data. ");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (config.isValidateHive()) {
|
||||||
|
String database = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_DATABASE().key());
|
||||||
|
String tableName = context.getWriterContext().getProps().getString(DataSourceWriteOptions.HIVE_TABLE().key());
|
||||||
|
log.warn("Validating hive table with db : " + database + " and table : " + tableName);
|
||||||
|
session.sql("REFRESH TABLE " + database + "." + tableName);
|
||||||
|
Dataset<Row> cowDf = session.sql("SELECT _row_key, rider, driver, begin_lat, begin_lon, end_lat, end_lon, fare, _hoodie_is_deleted, " +
|
||||||
|
"test_suite_source_ordering_field FROM " + database + "." + tableName);
|
||||||
|
Dataset<Row> reorderedInputDf = inputSnapshotDf.select("_row_key", "rider", "driver", "begin_lat", "begin_lon", "end_lat", "end_lon", "fare",
|
||||||
|
"_hoodie_is_deleted", "test_suite_source_ordering_field");
|
||||||
|
|
||||||
|
Dataset<Row> intersectedHiveDf = reorderedInputDf.intersect(cowDf);
|
||||||
|
outputCount = trimmedHudiDf.count();
|
||||||
|
log.warn("Input count: " + inputCount + "; output count: " + outputCount);
|
||||||
|
// the intersected df should be same as inputDf. if not, there is some mismatch.
|
||||||
|
if (outputCount == 0 || reorderedInputDf.except(intersectedHiveDf).count() != 0) {
|
||||||
|
log.error("Data set validation failed for COW hive table. Total count in hudi " + outputCount + ", input df count " + inputCount);
|
||||||
|
throw new AssertionError("Hudi hive table contents does not match contents input data. ");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// if delete input data is enabled, erase input data.
|
||||||
|
if (config.isDeleteInputData()) {
|
||||||
|
// clean up input data for current group of writes.
|
||||||
|
inputPathStr = context.getHoodieTestSuiteWriter().getCfg().inputBasePath;
|
||||||
|
FileSystem fs = new Path(inputPathStr)
|
||||||
|
.getFileSystem(context.getHoodieTestSuiteWriter().getConfiguration());
|
||||||
|
FileStatus[] fileStatuses = fs.listStatus(new Path(inputPathStr));
|
||||||
|
for (FileStatus fileStatus : fileStatuses) {
|
||||||
|
log.debug("Micro batch to be deleted " + fileStatus.getPath().toString());
|
||||||
|
fs.delete(fileStatus.getPath(), true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user