1
0

[HUDI-2072] Add pre-commit validator framework (#3153)

* [HUDI-2072] Add pre-commit validator framework

* trigger Travis rebuild
This commit is contained in:
satishkotha
2021-08-03 12:07:45 -07:00
committed by GitHub
parent bec23bda50
commit 826a04d142
14 changed files with 1130 additions and 32 deletions

View File

@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.config;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import javax.annotation.concurrent.Immutable;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
/**
 * Pre-commit validator related config. Holds the list of validator classes to run before a
 * commit and the SQL queries those validators execute.
 */
@Immutable
public class HoodiePreCommitValidatorConfig extends HoodieConfig {
// Comma separated list of validator class names invoked before committing new data.
public static final ConfigProperty<String> PRE_COMMIT_VALIDATORS = ConfigProperty
.key("hoodie.precommit.validators")
.defaultValue("")
.withDocumentation("Comma separated list of class names that can be invoked to validate commit");
// Placeholder token in configured SQL queries; validators substitute it with a staged table name.
public static final String VALIDATOR_TABLE_VARIABLE = "<TABLE_NAME>";
// Queries expected to return the same result before and after the commit.
public static final ConfigProperty<String> PRE_COMMIT_VALIDATORS_EQUALITY_SQL_QUERIES = ConfigProperty
.key("hoodie.precommit.validators.equality.sql.queries")
.defaultValue("")
.withDocumentation("Spark SQL queries to run on table before committing new data to validate state before and after commit."
+ " Multiple queries separated by ';' delimiter are supported."
+ " Example: \"select count(*) from \\<TABLE_NAME\\>"
+ " Note \\<TABLE_NAME\\> is replaced by table state before and after commit.");
// Queries paired with an expected single-value result, in 'query#result' form.
public static final ConfigProperty<String> PRE_COMMIT_VALIDATORS_SINGLE_VALUE_SQL_QUERIES = ConfigProperty
.key("hoodie.precommit.validators.single.value.sql.queries")
.defaultValue("")
.withDocumentation("Spark SQL queries to run on table before committing new data to validate state after commit."
+ "Multiple queries separated by ';' delimiter are supported."
+ "Expected result is included as part of query separated by '#'. Example query: 'query1#result1:query2#result2'"
+ "Note \\<TABLE_NAME\\> variable is expected to be present in query.");
/**
 * Spark SQL queries to run on table before committing new data to validate state before and after commit.
 * Multiple queries separated by ';' delimiter are supported.
 * Example query: 'select count(*) from \<TABLE_NAME\> where col=null'
 * Note \<TABLE_NAME\> variable is expected to be present in query.
 */
public static final ConfigProperty<String> PRE_COMMIT_VALIDATORS_INEQUALITY_SQL_QUERIES = ConfigProperty
.key("hoodie.precommit.validators.inequality.sql.queries")
.defaultValue("")
.withDocumentation("Spark SQL queries to run on table before committing new data to validate state before and after commit."
+ "Multiple queries separated by ';' delimiter are supported."
+ "Example query: 'select count(*) from \\<TABLE_NAME\\> where col=null'"
+ "Note \\<TABLE_NAME\\> variable is expected to be present in query.");
// Instances are created through the Builder only.
private HoodiePreCommitValidatorConfig() {
super();
}
public static HoodiePreCommitValidatorConfig.Builder newBuilder() {
return new Builder();
}
/** Fluent builder for {@link HoodiePreCommitValidatorConfig}. */
public static class Builder {
private final HoodiePreCommitValidatorConfig preCommitValidatorConfig = new HoodiePreCommitValidatorConfig();
/** Loads properties from the given file into this config. */
public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
this.preCommitValidatorConfig.getProps().load(reader);
return this;
}
}
/** Copies all entries from the given properties into this config. */
public Builder fromProperties(Properties props) {
this.preCommitValidatorConfig.getProps().putAll(props);
return this;
}
/** Sets the comma separated list of validator class names. */
public Builder withPreCommitValidator(String preCommitValidators) {
preCommitValidatorConfig.setValue(PRE_COMMIT_VALIDATORS, preCommitValidators);
return this;
}
/** Sets ';' separated equality queries (same result expected before and after commit). */
public Builder withPrecommitValidatorEqualitySqlQueries(String preCommitValidators) {
preCommitValidatorConfig.setValue(PRE_COMMIT_VALIDATORS_EQUALITY_SQL_QUERIES, preCommitValidators);
return this;
}
/** Sets ';' separated 'query#expectedResult' pairs checked against the new table state. */
public Builder withPrecommitValidatorSingleResultSqlQueries(String preCommitValidators) {
preCommitValidatorConfig.setValue(PRE_COMMIT_VALIDATORS_SINGLE_VALUE_SQL_QUERIES, preCommitValidators);
return this;
}
/** Sets ';' separated inequality queries (results expected to differ before and after commit). */
public Builder withPrecommitValidatorInequalitySqlQueries(String preCommitValidators) {
preCommitValidatorConfig.setValue(PRE_COMMIT_VALIDATORS_INEQUALITY_SQL_QUERIES, preCommitValidators);
return this;
}
/** Applies class-level defaults for unset properties and returns the built config. */
public HoodiePreCommitValidatorConfig build() {
preCommitValidatorConfig.setDefaults(HoodiePreCommitValidatorConfig.class.getName());
return preCommitValidatorConfig;
}
}
}

View File

@@ -1280,6 +1280,22 @@ public class HoodieWriteConfig extends HoodieConfig {
/** @return the raw value of {@code WRITE_META_KEY_PREFIXES_PROP}. */
public String getWriteMetaKeyPrefixes() {
return getString(WRITE_META_KEY_PREFIXES_PROP);
}
/** @return comma separated list of pre-commit validator class names (empty string when none configured). */
public String getPreCommitValidators() {
return getString(HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS);
}
/** @return ';' separated SQL queries expected to return equal results before and after commit. */
public String getPreCommitValidatorEqualitySqlQueries() {
return getString(HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_EQUALITY_SQL_QUERIES);
}
/** @return ';' separated 'query#expectedResult' pairs validated against the new table state. */
public String getPreCommitValidatorSingleResultSqlQueries() {
return getString(HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_SINGLE_VALUE_SQL_QUERIES);
}
/** @return ';' separated SQL queries expected to return different results before and after commit. */
public String getPreCommitValidatorInequalitySqlQueries() {
return getString(HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_INEQUALITY_SQL_QUERIES);
}
public boolean allowEmptyCommit() {
return getBooleanOrDefault(ALLOW_EMPTY_COMMIT);
@@ -1302,6 +1318,7 @@ public class HoodieWriteConfig extends HoodieConfig {
private boolean isPayloadConfigSet = false;
private boolean isMetadataConfigSet = false;
private boolean isLockConfigSet = false;
private boolean isPreCommitValidationConfigSet = false;
public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
@@ -1451,6 +1468,12 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
/**
 * Copies all properties from the given pre-commit validator config into this write config and
 * marks validator config as explicitly set, so build() does not re-apply validator defaults.
 */
public Builder withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig validatorConfig) {
writeConfig.getProps().putAll(validatorConfig.getProps());
isPreCommitValidationConfigSet = true;
return this;
}
public Builder withMetricsConfig(HoodieMetricsConfig metricsConfig) {
writeConfig.getProps().putAll(metricsConfig.getProps());
isMetricsConfigSet = true;
@@ -1620,7 +1643,8 @@ public class HoodieWriteConfig extends HoodieConfig {
HoodieMetadataConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isLockConfigSet,
HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultOnCondition(!isPreCommitValidationConfigSet,
HoodiePreCommitValidatorConfig.newBuilder().fromProperties(writeConfig.getProps()).build());
writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION, String.valueOf(TimelineLayoutVersion.CURR_VERSION));
}

View File

@@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
@@ -123,7 +124,20 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I,
return table.getMetaClient().getCommitActionType();
}
/**
 * Runs configured pre-commit validators, if any. Engines without validator support fail fast
 * as soon as validators are configured; with no validators configured this is a no-op.
 */
protected void runPrecommitValidators(HoodieWriteMetadata<O> writeMetadata) {
  final String configuredValidators = config.getPreCommitValidators();
  if (!StringUtils.isNullOrEmpty(configuredValidators)) {
    throw new HoodieIOException("Precommit validation not implemented for all engines yet");
  }
}
protected void commitOnAutoCommit(HoodieWriteMetadata result) {
// validate commit action before committing result
runPrecommitValidators(result);
if (config.shouldAutoCommit()) {
LOG.info("Auto commit enabled: Committing " + instantTime);
autoCommit(extraMetadata, result);

View File

@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.utils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import scala.collection.JavaConverters;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Spark validator utils to verify and run any precommit validators configured.
 */
public class SparkValidatorUtils {
  // Fixed: logger was previously registered under BaseSparkCommitActionExecutor.class.
  private static final Logger LOG = LogManager.getLogger(SparkValidatorUtils.class);

  /**
   * Check configured pre-commit validators and run them. Note that this only works for COW tables.
   *
   * Throws HoodieValidationException if any validation fails.
   */
  public static void runValidators(HoodieWriteConfig config,
                                   HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata,
                                   HoodieEngineContext context,
                                   HoodieTable table,
                                   String instantTime) {
    if (StringUtils.isNullOrEmpty(config.getPreCommitValidators())) {
      LOG.info("no validators configured.");
      return;
    }
    // Write stats may not be populated yet; derive them from the write status RDD if needed.
    if (!writeMetadata.getWriteStats().isPresent()) {
      writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collect());
    }
    Set<String> partitionsModified = writeMetadata.getWriteStats().get().stream()
        .map(writeStats -> writeStats.getPartitionPath())
        .collect(Collectors.toSet());
    SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context));
    // Refresh timeline to ensure validator sees any other operations done on timeline
    // (async operations such as other clustering/compaction/rollback).
    table.getMetaClient().reloadActiveTimeline();
    Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table).cache();
    Dataset<Row> afterState = getRecordsFromPendingCommits(sqlContext, partitionsModified, writeMetadata, table, instantTime).cache();

    // Fixed: instantiate validators and start ALL futures before joining any of them. Joining
    // inside a lazy stream pipeline ran validators one-by-one instead of concurrently.
    List<CompletableFuture<Boolean>> validations = Arrays.stream(config.getPreCommitValidators().split(","))
        .map(validatorClass -> (SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass,
            new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
            table, context, config))
        .map(validator -> runValidatorAsync(validator, writeMetadata, beforeState, afterState, instantTime))
        .collect(Collectors.toList());
    boolean allSuccess = validations.stream().map(CompletableFuture::join).reduce(true, Boolean::logicalAnd);
    if (allSuccess) {
      LOG.info("All validations succeeded");
    } else {
      LOG.error("At least one pre-commit validation failed");
      throw new HoodieValidationException("At least one pre-commit validation failed");
    }
  }

  /**
   * Run validators in a separate threadpool for parallelism. Each validator can submit a
   * distributed spark job if needed. The returned future resolves to true on success and false
   * on validation failure (it never completes exceptionally for validation errors).
   */
  private static CompletableFuture<Boolean> runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata writeMetadata,
                                                              Dataset<Row> beforeState, Dataset<Row> afterState, String instantTime) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        validator.validate(instantTime, writeMetadata, beforeState, afterState);
        LOG.info("validation complete for " + validator.getClass().getName());
        return true;
      } catch (HoodieValidationException e) {
        // Include the cause in the log; the caller only sees the boolean result.
        LOG.error("validation failed for " + validator.getClass().getName(), e);
        return false;
      }
    });
  }

  /**
   * Get records from partitions modified as a dataset.
   * Note that this only works for COW tables.
   */
  public static Dataset<Row> getRecordsFromCommittedFiles(SQLContext sqlContext,
                                                          Set<String> partitionsAffected, HoodieTable table) {
    List<String> committedFiles = partitionsAffected.stream()
        .flatMap(partition -> table.getBaseFileOnlyView().getLatestBaseFiles(partition).map(bf -> bf.getPath()))
        .collect(Collectors.toList());
    if (committedFiles.isEmpty()) {
      return sqlContext.emptyDataFrame();
    }
    return readRecordsForBaseFiles(sqlContext, committedFiles);
  }

  /**
   * Get records from the specified list of base (parquet) files.
   */
  public static Dataset<Row> readRecordsForBaseFiles(SQLContext sqlContext, List<String> baseFilePaths) {
    return sqlContext.read().parquet(JavaConverters.asScalaBufferConverter(baseFilePaths).asScala());
  }

  /**
   * Get records from partitions modified, including any inflight commits.
   * Note that this only works for COW tables.
   */
  public static Dataset<Row> getRecordsFromPendingCommits(SQLContext sqlContext,
                                                          Set<String> partitionsAffected,
                                                          HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata,
                                                          HoodieTable table,
                                                          String instantTime) {
    // build file system view with pending commits
    HoodieTablePreCommitFileSystemView fsView = new HoodieTablePreCommitFileSystemView(table.getMetaClient(),
        table.getHoodieView(),
        writeMetadata.getWriteStats().get(),
        writeMetadata.getPartitionToReplaceFileIds(),
        instantTime);
    List<String> newFiles = partitionsAffected.stream()
        .flatMap(partition -> fsView.getLatestBaseFiles(partition).map(bf -> bf.getPath()))
        .collect(Collectors.toList());
    if (newFiles.isEmpty()) {
      return sqlContext.emptyDataFrame();
    }
    return readRecordsForBaseFiles(sqlContext, newFiles);
  }
}

View File

@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Base class for validators that can be configured to run pre-commit on Spark.
 * Subclasses implement {@link #validateRecordsBeforeAndAfter} and signal failure by
 * throwing {@link HoodieValidationException}.
 */
public abstract class SparkPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> {
  private static final Logger LOG = LogManager.getLogger(SparkPreCommitValidator.class);

  // All state is assigned once in the constructor; keep the fields final (immutable by default).
  private final HoodieSparkTable<T> table;
  private final HoodieEngineContext engineContext;
  private final HoodieWriteConfig writeConfig;

  protected SparkPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
    this.table = table;
    this.engineContext = engineContext;
    this.writeConfig = writeConfig;
  }

  /**
   * Resolves the set of partition paths touched by this write: prefers the write stats when
   * present, otherwise collects partition paths from the write status RDD.
   */
  protected Set<String> getPartitionsModified(HoodieWriteMetadata<O> writeResult) {
    Set<String> partitionsModified;
    if (writeResult.getWriteStats().isPresent()) {
      partitionsModified = writeResult.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet());
    } else {
      partitionsModified = new HashSet<>(writeResult.getWriteStatuses().map(WriteStatus::getPartitionPath).collect());
    }
    return partitionsModified;
  }

  /**
   * Verify the data written as part of specified instant.
   * Throws HoodieValidationException if any unexpected data is written
   * (Example: data files are not readable for some reason).
   */
  public void validate(String instantTime, HoodieWriteMetadata<O> writeResult, Dataset<Row> before, Dataset<Row> after) throws HoodieValidationException {
    HoodieTimer timer = new HoodieTimer().startTimer();
    try {
      validateRecordsBeforeAndAfter(before, after, getPartitionsModified(writeResult));
    } finally {
      // Always log elapsed time, even when validation throws.
      LOG.info(getClass() + " validator took " + timer.endTimer() + " ms");
    }
  }

  /**
   * Takes input datasets 1) before the current commit and 2) including the inflight commit.
   * Performs required validation and throws HoodieValidationException if validation fails.
   */
  protected abstract void validateRecordsBeforeAndAfter(Dataset<Row> before,
                                                        Dataset<Row> after,
                                                        Set<String> partitionsAffected);

  public HoodieTable getHoodieTable() {
    return this.table;
  }

  public HoodieEngineContext getEngineContext() {
    return this.engineContext;
  }

  public HoodieWriteConfig getWriteConfig() {
    return this.writeConfig;
  }
}

View File

@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
 * Validator to run sql query and compare table state
 * 1) before new commit started.
 * 2) current inflight commit (if successful).
 *
 * Expects both queries to return same result.
 */
public class SqlQueryEqualityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(SqlQueryEqualityPreCommitValidator.class);
public SqlQueryEqualityPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
super(table, engineContext, config);
}
/** Queries for this validator come from the 'equality' config key. */
@Override
protected String getQueryConfigName() {
return HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_EQUALITY_SQL_QUERIES.key();
}
/**
 * Runs the query against both table snapshots (the TABLE_NAME placeholder is replaced with each
 * snapshot's temp table name) and fails the commit if the results differ.
 */
@Override
protected void validateUsingQuery(String query, String prevTableSnapshot, String newTableSnapshot, SQLContext sqlContext) {
String queryWithPrevSnapshot = query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, prevTableSnapshot);
String queryWithNewSnapshot = query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, newTableSnapshot);
LOG.info("Running query on previous state: " + queryWithPrevSnapshot);
Dataset<Row> prevRows = sqlContext.sql(queryWithPrevSnapshot);
LOG.info("Running query on new state: " + queryWithNewSnapshot);
Dataset<Row> newRows = sqlContext.sql(queryWithNewSnapshot);
printAllRowsIfDebugEnabled(prevRows);
printAllRowsIfDebugEnabled(newRows);
// NOTE(review): Dataset.intersect() is a DISTINCT set intersection, so this equality check can
// misfire when query results contain duplicate rows (identical duplicates collapse to one row,
// which then no longer matches prevRows.count()). Confirm configured queries return distinct
// rows (e.g. aggregates), or compare with a duplicate-aware method.
boolean areDatasetsEqual = prevRows.intersect(newRows).count() == prevRows.count();
LOG.info("Completed Equality Validation, datasets equal? " + areDatasetsEqual);
if (!areDatasetsEqual) {
LOG.error("query validation failed. See stdout for sample query results. Query: " + query);
// Sample records are intentionally written to stdout so users can inspect the mismatch.
System.out.println("Expected result (sample records only):");
prevRows.show();
System.out.println("Actual result (sample records only):");
newRows.show();
throw new HoodieValidationException("Query validation failed for '" + query + "'. See stdout for expected vs actual records");
}
}
}

View File

@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
/**
 * Validator to run sql query and compare table state
 * 1) before new commit started.
 * 2) current inflight commit (if successful).
 *
 * Expects query results dont match.
 */
public class SqlQueryInequalityPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
private static final Logger LOG = LogManager.getLogger(SqlQueryInequalityPreCommitValidator.class);
public SqlQueryInequalityPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
super(table, engineContext, config);
}
/** Queries for this validator come from the 'inequality' config key. */
@Override
protected String getQueryConfigName() {
return HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_INEQUALITY_SQL_QUERIES.key();
}
/**
 * Runs the query against both table snapshots (the TABLE_NAME placeholder is replaced with each
 * snapshot's temp table name) and fails the commit if the results are EQUAL.
 */
@Override
protected void validateUsingQuery(String query, String prevTableSnapshot, String newTableSnapshot, SQLContext sqlContext) {
String queryWithPrevSnapshot = query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, prevTableSnapshot);
String queryWithNewSnapshot = query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, newTableSnapshot);
LOG.info("Running query on previous state: " + queryWithPrevSnapshot);
Dataset<Row> prevRows = sqlContext.sql(queryWithPrevSnapshot);
LOG.info("Running query on new state: " + queryWithNewSnapshot);
Dataset<Row> newRows = sqlContext.sql(queryWithNewSnapshot);
printAllRowsIfDebugEnabled(prevRows);
printAllRowsIfDebugEnabled(newRows);
// NOTE(review): Dataset.intersect() is a DISTINCT set intersection, so this comparison can be
// skewed by duplicate rows in the query results — confirm configured queries return distinct
// rows (same caveat as the equality validator).
boolean areDatasetsEqual = prevRows.intersect(newRows).count() == prevRows.count();
LOG.info("Completed Inequality Validation, datasets equal? " + areDatasetsEqual);
if (areDatasetsEqual) {
LOG.error("query validation failed. See stdout for sample query results. Query: " + query);
// Sample records are intentionally written to stdout so users can inspect the result.
System.out.println("Expected query results to be inequal, but they are same. Result (sample records only):");
prevRows.show();
throw new HoodieValidationException("Query validation failed for '" + query
+ "'. Expected " + prevRows.count() + " rows, Found " + newRows.count());
}
}
}

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Validator framework to run sql queries and compare table state at different locations.
 * Subclasses supply the config key holding queries and the per-query validation logic.
 */
public abstract class SqlQueryPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
  private static final Logger LOG = LogManager.getLogger(SqlQueryPreCommitValidator.class);
  // Makes temp view names unique when multiple validators run in the same SparkSession.
  private static final AtomicInteger TABLE_COUNTER = new AtomicInteger(0);

  public SqlQueryPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
    super(table, engineContext, config);
  }

  /**
   * Takes input datasets 1) before commit started and 2) with inflight commit. Registers both as
   * temp views and runs every configured query against them; throws on validation failure.
   */
  @Override
  public void validateRecordsBeforeAndAfter(Dataset<Row> before, Dataset<Row> after, final Set<String> partitionsAffected) {
    String hoodieTableName = "staged_table_" + TABLE_COUNTER.incrementAndGet();
    String hoodieTableBeforeCurrentCommit = hoodieTableName + "_before";
    String hoodieTableWithInflightCommit = hoodieTableName + "_after";
    // createOrReplaceTempView is the documented replacement for the deprecated registerTempTable.
    before.createOrReplaceTempView(hoodieTableBeforeCurrentCommit);
    after.createOrReplaceTempView(hoodieTableWithInflightCommit);
    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
    SQLContext sqlContext = new SQLContext(jsc);
    String[] queries = getQueriesToRun();
    //TODO run this in a thread pool to improve parallelism
    Arrays.stream(queries).forEach(query ->
        validateUsingQuery(query, hoodieTableBeforeCurrentCommit, hoodieTableWithInflightCommit, sqlContext));
  }

  /**
   * Reads the ';' separated query list from this validator's config key.
   * @throws HoodieValidationException if no queries are configured for that key.
   */
  protected String[] getQueriesToRun() {
    String sqlQueriesConfigured = getWriteConfig().getProps().getProperty(getQueryConfigName());
    if (StringUtils.isNullOrEmpty(sqlQueriesConfigured)) {
      // Fixed: the previous message appended the (null/empty) value itself, which was useless;
      // report the config key that is missing instead.
      throw new HoodieValidationException("Sql validator configured incorrectly. Expecting at least one query in config "
          + getQueryConfigName());
    }
    return sqlQueriesConfigured.trim().split(";");
  }

  /** Prints the entire dataset when debug logging is enabled (cached to avoid recomputation). */
  protected void printAllRowsIfDebugEnabled(Dataset<Row> dataset) {
    if (LOG.isDebugEnabled()) {
      dataset = dataset.cache();
      LOG.debug("Printing all rows from query validation:");
      dataset.show(Integer.MAX_VALUE, false);
    }
  }

  /** @return config key whose value holds this validator's ';' separated queries. */
  protected abstract String getQueryConfigName();

  /** Validates one query against the before/after temp tables; throws HoodieValidationException on failure. */
  protected abstract void validateUsingQuery(String query, String prevTableSnapshot, String newTableSnapshot, SQLContext sqlContext);
}

View File

@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.client.validator;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import java.util.List;
/**
 * Validator to run sql queries on new table state and expects a single result. If the result doesnt match expected result,
 * throw validation error.
 *
 * Example configuration: "query1#expectedResult1;query2#expectedResult2;"
 */
public class SqlQuerySingleResultPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
  // Fixed: logger was previously registered under SqlQueryInequalityPreCommitValidator.class.
  private static final Logger LOG = LogManager.getLogger(SqlQuerySingleResultPreCommitValidator.class);

  public SqlQuerySingleResultPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
    super(table, engineContext, config);
  }

  /** Queries for this validator come from the 'single value' config key. */
  @Override
  protected String getQueryConfigName() {
    return HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_SINGLE_VALUE_SQL_QUERIES.key();
  }

  /**
   * Runs the query part (before '#') against the new table state and fails the commit unless it
   * returns exactly one row with one column whose string value equals the expected part (after '#').
   */
  @Override
  protected void validateUsingQuery(String query, String prevTableSnapshot, String newTableSnapshot, SQLContext sqlContext) {
    String[] queryWithExpectedResult = query.split("#");
    if (queryWithExpectedResult.length != 2) {
      throw new HoodieValidationException("Invalid query format " + query);
    }
    String queryToRun = queryWithExpectedResult[0];
    String expectedResult = queryWithExpectedResult[1];
    LOG.info("Running query on new state: " + queryToRun);
    String queryWithNewSnapshot = queryToRun.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, newTableSnapshot);
    List<Row> newRows = sqlContext.sql(queryWithNewSnapshot).collectAsList();
    // Fixed: the original used '&&', which accepted multi-row results and could throw
    // IndexOutOfBoundsException on an empty result. '||' requires exactly one row AND one
    // column, and short-circuits before get(0) when the result is empty.
    if (newRows.size() != 1 || newRows.get(0).size() != 1) {
      throw new HoodieValidationException("Invalid query result. expect single value for '" + query + "'");
    }
    Object result = newRows.get(0).apply(0);
    if (result == null || !expectedResult.equals(result.toString())) {
      LOG.error("Mismatch query result. Expected: " + expectedResult + " got " + result + ". Query: " + query);
      throw new HoodieValidationException("Query validation failed for '" + query
          + "'. Expected " + expectedResult + ", Found " + result);
    } else {
      LOG.info("Query validation successful. Expected: " + expectedResult + " got " + result);
    }
  }
}

View File

@@ -31,6 +31,7 @@ import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector;
import org.apache.hudi.client.bootstrap.translator.BootstrapPartitionPathTranslator;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.bootstrap.FileStatusUtils;
import org.apache.hudi.common.bootstrap.index.BootstrapIndex;
import org.apache.hudi.common.config.TypedProperties;
@@ -422,4 +423,9 @@ public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>
/**
 * Updates are not supported in the bootstrap code path; bootstrap only writes new files.
 *
 * @throws UnsupportedOperationException always
 */
protected Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
  // message grammar fixed: was "Should not called"
  throw new UnsupportedOperationException("Should not be called in bootstrap code path");
}
/**
 * Runs any configured pre-commit validators against the bootstrap write result
 * before the commit is finalized.
 */
@Override
protected void runPrecommitValidators(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
}
}

View File

@@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.utils.SparkMemoryUtils;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.client.utils.SparkValidatorUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
@@ -387,4 +388,8 @@ public abstract class BaseSparkCommitActionExecutor<T extends HoodieRecordPayloa
return getUpsertPartitioner(profile);
}
/**
 * Runs any configured pre-commit validators against the write result
 * before the commit is finalized.
 */
@Override
protected void runPrecommitValidators(HoodieWriteMetadata<JavaRDD<WriteStatus>> writeMetadata) {
SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime);
}
}

View File

@@ -25,6 +25,10 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.client.validator.SparkPreCommitValidator;
import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator;
import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -34,6 +38,7 @@ import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteOperationType;
@@ -53,17 +58,21 @@ import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
import org.apache.hudi.exception.HoodieRollbackException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.HoodieIndex.IndexType;
import org.apache.hudi.io.HoodieMergeHandle;
@@ -169,6 +178,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private HoodieTestTable testTable;
private static final String COUNT_SQL_QUERY_FOR_VALIDATION = "select count(*) from <TABLE_NAME>";
@BeforeEach
public void setUpTestTable() {
testTable = HoodieSparkWriteableTestTable.of(metaClient);
@@ -256,6 +267,106 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
}
/**
 * Verify that a passing pre-commit validator lets an auto-committed bulk insert complete:
 * the validator expects exactly the number of rows that are inserted.
 */
@Test
public void testPreCommitValidatorsOnInsert() throws Exception {
  int numRecords = 200;
  HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
      .withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
      .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + numRecords)
      .build();
  HoodieWriteConfig config = getConfigBuilder().withAutoCommit(true)
      .withPreCommitValidatorConfig(validatorConfig).build();
  try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
    Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn = (writeClient, recordRDD, instantTime) ->
        writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
    String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
    // validation succeeds, so the commit is expected to be finalized
    // (unused 'result' local removed)
    insertFirstBatch(config, client, newCommitTime,
        "000", numRecords, writeFn, false, false, numRecords);
    assertTrue(testTable.commitExists(newCommitTime));
  }
}
/**
 * Verify that a failing pre-commit validator blocks the commit: the validator is configured to
 * expect 500 rows while only 200 are inserted, so the write must fail with a
 * {@link HoodieValidationException} cause and no completed commit may exist.
 */
@Test
public void testPreCommitValidationFailureOnInsert() throws Exception {
  int numRecords = 200;
  HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
      .withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
      //set wrong value for expected number of rows so validation fails
      .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + 500)
      .build();
  HoodieWriteConfig config = getConfigBuilder().withPreCommitValidatorConfig(validatorConfig).build();
  String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
  try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
    Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn = (writeClient, recordRDD, instantTime) ->
        writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
    // (unused 'result' local removed)
    insertFirstBatch(config, client, newCommitTime,
        "000", numRecords, writeFn, false, false, numRecords);
    fail("Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
  } catch (HoodieInsertException e) {
    // expected: the validation failure surfaces as the cause of the insert exception;
    // anything else is a genuine test failure
    if (!(e.getCause() instanceof HoodieValidationException)) {
      throw e;
    }
  }
  assertFalse(testTable.commitExists(newCommitTime));
}
/**
 * Verify pre-commit validation when an earlier failed write is left inflight on the timeline:
 * the first write fails validation (expects 500 rows, inserts 200) and stays inflight; the second
 * write's validation must exclude that inflight instant and succeed with a matching row count.
 */
@Test
public void testPreCommitValidationWithMultipleInflights() throws Exception {
int numRecords = 200;
HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
.withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
//set wrong value for expected number of rows so the first write fails validation
.withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + 500)
.build();
// NEVER cleaning policy so the failed write is left inflight instead of being rolled back
HoodieWriteConfig config = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
.withPreCommitValidatorConfig(validatorConfig)
.build();
String instant1 = HoodieActiveTimeline.createNewInstantTime();
try {
insertWithConfig(config, numRecords, instant1);
fail("Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows");
} catch (HoodieInsertException e) {
if (e.getCause() instanceof HoodieValidationException) {
// expected because wrong value passed
} else {
throw e;
}
}
assertFalse(testTable.commitExists(instant1));
assertTrue(testTable.inflightCommitExists(instant1));
numRecords = 300;
//set expected number of rows to match the second insert so validation passes
validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
.withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName())
.withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + numRecords)
.build();
config = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build())
.withPreCommitValidatorConfig(validatorConfig)
.build();
String instant2 = HoodieActiveTimeline.createNewInstantTime();
// expect pre-commit validators to succeed. Note that validator is expected to exclude inflight instant1
insertWithConfig(config, numRecords, instant2);
assertTrue(testTable.inflightCommitExists(instant1));
assertTrue(testTable.commitExists(instant2));
}
/**
 * Bulk-inserts {@code numRecords} records at the given instant using the supplied write config.
 *
 * @param config     write config (including any pre-commit validator configuration) to use
 * @param numRecords number of records to generate and insert
 * @param instant    instant time for the commit
 */
private void insertWithConfig(HoodieWriteConfig config, int numRecords, String instant) throws Exception {
  try (SparkRDDWriteClient client = getHoodieWriteClient(config)) {
    Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> writeFn = (writeClient, recordRDD, instantTime) ->
        writeClient.bulkInsert(recordRDD, instantTime, Option.empty());
    // (unused 'result' local removed)
    insertFirstBatch(config, client, instant,
        "000", numRecords, writeFn, false, false, numRecords);
  }
}
/**
* Test De-duplication behavior for HoodieWriteClient insert API.
*/
@@ -1176,31 +1287,31 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testSimpleClustering(boolean populateMetaFields) throws Exception {
// setup clustering config
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
testClustering(clusteringConfig, populateMetaFields);
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testClusteringWithSortColumns(boolean populateMetaFields) throws Exception {
// setup clustering config
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key")
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
testClustering(clusteringConfig, populateMetaFields);
testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
}
@ParameterizedTest
@MethodSource("populateMetaFieldsParams")
public void testPendingClusteringRollback(boolean populateMetaFields) throws Exception {
// setup clustering config
// setup clustering config.
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
.withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build();
// start clustering, but don't commit
List<HoodieRecord> allRecords = testClustering(clusteringConfig, populateMetaFields);
List<HoodieRecord> allRecords = testInsertAndClustering(clusteringConfig, populateMetaFields, false);
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
@@ -1215,7 +1326,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
dataGen = new HoodieTestDataGenerator();
String commitTime = HoodieActiveTimeline.createNewInstantTime();
allRecords.addAll(dataGen.generateInserts(commitTime, 200));
writeAndVerifyBatch(client, allRecords, commitTime);
writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields);
// verify pending clustering can be rolled back (even though there is a completed commit greater than pending clustering)
client.rollback(pendingClusteringInstant.getTimestamp());
@@ -1224,50 +1335,141 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count());
}
private List<HoodieRecord> testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields) throws Exception {
return testClustering(clusteringConfig, false, populateMetaFields);
/**
 * Clustering must surface a pre-commit validation failure thrown by the configured validator.
 */
@Test
public void testClusteringWithFailingValidator() throws Exception {
  HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
      .withClusteringMaxNumGroups(10)
      .withClusteringSortColumns("_hoodie_record_key")
      .withClusteringTargetPartitions(0)
      .withInlineClusteringNumCommits(1)
      .build();
  try {
    testInsertAndClustering(clusteringConfig, true, true, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
    fail("expected pre-commit clustering validation to fail");
  } catch (HoodieValidationException e) {
    // expected: FailingPreCommitValidator always throws
  }
}
/**
 * The equality validator must reject a configuration with no sql query set.
 */
@Test
public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception {
  HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
      .withClusteringMaxNumGroups(10)
      .withClusteringTargetPartitions(0)
      .withInlineClusteringNumCommits(1)
      .build();
  try {
    testInsertAndClustering(clusteringConfig, false, true, SqlQueryEqualityPreCommitValidator.class.getName(), "", "");
    fail("expected pre-commit clustering validation to fail because sql query is not configured");
  } catch (HoodieValidationException e) {
    // expected: no equality query was configured for the validator
  }
}
/**
 * Single-result validator with a correct expected row count passes clustering validation.
 */
@Test
public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws Exception {
  HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
      .withClusteringMaxNumGroups(10)
      .withClusteringTargetPartitions(0)
      .withInlineClusteringNumCommits(1)
      .build();
  // expected count 400 matches the rows inserted before clustering; no exception expected
  testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
      "", COUNT_SQL_QUERY_FOR_VALIDATION + "#400");
}
/**
 * Single-result validator with a wrong expected row count must fail clustering validation.
 */
@Test
public void testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws Exception {
  HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
      .withClusteringMaxNumGroups(10)
      .withClusteringTargetPartitions(0)
      .withInlineClusteringNumCommits(1)
      .build();
  try {
    testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(),
        "", COUNT_SQL_QUERY_FOR_VALIDATION + "#802");
    fail("expected pre-commit clustering validation to fail because of count mismatch. expect 400 rows, not 802");
  } catch (HoodieValidationException e) {
    // expected: the table has 400 rows, not 802
  }
}
/**
 * Convenience overload that runs insert + clustering without any pre-commit validators configured.
 */
private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering) throws Exception {
return testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, "", "", "");
}
private List<HoodieRecord> testClustering(HoodieClusteringConfig clusteringConfig, boolean completeClustering, boolean populateMetaFields) throws Exception {
private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields,
boolean completeClustering, String validatorClasses,
String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception {
List<HoodieRecord> allRecords = testInsertTwoBatches(populateMetaFields);
testClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
return allRecords;
}
private List<HoodieRecord> testInsertTwoBatches(boolean populateMetaFields) throws IOException {
// create config to not update small files.
HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields,
populateMetaFields ? new Properties() : getPropertiesForKeyGen());
SparkRDDWriteClient client = getHoodieWriteClient(config);
dataGen = new HoodieTestDataGenerator();
dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
String commitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records1 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime);
List<WriteStatus> statuses1 = writeAndVerifyBatch(client, records1, commitTime, populateMetaFields);
Set<HoodieFileGroupId> fileIds1 = getFileGroupIdsFromWriteStatus(statuses1);
commitTime = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime, populateMetaFields);
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
//verify new files are created for 2nd write
Set<HoodieFileGroupId> fileIdIntersection = new HashSet<>(fileIds1);
fileIdIntersection.retainAll(fileIds2);
assertEquals(0, fileIdIntersection.size());
return Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList());
}
private String testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering,
String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
List<HoodieRecord> allRecords) throws IOException {
config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(completeClustering)
HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
.withClusteringConfig(clusteringConfig)
.withProps(populateMetaFields ? new Properties() : getPropertiesForKeyGen()).build();
.withProps(getPropertiesForKeyGen()).build();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata =
performClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
if (completeClustering) {
String clusteringCommitTime = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline()
.getReverseOrderedInstants().findFirst().get().getTimestamp();
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config);
return clusteringCommitTime;
} else {
return "";
}
}
private HoodieWriteMetadata<JavaRDD<WriteStatus>> performClustering(HoodieClusteringConfig clusteringConfig,
boolean populateMetaFields,
boolean completeClustering,
String validatorClasses,
String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation,
List<HoodieRecord> allRecords) throws IOException {
HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder()
.withPreCommitValidator(StringUtils.nullToEmpty(validatorClasses))
.withPrecommitValidatorEqualitySqlQueries(sqlQueryForEqualityValidation)
.withPrecommitValidatorSingleResultSqlQueries(sqlQueryForSingleResultValidation)
.build();
HoodieWriteConfig config = getConfigBuilder().withAutoCommit(false)
.withPreCommitValidatorConfig(validatorConfig)
.withProps(populateMetaFields ? new Properties() : getPropertiesForKeyGen())
.withClusteringConfig(clusteringConfig).build();
// create client with new config.
client = getHoodieWriteClient(config);
SparkRDDWriteClient client = getHoodieWriteClient(config);
String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString();
HoodieWriteMetadata<JavaRDD<WriteStatus>> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering);
List<HoodieRecord> allRecords = Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList());
verifyRecordsWritten(clusteringCommitTime, allRecords, clusterMetadata.getWriteStatuses().collect(), config);
Set<HoodieFileGroupId> insertedFileIds = new HashSet<>();
insertedFileIds.addAll(fileIds1);
insertedFileIds.addAll(fileIds2);
verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config);
Set<HoodieFileGroupId> replacedFileIds = new HashSet<>();
clusterMetadata.getPartitionToReplaceFileIds().entrySet().forEach(partitionFiles ->
partitionFiles.getValue().stream().forEach(file ->
replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file))));
assertEquals(insertedFileIds, replacedFileIds);
return allRecords;
return clusterMetadata;
}
private Set<HoodieFileGroupId> getFileGroupIdsFromWriteStatus(List<WriteStatus> statuses) {
@@ -1317,7 +1519,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// Do Inserts
String commit1 = "001";
List<WriteStatus> statuses = writeAndVerifyBatch(client, dataGen.generateInserts(commit1, batch1RecordsCount), commit1);
List<WriteStatus> statuses = writeAndVerifyBatch(client, dataGen.generateInserts(commit1, batch1RecordsCount), commit1, populateMetaFields);
Set<String> batch1Buckets = getFileIdsFromWriteStatus(statuses);
// Do Insert Overwrite
@@ -1332,7 +1534,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
assertNoWriteErrors(statuses);
assertEquals(batch1Buckets, new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath)));
verifyRecordsWritten(commitTime2, inserts2, statuses, config);
verifyRecordsWritten(commitTime2, populateMetaFields, inserts2, statuses, config);
}
private Set<String> getFileIdsFromWriteStatus(List<WriteStatus> statuses) {
@@ -1373,7 +1575,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1).collect();
assertNoWriteErrors(statuses);
Set<String> batchBuckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet());
verifyRecordsWritten(commitTime1, inserts1, statuses, client.config);
verifyRecordsWritten(commitTime1, true, inserts1, statuses, client.config);
return batchBuckets;
}
@@ -1451,8 +1653,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
/**
* Verify data in base files matches expected records and commit time.
*/
private void verifyRecordsWritten(String commitTime, List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus,
HoodieWriteConfig config) throws IOException {
private void verifyRecordsWritten(String commitTime, boolean populateMetadataField,
List<HoodieRecord> expectedRecords, List<WriteStatus> allStatus, HoodieWriteConfig config) throws IOException {
List<GenericRecord> records = new ArrayList<>();
for (WriteStatus status : allStatus) {
Path filePath = new Path(basePath, status.getStat().getPath());
@@ -1472,18 +1674,20 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps()));
for (GenericRecord record : records) {
String recordKey = keyGenerator.getKey(record).getRecordKey();
assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
if (!populateMetadataField) {
assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD));
}
assertTrue(expectedKeys.contains(recordKey));
}
}
}
private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime) throws IOException {
private List<WriteStatus> writeAndVerifyBatch(SparkRDDWriteClient client, List<HoodieRecord> inserts, String commitTime, boolean populateMetaFields) throws IOException {
client.startCommitWithTime(commitTime);
JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(inserts, 2);
List<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime).collect();
assertNoWriteErrors(statuses);
verifyRecordsWritten(commitTime, inserts, statuses, client.config);
verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.config);
return statuses;
}
@@ -2130,4 +2334,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
.withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()).build();
}
/**
 * Test validator that always throws {@link HoodieValidationException}, used to simulate
 * a pre-commit validation failure regardless of the data written.
 */
public static class FailingPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
public FailingPreCommitValidator(HoodieSparkTable table, HoodieEngineContext context, HoodieWriteConfig config) {
super(table, context, config);
}
// unconditionally fails; the inputs are ignored
@Override
protected void validateRecordsBeforeAndAfter(final Dataset<Row> before, final Dataset<Row> after, final Set<String> partitionsAffected) {
throw new HoodieValidationException("simulate failure");
}
}
}

View File

@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.table.view;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Create PreCommitFileSystemView by only filtering instants that are of interest.
 * For example, we want to exclude other inflight instants. This is achieved by combining
 * 1) FileSystemView with completed commits
 * 2) Using list of files written/replaced by inflight instant that we are validating
 */
public class HoodieTablePreCommitFileSystemView {

  // all state is set once in the constructor; fields made final to make that explicit
  // fileIds replaced by the inflight commit being validated, keyed by partition path
  private final Map<String, List<String>> partitionToReplaceFileIds;
  // write stats of files created by the inflight commit being validated
  private final List<HoodieWriteStat> filesWritten;
  // instant time of the commit being validated (currently not read within this class)
  private final String preCommitInstantTime;
  // view over completed commits only; the inflight commit's files are layered on top of it
  private final SyncableFileSystemView completedCommitsFileSystemView;
  private final HoodieTableMetaClient tableMetaClient;

  /**
   * Create a file system view for the inflight commit that we are validating.
   *
   * @param metaClient                     meta client used to resolve the table base path
   * @param completedCommitsFileSystemView view containing only completed commits
   * @param filesWritten                   write stats of files created by the inflight commit
   * @param partitionToReplaceFileIds      fileIds replaced by the inflight commit, per partition
   * @param instantTime                    instant time of the inflight commit
   */
  public HoodieTablePreCommitFileSystemView(HoodieTableMetaClient metaClient,
                                            SyncableFileSystemView completedCommitsFileSystemView,
                                            List<HoodieWriteStat> filesWritten,
                                            Map<String, List<String>> partitionToReplaceFileIds,
                                            String instantTime) {
    this.completedCommitsFileSystemView = completedCommitsFileSystemView;
    this.filesWritten = filesWritten;
    this.partitionToReplaceFileIds = partitionToReplaceFileIds;
    this.preCommitInstantTime = instantTime;
    this.tableMetaClient = metaClient;
  }

  /**
   * Combine committed base files + new files created/replaced for given partition.
   *
   * @param partitionStr relative partition path
   * @return latest base files as they would appear if the inflight commit completed
   */
  public final Stream<HoodieBaseFile> getLatestBaseFiles(String partitionStr) {
    // get fileIds replaced by current inflight commit
    List<String> replacedFileIdsForPartition = partitionToReplaceFileIds.getOrDefault(partitionStr, Collections.emptyList());

    // get new files written by current inflight commit, keyed by fileId
    Map<String, HoodieBaseFile> newFilesWrittenForPartition = filesWritten.stream()
        .filter(file -> partitionStr.equals(file.getPartitionPath()))
        .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat ->
            new HoodieBaseFile(new Path(tableMetaClient.getBasePath(), writeStat.getPath()).toString())));

    Stream<HoodieBaseFile> committedBaseFiles = this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
    Stream<HoodieBaseFile> baseFilesForCommittedFileIds = committedBaseFiles
        // Remove files replaced by current inflight commit
        .filter(baseFile -> !replacedFileIdsForPartition.contains(baseFile.getFileId()))
        // if there is new version of file created by inflight commit, use that file instead of committed
        // version; 'remove' also drops it from the map so it is not emitted twice below
        .map(baseFile -> {
          HoodieBaseFile fileIdNewVersionExists = newFilesWrittenForPartition.remove(baseFile.getFileId());
          return Option.ofNullable(fileIdNewVersionExists).orElse(baseFile);
        });

    // files the inflight commit wrote into brand-new file groups (nothing committed to merge with)
    Stream<HoodieBaseFile> baseFilesWithNewFileIds = newFilesWrittenForPartition.values().stream();
    return Stream.concat(baseFilesForCommittedFileIds, baseFilesWithNewFileIds);
  }
}

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.exception;
/**
 * <p>
 * Exception thrown for validation failures.
 * </p>
 */
public class HoodieValidationException extends HoodieException {

  /**
   * Creates a validation exception with a descriptive message.
   */
  public HoodieValidationException(String msg) {
    super(msg);
  }

  /**
   * Creates a validation exception with a descriptive message and the underlying cause.
   */
  public HoodieValidationException(String msg, Throwable t) {
    super(msg, t);
  }
}