From 826a04d1425f47fdd80c293569a359021d1b6586 Mon Sep 17 00:00:00 2001 From: satishkotha Date: Tue, 3 Aug 2021 12:07:45 -0700 Subject: [PATCH] [HUDI-2072] Add pre-commit validator framework (#3153) * [HUDI-2072] Add pre-commit validator framework * trigger Travis rebuild --- .../HoodiePreCommitValidatorConfig.java | 122 ++++++++ .../apache/hudi/config/HoodieWriteConfig.java | 26 +- .../commit/BaseCommitActionExecutor.java | 14 + .../client/utils/SparkValidatorUtils.java | 167 +++++++++++ .../validator/SparkPreCommitValidator.java | 99 +++++++ .../SqlQueryEqualityPreCommitValidator.java | 76 +++++ .../SqlQueryInequalityPreCommitValidator.java | 74 +++++ .../validator/SqlQueryPreCommitValidator.java | 92 ++++++ ...qlQuerySingleResultPreCommitValidator.java | 78 +++++ .../SparkBootstrapCommitActionExecutor.java | 6 + .../commit/BaseSparkCommitActionExecutor.java | 5 + .../TestHoodieClientOnCopyOnWriteStorage.java | 278 ++++++++++++++++-- .../HoodieTablePreCommitFileSystemView.java | 90 ++++++ .../exception/HoodieValidationException.java | 35 +++ 14 files changed, 1130 insertions(+), 32 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java create mode 100644 hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java create mode 100644 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQuerySingleResultPreCommitValidator.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/exception/HoodieValidationException.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java new file mode 100644 index 000000000..1373c1368 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieConfig; + +import javax.annotation.concurrent.Immutable; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +/** + * Storage related config. 
+ */ +@Immutable +public class HoodiePreCommitValidatorConfig extends HoodieConfig { + + public static final ConfigProperty PRE_COMMIT_VALIDATORS = ConfigProperty + .key("hoodie.precommit.validators") + .defaultValue("") + .withDocumentation("Comma separated list of class names that can be invoked to validate commit"); + public static final String VALIDATOR_TABLE_VARIABLE = ""; + + public static final ConfigProperty PRE_COMMIT_VALIDATORS_EQUALITY_SQL_QUERIES = ConfigProperty + .key("hoodie.precommit.validators.equality.sql.queries") + .defaultValue("") + .withDocumentation("Spark SQL queries to run on table before committing new data to validate state before and after commit." + + " Multiple queries separated by ';' delimiter are supported." + + " Example: \"select count(*) from \\" + + " Note \\ is replaced by table state before and after commit."); + + public static final ConfigProperty PRE_COMMIT_VALIDATORS_SINGLE_VALUE_SQL_QUERIES = ConfigProperty + .key("hoodie.precommit.validators.single.value.sql.queries") + .defaultValue("") + .withDocumentation("Spark SQL queries to run on table before committing new data to validate state after commit." + + "Multiple queries separated by ';' delimiter are supported." + + "Expected result is included as part of query separated by '#'. Example query: 'query1#result1:query2#result2'" + + "Note \\ variable is expected to be present in query."); + + /** + * Spark SQL queries to run on table before committing new data to validate state before and after commit. + * Multiple queries separated by ';' delimiter are supported. + * Example query: 'select count(*) from \ where col=null' + * Note \ variable is expected to be present in query. 
+ */ + public static final ConfigProperty PRE_COMMIT_VALIDATORS_INEQUALITY_SQL_QUERIES = ConfigProperty + .key("hoodie.precommit.validators.inequality.sql.queries") + .defaultValue("") + .withDocumentation("Spark SQL queries to run on table before committing new data to validate state before and after commit." + + "Multiple queries separated by ';' delimiter are supported." + + "Example query: 'select count(*) from \\ where col=null'" + + "Note \\ variable is expected to be present in query."); + + private HoodiePreCommitValidatorConfig() { + super(); + } + + public static HoodiePreCommitValidatorConfig.Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + + private final HoodiePreCommitValidatorConfig preCommitValidatorConfig = new HoodiePreCommitValidatorConfig(); + + public Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + this.preCommitValidatorConfig.getProps().load(reader); + return this; + } + } + + public Builder fromProperties(Properties props) { + this.preCommitValidatorConfig.getProps().putAll(props); + return this; + } + + public Builder withPreCommitValidator(String preCommitValidators) { + preCommitValidatorConfig.setValue(PRE_COMMIT_VALIDATORS, preCommitValidators); + return this; + } + + public Builder withPrecommitValidatorEqualitySqlQueries(String preCommitValidators) { + preCommitValidatorConfig.setValue(PRE_COMMIT_VALIDATORS_EQUALITY_SQL_QUERIES, preCommitValidators); + return this; + } + + public Builder withPrecommitValidatorSingleResultSqlQueries(String preCommitValidators) { + preCommitValidatorConfig.setValue(PRE_COMMIT_VALIDATORS_SINGLE_VALUE_SQL_QUERIES, preCommitValidators); + return this; + } + + public Builder withPrecommitValidatorInequalitySqlQueries(String preCommitValidators) { + preCommitValidatorConfig.setValue(PRE_COMMIT_VALIDATORS_INEQUALITY_SQL_QUERIES, preCommitValidators); + return this; + } + + public 
HoodiePreCommitValidatorConfig build() { + preCommitValidatorConfig.setDefaults(HoodiePreCommitValidatorConfig.class.getName()); + return preCommitValidatorConfig; + } + } + +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 6bb57da87..6fdc6dd88 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1280,6 +1280,22 @@ public class HoodieWriteConfig extends HoodieConfig { public String getWriteMetaKeyPrefixes() { return getString(WRITE_META_KEY_PREFIXES_PROP); } + + public String getPreCommitValidators() { + return getString(HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS); + } + + public String getPreCommitValidatorEqualitySqlQueries() { + return getString(HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_EQUALITY_SQL_QUERIES); + } + + public String getPreCommitValidatorSingleResultSqlQueries() { + return getString(HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_SINGLE_VALUE_SQL_QUERIES); + } + + public String getPreCommitValidatorInequalitySqlQueries() { + return getString(HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_INEQUALITY_SQL_QUERIES); + } public boolean allowEmptyCommit() { return getBooleanOrDefault(ALLOW_EMPTY_COMMIT); @@ -1302,6 +1318,7 @@ public class HoodieWriteConfig extends HoodieConfig { private boolean isPayloadConfigSet = false; private boolean isMetadataConfigSet = false; private boolean isLockConfigSet = false; + private boolean isPreCommitValidationConfigSet = false; public Builder withEngineType(EngineType engineType) { this.engineType = engineType; @@ -1451,6 +1468,12 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } + public Builder withPreCommitValidatorConfig(HoodiePreCommitValidatorConfig 
validatorConfig) { + writeConfig.getProps().putAll(validatorConfig.getProps()); + isPreCommitValidationConfigSet = true; + return this; + } + public Builder withMetricsConfig(HoodieMetricsConfig metricsConfig) { writeConfig.getProps().putAll(metricsConfig.getProps()); isMetricsConfigSet = true; @@ -1620,7 +1643,8 @@ public class HoodieWriteConfig extends HoodieConfig { HoodieMetadataConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultOnCondition(!isLockConfigSet, HoodieLockConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); - + writeConfig.setDefaultOnCondition(!isPreCommitValidationConfigSet, + HoodiePreCommitValidatorConfig.newBuilder().fromProperties(writeConfig.getProps()).build()); writeConfig.setDefaultValue(TIMELINE_LAYOUT_VERSION, String.valueOf(TimelineLayoutVersion.CURR_VERSION)); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java index c124922a6..4b519ed92 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant.State; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; @@ -123,7 +124,20 @@ public abstract class BaseCommitActionExecutor writeMetadata) { + if (StringUtils.isNullOrEmpty(config.getPreCommitValidators())) { + return; + } + throw new 
HoodieIOException("Precommit validation not implemented for all engines yet"); + } + protected void commitOnAutoCommit(HoodieWriteMetadata result) { + // validate commit action before committing result + runPrecommitValidators(result); if (config.shouldAutoCommit()) { LOG.info("Auto commit enabled: Committing " + instantTime); autoCommit(extraMetadata, result); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java new file mode 100644 index 000000000..ab65e504e --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.utils; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.client.validator.SparkPreCommitValidator; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.view.HoodieTablePreCommitFileSystemView; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; +import scala.collection.JavaConverters; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Spark validator utils to verify and run any precommit validators configured. + */ +public class SparkValidatorUtils { + private static final Logger LOG = LogManager.getLogger(BaseSparkCommitActionExecutor.class); + + /** + * Check configured pre-commit validators and run them. Note that this only works for COW tables + * + * Throw error if there are validation failures. 
+ */ + public static void runValidators(HoodieWriteConfig config, + HoodieWriteMetadata> writeMetadata, + HoodieEngineContext context, + HoodieTable table, + String instantTime) { + if (StringUtils.isNullOrEmpty(config.getPreCommitValidators())) { + LOG.info("no validators configured."); + } else { + if (!writeMetadata.getWriteStats().isPresent()) { + writeMetadata.setWriteStats(writeMetadata.getWriteStatuses().map(WriteStatus::getStat).collect()); + } + Set partitionsModified = new HashSet<>(writeMetadata.getWriteStats().get().stream().map(writeStats -> + writeStats.getPartitionPath()).collect(Collectors.toList())); + SQLContext sqlContext = new SQLContext(HoodieSparkEngineContext.getSparkContext(context)); + // Refresh timeline to ensure validator sees the any other operations done on timeline (async operations such as other clustering/compaction/rollback) + table.getMetaClient().reloadActiveTimeline(); + Dataset beforeState = getRecordsFromCommittedFiles(sqlContext, partitionsModified, table).cache(); + Dataset afterState = getRecordsFromPendingCommits(sqlContext, partitionsModified, writeMetadata, table, instantTime).cache(); + + Stream validators = Arrays.stream(config.getPreCommitValidators().split(",")) + .map(validatorClass -> { + return ((SparkPreCommitValidator) ReflectionUtils.loadClass(validatorClass, + new Class[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, + table, context, config)); + }); + + boolean allSuccess = validators.map(v -> runValidatorAsync(v, writeMetadata, beforeState, afterState, instantTime)).map(CompletableFuture::join) + .reduce(true, Boolean::logicalAnd); + + if (allSuccess) { + LOG.info("All validations succeeded"); + } else { + LOG.error("At least one pre-commit validation failed"); + throw new HoodieValidationException("At least one pre-commit validation failed"); + } + } + } + + /** + * Run validators in a separate threadpool for parallelism. 
Each of validator can submit a distributed spark job if needed. + */ + private static CompletableFuture runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata writeMetadata, + Dataset beforeState, Dataset afterState, String instantTime) { + return CompletableFuture.supplyAsync(() -> { + try { + validator.validate(instantTime, writeMetadata, beforeState, afterState); + LOG.info("validation complete for " + validator.getClass().getName()); + return true; + } catch (HoodieValidationException e) { + LOG.error("validation failed for " + validator.getClass().getName()); + return false; + } + }); + } + + /** + * Get records from partitions modified as a dataset. + * Note that this only works for COW tables. + */ + public static Dataset getRecordsFromCommittedFiles(SQLContext sqlContext, + Set partitionsAffected, HoodieTable table) { + + List committedFiles = partitionsAffected.stream() + .flatMap(partition -> table.getBaseFileOnlyView().getLatestBaseFiles(partition).map(bf -> bf.getPath())) + .collect(Collectors.toList()); + + if (committedFiles.isEmpty()) { + return sqlContext.emptyDataFrame(); + } + return readRecordsForBaseFiles(sqlContext, committedFiles); + } + + /** + * Get records from specified list of data files. + */ + public static Dataset readRecordsForBaseFiles(SQLContext sqlContext, List baseFilePaths) { + return sqlContext.read().parquet(JavaConverters.asScalaBufferConverter(baseFilePaths).asScala()); + } + + /** + * Get reads from paritions modified including any inflight commits. 
+ * Note that this only works for COW tables + */ + public static Dataset getRecordsFromPendingCommits(SQLContext sqlContext, + Set partitionsAffected, + HoodieWriteMetadata> writeMetadata, + HoodieTable table, + String instantTime) { + + // build file system view with pending commits + HoodieTablePreCommitFileSystemView fsView = new HoodieTablePreCommitFileSystemView(table.getMetaClient(), + table.getHoodieView(), + writeMetadata.getWriteStats().get(), + writeMetadata.getPartitionToReplaceFileIds(), + instantTime); + + List newFiles = partitionsAffected.stream() + .flatMap(partition -> fsView.getLatestBaseFiles(partition).map(bf -> bf.getPath())) + .collect(Collectors.toList()); + + if (newFiles.isEmpty()) { + return sqlContext.emptyDataFrame(); + } + + return readRecordsForBaseFiles(sqlContext, newFiles); + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java new file mode 100644 index 000000000..f12d337bb --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.validator; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Validator can be configured pre-commit. 
+ */ +public abstract class SparkPreCommitValidator> { + private static final Logger LOG = LogManager.getLogger(SparkPreCommitValidator.class); + + private HoodieSparkTable table; + private HoodieEngineContext engineContext; + private HoodieWriteConfig writeConfig; + + protected SparkPreCommitValidator(HoodieSparkTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) { + this.table = table; + this.engineContext = engineContext; + this.writeConfig = writeConfig; + } + + protected Set getPartitionsModified(HoodieWriteMetadata writeResult) { + Set partitionsModified; + if (writeResult.getWriteStats().isPresent()) { + partitionsModified = writeResult.getWriteStats().get().stream().map(HoodieWriteStat::getPartitionPath).collect(Collectors.toSet()); + } else { + partitionsModified = new HashSet<>(writeResult.getWriteStatuses().map(WriteStatus::getPartitionPath).collect()); + } + return partitionsModified; + } + + /** + * Verify the data written as part of specified instant. + * Throw HoodieValidationException if any unexpected data is written (Example: data files are not readable for some reason). + */ + public void validate(String instantTime, HoodieWriteMetadata writeResult, Dataset before, Dataset after) throws HoodieValidationException { + HoodieTimer timer = new HoodieTimer().startTimer(); + try { + validateRecordsBeforeAndAfter(before, after, getPartitionsModified(writeResult)); + } finally { + LOG.info(getClass() + " validator took " + timer.endTimer() + " ms"); + } + } + + /** + * Takes input of RDD 1) before clustering and 2) after clustering. 
Perform required validation + * and throw error if validation fails + */ + protected abstract void validateRecordsBeforeAndAfter(Dataset before, + Dataset after, + Set partitionsAffected); + + public HoodieTable getHoodieTable() { + return this.table; + } + + public HoodieEngineContext getEngineContext() { + return this.engineContext; + } + + public HoodieWriteConfig getWriteConfig() { + return this.writeConfig; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java new file mode 100644 index 000000000..9432a3cb5 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryEqualityPreCommitValidator.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.validator; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; + +/** + * Validator to run sql query and compare table state + * 1) before new commit started. + * 2) current inflight commit (if successful). + * + * Expects both queries to return same result. + */ +public class SqlQueryEqualityPreCommitValidator> extends SqlQueryPreCommitValidator { + + private static final Logger LOG = LogManager.getLogger(SqlQueryEqualityPreCommitValidator.class); + + public SqlQueryEqualityPreCommitValidator(HoodieSparkTable table, HoodieEngineContext engineContext, HoodieWriteConfig config) { + super(table, engineContext, config); + } + + @Override + protected String getQueryConfigName() { + return HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_EQUALITY_SQL_QUERIES.key(); + } + + @Override + protected void validateUsingQuery(String query, String prevTableSnapshot, String newTableSnapshot, SQLContext sqlContext) { + String queryWithPrevSnapshot = query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, prevTableSnapshot); + String queryWithNewSnapshot = query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, newTableSnapshot); + LOG.info("Running query on previous state: " + queryWithPrevSnapshot); + Dataset prevRows = sqlContext.sql(queryWithPrevSnapshot); + LOG.info("Running query on new state: " + queryWithNewSnapshot); + Dataset newRows = 
sqlContext.sql(queryWithNewSnapshot); + printAllRowsIfDebugEnabled(prevRows); + printAllRowsIfDebugEnabled(newRows); + boolean areDatasetsEqual = prevRows.intersect(newRows).count() == prevRows.count(); + LOG.info("Completed Equality Validation, datasets equal? " + areDatasetsEqual); + if (!areDatasetsEqual) { + LOG.error("query validation failed. See stdout for sample query results. Query: " + query); + System.out.println("Expected result (sample records only):"); + prevRows.show(); + System.out.println("Actual result (sample records only):"); + newRows.show(); + throw new HoodieValidationException("Query validation failed for '" + query + "'. See stdout for expected vs actual records"); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java new file mode 100644 index 000000000..0ff2757cc --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryInequalityPreCommitValidator.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client.validator; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieValidationException; +import org.apache.hudi.table.HoodieSparkTable; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SQLContext; + +/** + * Validator to run sql query and compare table state + * 1) before new commit started. + * 2) current inflight commit (if successful). + * + * Expects query results dont match. + */ +public class SqlQueryInequalityPreCommitValidator> extends SqlQueryPreCommitValidator { + private static final Logger LOG = LogManager.getLogger(SqlQueryInequalityPreCommitValidator.class); + + public SqlQueryInequalityPreCommitValidator(HoodieSparkTable table, HoodieEngineContext engineContext, HoodieWriteConfig config) { + super(table, engineContext, config); + } + + @Override + protected String getQueryConfigName() { + return HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_INEQUALITY_SQL_QUERIES.key(); + } + + @Override + protected void validateUsingQuery(String query, String prevTableSnapshot, String newTableSnapshot, SQLContext sqlContext) { + String queryWithPrevSnapshot = query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, prevTableSnapshot); + String queryWithNewSnapshot = query.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, newTableSnapshot); + LOG.info("Running query on previous state: " + queryWithPrevSnapshot); + Dataset prevRows = sqlContext.sql(queryWithPrevSnapshot); + LOG.info("Running query on new state: " + queryWithNewSnapshot); + Dataset newRows = 
sqlContext.sql(queryWithNewSnapshot); + printAllRowsIfDebugEnabled(prevRows); + printAllRowsIfDebugEnabled(newRows); + boolean areDatasetsEqual = prevRows.intersect(newRows).count() == prevRows.count(); + LOG.info("Completed Inequality Validation, datasets equal? " + areDatasetsEqual); + if (areDatasetsEqual) { + LOG.error("query validation failed. See stdout for sample query results. Query: " + query); + System.out.println("Expected query results to be inequal, but they are same. Result (sample records only):"); + prevRows.show(); + throw new HoodieValidationException("Query validation failed for '" + query + + "'. Expected " + prevRows.count() + " rows, Found " + newRows.count()); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java new file mode 100644 index 000000000..122cf2be5 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SqlQueryPreCommitValidator.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.hudi.client.validator;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Validator framework to run SQL queries and compare table state at different locations:
 * the state before the commit started vs. the staged state including the inflight commit.
 * Subclasses supply the config key holding the queries and the per-query comparison logic.
 */
public abstract class SqlQueryPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SparkPreCommitValidator<T, I, K, O> {
  private static final Logger LOG = LogManager.getLogger(SqlQueryPreCommitValidator.class);
  // Monotonic counter so each validation run registers uniquely-named temp views.
  private static final AtomicInteger TABLE_COUNTER = new AtomicInteger(0);

  public SqlQueryPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
    super(table, engineContext, config);
  }

  /**
   * Takes input datasets 1) before commit started and 2) with inflight commit, registers both
   * as temp views, and runs every configured query against them. Throws
   * {@link HoodieValidationException} if any query validation fails.
   */
  @Override
  public void validateRecordsBeforeAndAfter(Dataset<Row> before, Dataset<Row> after, final Set<String> partitionsAffected) {
    String hoodieTableName = "staged_table_" + TABLE_COUNTER.incrementAndGet();
    String hoodieTableBeforeCurrentCommit = hoodieTableName + "_before";
    String hoodieTableWithInflightCommit = hoodieTableName + "_after";
    // createOrReplaceTempView replaces registerTempTable, which is deprecated since Spark 2.0
    // (registerTempTable delegates to it internally, so behavior is unchanged).
    before.createOrReplaceTempView(hoodieTableBeforeCurrentCommit);
    after.createOrReplaceTempView(hoodieTableWithInflightCommit);
    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
    SQLContext sqlContext = new SQLContext(jsc);

    String[] queries = getQueriesToRun();
    //TODO run this in a thread pool to improve parallelism
    Arrays.stream(queries).forEach(query ->
        validateUsingQuery(query, hoodieTableBeforeCurrentCommit, hoodieTableWithInflightCommit, sqlContext));
  }

  /**
   * Reads the ';'-separated query list from the subclass-specified config key.
   *
   * @throws HoodieValidationException if the config is missing or empty
   */
  protected String[] getQueriesToRun() {
    String sqlQueriesConfigured = getWriteConfig().getProps().getProperty(getQueryConfigName());
    if (StringUtils.isNullOrEmpty(sqlQueriesConfigured)) {
      // Name the offending config key instead of printing the null/empty value itself.
      throw new HoodieValidationException("Sql validator configured incorrectly. Expecting at least one query "
          + "in config '" + getQueryConfigName() + "', found none");
    }
    return sqlQueriesConfigured.trim().split(";");
  }

  /** Dumps the full result set to the debug log; caches first so show() does not recompute it. */
  protected void printAllRowsIfDebugEnabled(Dataset<Row> dataset) {
    if (LOG.isDebugEnabled()) {
      dataset = dataset.cache();
      LOG.debug("Printing all rows from query validation:");
      dataset.show(Integer.MAX_VALUE, false);
    }
  }

  /** Config key (subclass-specific) that holds the queries to run. */
  protected abstract String getQueryConfigName();

  /** Runs one query against both snapshots and throws {@link HoodieValidationException} on failure. */
  protected abstract void validateUsingQuery(String query, String prevTableSnapshot, String newTableSnapshot, SQLContext sqlContext);
}
package org.apache.hudi.client.validator;

import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.table.HoodieSparkTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

import java.util.List;

/**
 * Validator that runs SQL queries on the new table state and expects a single scalar result.
 * If the result doesn't match the expected value, throws a validation error.
 *
 * <p>Example configuration: "query1#expectedResult1;query2#expectedResult2;"
 */
public class SqlQuerySingleResultPreCommitValidator<T extends HoodieRecordPayload, I, K, O extends JavaRDD<WriteStatus>> extends SqlQueryPreCommitValidator<T, I, K, O> {
  // BUGFIX: logger was copy-pasted from SqlQueryInequalityPreCommitValidator.
  private static final Logger LOG = LogManager.getLogger(SqlQuerySingleResultPreCommitValidator.class);

  public SqlQuerySingleResultPreCommitValidator(HoodieSparkTable<T> table, HoodieEngineContext engineContext, HoodieWriteConfig config) {
    super(table, engineContext, config);
  }

  @Override
  protected String getQueryConfigName() {
    return HoodiePreCommitValidatorConfig.PRE_COMMIT_VALIDATORS_SINGLE_VALUE_SQL_QUERIES.key();
  }

  /**
   * Splits {@code query} into "sql#expectedValue", runs the sql against the staged snapshot
   * and compares the single scalar result against the expected value.
   *
   * @throws HoodieValidationException on malformed config, non-scalar result, or value mismatch
   */
  @Override
  protected void validateUsingQuery(String query, String prevTableSnapshot, String newTableSnapshot, SQLContext sqlContext) {
    String[] queryWithExpectedResult = query.split("#");
    if (queryWithExpectedResult.length != 2) {
      throw new HoodieValidationException("Invalid query format " + query);
    }

    String queryToRun = queryWithExpectedResult[0];
    String expectedResult = queryWithExpectedResult[1];
    LOG.info("Running query on new state: " + queryToRun);
    String queryWithNewSnapshot = queryToRun.replaceAll(HoodiePreCommitValidatorConfig.VALIDATOR_TABLE_VARIABLE, newTableSnapshot);
    List<Row> newRows = sqlContext.sql(queryWithNewSnapshot).collectAsList();
    // BUGFIX: original used '&&', which (a) accepted any multi-row/multi-column result and
    // (b) threw IndexOutOfBoundsException on an empty result set before the guard could fire.
    // '||' short-circuits, so get(0) is only reached when exactly one row exists.
    if (newRows.size() != 1 || newRows.get(0).size() != 1) {
      throw new HoodieValidationException("Invalid query result. expect single value for '" + query + "'");
    }
    Object result = newRows.get(0).apply(0);
    if (result == null || !expectedResult.equals(result.toString())) {
      LOG.error("Mismatch query result. Expected: " + expectedResult + " got " + result + ". Query: " + query);
      // Comparison is on a single scalar value, not a row count.
      throw new HoodieValidationException("Query validation failed for '" + query
          + "'. Expected " + expectedResult + ", Found " + result);
    } else {
      LOG.info("Query validation successful. Expected: " + expectedResult + " got " + result);
    }
  }
}
partitionPath, String fileId, Iterator> recordItr) { throw new UnsupportedOperationException("Should not called in bootstrap code path"); } + + @Override + protected void runPrecommitValidators(HoodieWriteMetadata> writeMetadata) { + SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime); + } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java index 50f3d68a8..a9ddab398 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java @@ -21,6 +21,7 @@ package org.apache.hudi.table.action.commit; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.utils.SparkMemoryUtils; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.client.utils.SparkValidatorUtils; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; @@ -387,4 +388,8 @@ public abstract class BaseSparkCommitActionExecutor> writeMetadata) { + SparkValidatorUtils.runValidators(config, writeMetadata, context, table, instantTime); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index 117c327d2..d69828199 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -25,6 +25,10 @@ import 
org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.client.validator.SparkPreCommitValidator; +import org.apache.hudi.client.validator.SqlQueryEqualityPreCommitValidator; +import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator; +import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; @@ -34,6 +38,7 @@ import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.model.WriteOperationType; @@ -53,17 +58,21 @@ import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieCorruptedDataException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.HoodieInsertException; import 
org.apache.hudi.exception.HoodieRollbackException; import org.apache.hudi.exception.HoodieUpsertException; +import org.apache.hudi.exception.HoodieValidationException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.io.HoodieMergeHandle; @@ -169,6 +178,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { private HoodieTestTable testTable; + private static final String COUNT_SQL_QUERY_FOR_VALIDATION = "select count(*) from "; + @BeforeEach public void setUpTestTable() { testTable = HoodieSparkWriteableTestTable.of(metaClient); @@ -256,6 +267,106 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { } } + @Test + public void testPreCommitValidatorsOnInsert() throws Exception { + int numRecords = 200; + HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder() + .withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()) + .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + numRecords) + .build(); + HoodieWriteConfig config = getConfigBuilder().withAutoCommit(true) + .withPreCommitValidatorConfig(validatorConfig).build(); + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + Function3, SparkRDDWriteClient, JavaRDD, String> writeFn = (writeClient, recordRDD, instantTime) -> + writeClient.bulkInsert(recordRDD, instantTime, Option.empty()); + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + JavaRDD result = insertFirstBatch(config, client, newCommitTime, + "000", numRecords, writeFn, false, false, numRecords); + assertTrue(testTable.commitExists(newCommitTime)); + } + } + + @Test + public void testPreCommitValidationFailureOnInsert() throws Exception { + int numRecords = 200; + HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder() + 
.withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()) + //set wrong value for expected number of rows + .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + 500) + .build(); + HoodieWriteConfig config = getConfigBuilder().withPreCommitValidatorConfig(validatorConfig).build(); + String newCommitTime = HoodieActiveTimeline.createNewInstantTime(); + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + Function3, SparkRDDWriteClient, JavaRDD, String> writeFn = (writeClient, recordRDD, instantTime) -> + writeClient.bulkInsert(recordRDD, instantTime, Option.empty()); + JavaRDD result = insertFirstBatch(config, client, newCommitTime, + "000", numRecords, writeFn, false, false, numRecords); + fail("Expected validation to fail because we only insert 200 rows. Validation is configured to expect 500 rows"); + } catch (HoodieInsertException e) { + if (e.getCause() instanceof HoodieValidationException) { + // expected because wrong value passed + } else { + throw e; + } + } + + assertFalse(testTable.commitExists(newCommitTime)); + } + + @Test + public void testPreCommitValidationWithMultipleInflights() throws Exception { + int numRecords = 200; + HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder() + .withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()) + //set wrong value for expected number of rows + .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + 500) + .build(); + HoodieWriteConfig config = getConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build()) + .withPreCommitValidatorConfig(validatorConfig) + .build(); + + String instant1 = HoodieActiveTimeline.createNewInstantTime(); + try { + insertWithConfig(config, numRecords, instant1); + fail("Expected validation to fail because we only insert 200 rows. 
Validation is configured to expect 500 rows"); + } catch (HoodieInsertException e) { + if (e.getCause() instanceof HoodieValidationException) { + // expected because wrong value passed + } else { + throw e; + } + } + + assertFalse(testTable.commitExists(instant1)); + assertTrue(testTable.inflightCommitExists(instant1)); + + numRecords = 300; + validatorConfig = HoodiePreCommitValidatorConfig.newBuilder() + .withPreCommitValidator(SqlQuerySingleResultPreCommitValidator.class.getName()) + //set wrong value for expected number of rows + .withPrecommitValidatorSingleResultSqlQueries(COUNT_SQL_QUERY_FOR_VALIDATION + "#" + numRecords) + .build(); + config = getConfigBuilder() + .withCompactionConfig(HoodieCompactionConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.NEVER).build()) + .withPreCommitValidatorConfig(validatorConfig) + .build(); + String instant2 = HoodieActiveTimeline.createNewInstantTime(); + // expect pre-commit validators to succeed. Note that validator is expected to exclude inflight instant1 + insertWithConfig(config, numRecords, instant2); + assertTrue(testTable.inflightCommitExists(instant1)); + assertTrue(testTable.commitExists(instant2)); + } + + private void insertWithConfig(HoodieWriteConfig config, int numRecords, String instant) throws Exception { + try (SparkRDDWriteClient client = getHoodieWriteClient(config)) { + Function3, SparkRDDWriteClient, JavaRDD, String> writeFn = (writeClient, recordRDD, instantTime) -> + writeClient.bulkInsert(recordRDD, instantTime, Option.empty()); + JavaRDD result = insertFirstBatch(config, client, instant, + "000", numRecords, writeFn, false, false, numRecords); + } + } + /** * Test De-duplication behavior for HoodieWriteClient insert API. 
*/ @@ -1176,31 +1287,31 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { @ParameterizedTest @MethodSource("populateMetaFieldsParams") public void testSimpleClustering(boolean populateMetaFields) throws Exception { - // setup clustering config + // setup clustering config. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); - testClustering(clusteringConfig, populateMetaFields); + testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); } @ParameterizedTest @MethodSource("populateMetaFieldsParams") public void testClusteringWithSortColumns(boolean populateMetaFields) throws Exception { - // setup clustering config + // setup clustering config. HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) .withClusteringSortColumns(populateMetaFields ? "_hoodie_record_key" : "_row_key") .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); - testClustering(clusteringConfig, populateMetaFields); + testInsertAndClustering(clusteringConfig, populateMetaFields, true, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); } @ParameterizedTest @MethodSource("populateMetaFieldsParams") public void testPendingClusteringRollback(boolean populateMetaFields) throws Exception { - // setup clustering config + // setup clustering config. 
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); // start clustering, but don't commit - List allRecords = testClustering(clusteringConfig, populateMetaFields); + List allRecords = testInsertAndClustering(clusteringConfig, populateMetaFields, false); HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build(); List> pendingClusteringPlans = ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList()); @@ -1215,7 +1326,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { dataGen = new HoodieTestDataGenerator(); String commitTime = HoodieActiveTimeline.createNewInstantTime(); allRecords.addAll(dataGen.generateInserts(commitTime, 200)); - writeAndVerifyBatch(client, allRecords, commitTime); + writeAndVerifyBatch(client, allRecords, commitTime, populateMetaFields); // verify pending clustering can be rolled back (even though there is a completed commit greater than pending clustering) client.rollback(pendingClusteringInstant.getTimestamp()); @@ -1224,50 +1335,141 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertEquals(0, ClusteringUtils.getAllPendingClusteringPlans(metaClient).count()); } - private List testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields) throws Exception { - return testClustering(clusteringConfig, false, populateMetaFields); + @Test + public void testClusteringWithFailingValidator() throws Exception { + // setup clustering config. 
+ HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringSortColumns("_hoodie_record_key") + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + try { + testInsertAndClustering(clusteringConfig, true, true, FailingPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, ""); + fail("expected pre-commit clustering validation to fail"); + } catch (HoodieValidationException e) { + // expected + } + } + + @Test + public void testClusteringInvalidConfigForSqlQueryValidator() throws Exception { + // setup clustering config. + HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + try { + testInsertAndClustering(clusteringConfig, false, true, SqlQueryEqualityPreCommitValidator.class.getName(), "", ""); + fail("expected pre-commit clustering validation to fail because sql query is not configured"); + } catch (HoodieValidationException e) { + // expected + } + } + + @Test + public void testClusteringInvalidConfigForSqlQuerySingleResultValidator() throws Exception { + // setup clustering config. + HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + + testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(), + "", COUNT_SQL_QUERY_FOR_VALIDATION + "#400"); + } + + @Test + public void testClusteringInvalidConfigForSqlQuerySingleResultValidatorFailure() throws Exception { + // setup clustering config. 
+ HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10) + .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).build(); + + try { + testInsertAndClustering(clusteringConfig, false, true, SqlQuerySingleResultPreCommitValidator.class.getName(), + "", COUNT_SQL_QUERY_FOR_VALIDATION + "#802"); + fail("expected pre-commit clustering validation to fail because of count mismatch. expect 400 rows, not 802"); + } catch (HoodieValidationException e) { + // expected + } + } + + private List testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering) throws Exception { + return testInsertAndClustering(clusteringConfig, populateMetaFields, completeClustering, "", "", ""); } - private List testClustering(HoodieClusteringConfig clusteringConfig, boolean completeClustering, boolean populateMetaFields) throws Exception { + private List testInsertAndClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, + boolean completeClustering, String validatorClasses, + String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws Exception { + List allRecords = testInsertTwoBatches(populateMetaFields); + testClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords); + return allRecords; + + } + + private List testInsertTwoBatches(boolean populateMetaFields) throws IOException { // create config to not update small files. HoodieWriteConfig config = getSmallInsertWriteConfig(2000, TRIP_EXAMPLE_SCHEMA, 10, false, populateMetaFields, populateMetaFields ? 
new Properties() : getPropertiesForKeyGen()); SparkRDDWriteClient client = getHoodieWriteClient(config); - dataGen = new HoodieTestDataGenerator(); + dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"}); String commitTime = HoodieActiveTimeline.createNewInstantTime(); List records1 = dataGen.generateInserts(commitTime, 200); - List statuses1 = writeAndVerifyBatch(client, records1, commitTime); + List statuses1 = writeAndVerifyBatch(client, records1, commitTime, populateMetaFields); Set fileIds1 = getFileGroupIdsFromWriteStatus(statuses1); commitTime = HoodieActiveTimeline.createNewInstantTime(); List records2 = dataGen.generateInserts(commitTime, 200); - List statuses2 = writeAndVerifyBatch(client, records2, commitTime); + List statuses2 = writeAndVerifyBatch(client, records2, commitTime, populateMetaFields); Set fileIds2 = getFileGroupIdsFromWriteStatus(statuses2); //verify new files are created for 2nd write Set fileIdIntersection = new HashSet<>(fileIds1); fileIdIntersection.retainAll(fileIds2); assertEquals(0, fileIdIntersection.size()); + return Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()); + } + + private String testClustering(HoodieClusteringConfig clusteringConfig, boolean populateMetaFields, boolean completeClustering, + String validatorClasses, String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation, + List allRecords) throws IOException { - config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(completeClustering) + HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false) .withClusteringConfig(clusteringConfig) - .withProps(populateMetaFields ? 
new Properties() : getPropertiesForKeyGen()).build(); + .withProps(getPropertiesForKeyGen()).build(); + HoodieWriteMetadata> clusterMetadata = + performClustering(clusteringConfig, populateMetaFields, completeClustering, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords); + + if (completeClustering) { + String clusteringCommitTime = metaClient.reloadActiveTimeline().getCompletedReplaceTimeline() + .getReverseOrderedInstants().findFirst().get().getTimestamp(); + verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config); + return clusteringCommitTime; + } else { + return ""; + } + } + + private HoodieWriteMetadata> performClustering(HoodieClusteringConfig clusteringConfig, + boolean populateMetaFields, + boolean completeClustering, + String validatorClasses, + String sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation, + List allRecords) throws IOException { + HoodiePreCommitValidatorConfig validatorConfig = HoodiePreCommitValidatorConfig.newBuilder() + .withPreCommitValidator(StringUtils.nullToEmpty(validatorClasses)) + .withPrecommitValidatorEqualitySqlQueries(sqlQueryForEqualityValidation) + .withPrecommitValidatorSingleResultSqlQueries(sqlQueryForSingleResultValidation) + .build(); + + HoodieWriteConfig config = getConfigBuilder().withAutoCommit(false) + .withPreCommitValidatorConfig(validatorConfig) + .withProps(populateMetaFields ? new Properties() : getPropertiesForKeyGen()) + .withClusteringConfig(clusteringConfig).build(); // create client with new config. 
- client = getHoodieWriteClient(config); + SparkRDDWriteClient client = getHoodieWriteClient(config); String clusteringCommitTime = client.scheduleClustering(Option.empty()).get().toString(); HoodieWriteMetadata> clusterMetadata = client.cluster(clusteringCommitTime, completeClustering); - List allRecords = Stream.concat(records1.stream(), records2.stream()).collect(Collectors.toList()); - verifyRecordsWritten(clusteringCommitTime, allRecords, clusterMetadata.getWriteStatuses().collect(), config); - Set insertedFileIds = new HashSet<>(); - insertedFileIds.addAll(fileIds1); - insertedFileIds.addAll(fileIds2); + verifyRecordsWritten(clusteringCommitTime, populateMetaFields, allRecords, clusterMetadata.getWriteStatuses().collect(), config); Set replacedFileIds = new HashSet<>(); clusterMetadata.getPartitionToReplaceFileIds().entrySet().forEach(partitionFiles -> partitionFiles.getValue().stream().forEach(file -> replacedFileIds.add(new HoodieFileGroupId(partitionFiles.getKey(), file)))); - assertEquals(insertedFileIds, replacedFileIds); - return allRecords; + return clusterMetadata; } private Set getFileGroupIdsFromWriteStatus(List statuses) { @@ -1317,7 +1519,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { // Do Inserts String commit1 = "001"; - List statuses = writeAndVerifyBatch(client, dataGen.generateInserts(commit1, batch1RecordsCount), commit1); + List statuses = writeAndVerifyBatch(client, dataGen.generateInserts(commit1, batch1RecordsCount), commit1, populateMetaFields); Set batch1Buckets = getFileIdsFromWriteStatus(statuses); // Do Insert Overwrite @@ -1332,7 +1534,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { assertNoWriteErrors(statuses); assertEquals(batch1Buckets, new HashSet<>(writeResult.getPartitionToReplaceFileIds().get(testPartitionPath))); - verifyRecordsWritten(commitTime2, inserts2, statuses, config); + verifyRecordsWritten(commitTime2, populateMetaFields, inserts2, 
statuses, config); } private Set getFileIdsFromWriteStatus(List statuses) { @@ -1373,7 +1575,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { List statuses = client.upsert(insertRecordsRDD1, commitTime1).collect(); assertNoWriteErrors(statuses); Set batchBuckets = statuses.stream().map(s -> s.getFileId()).collect(Collectors.toSet()); - verifyRecordsWritten(commitTime1, inserts1, statuses, client.config); + verifyRecordsWritten(commitTime1, true, inserts1, statuses, client.config); return batchBuckets; } @@ -1451,8 +1653,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { /** * Verify data in base files matches expected records and commit time. */ - private void verifyRecordsWritten(String commitTime, List expectedRecords, List allStatus, - HoodieWriteConfig config) throws IOException { + private void verifyRecordsWritten(String commitTime, boolean populateMetadataField, + List expectedRecords, List allStatus, HoodieWriteConfig config) throws IOException { List records = new ArrayList<>(); for (WriteStatus status : allStatus) { Path filePath = new Path(basePath, status.getStat().getPath()); @@ -1472,18 +1674,20 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { KeyGenerator keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())); for (GenericRecord record : records) { String recordKey = keyGenerator.getKey(record).getRecordKey(); - assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + if (!populateMetadataField) { + assertNull(record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD)); + } assertTrue(expectedKeys.contains(recordKey)); } } } - private List writeAndVerifyBatch(SparkRDDWriteClient client, List inserts, String commitTime) throws IOException { + private List writeAndVerifyBatch(SparkRDDWriteClient client, List inserts, String commitTime, boolean populateMetaFields) throws IOException { 
client.startCommitWithTime(commitTime); JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts, 2); List statuses = client.upsert(insertRecordsRDD1, commitTime).collect(); assertNoWriteErrors(statuses); - verifyRecordsWritten(commitTime, inserts, statuses, client.config); + verifyRecordsWritten(commitTime, populateMetaFields, inserts, statuses, client.config); return statuses; } @@ -2130,4 +2334,16 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { .withProperties(populateMetaFields ? new Properties() : getPropertiesForKeyGen()).build(); } + public static class FailingPreCommitValidator> extends SparkPreCommitValidator { + + public FailingPreCommitValidator(HoodieSparkTable table, HoodieEngineContext context, HoodieWriteConfig config) { + super(table, context, config); + } + + @Override + protected void validateRecordsBeforeAndAfter(final Dataset before, final Dataset after, final Set partitionsAffected) { + throw new HoodieValidationException("simulate failure"); + } + } + } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java new file mode 100644 index 000000000..76f7e3ca5 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.hudi.common.table.view;

import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * File system view for the inflight instant being validated before commit.
 * It excludes other inflight instants by combining:
 * 1) a FileSystemView over completed commits, with
 * 2) the list of files written/replaced by the inflight instant under validation.
 *
 * NOTE(review): generic type parameters in this class were stripped by the patch
 * transport (HTML-eaten "&lt;...&gt;"); they are reconstructed below from field usage —
 * confirm against the committed upstream file.
 */
public class HoodieTablePreCommitFileSystemView {

  // partition path -> fileIds replaced by the inflight commit
  private final Map<String, List<String>> partitionToReplaceFileIds;
  // write stats for files produced by the inflight commit
  private final List<HoodieWriteStat> filesWritten;
  private final String preCommitInstantTime;
  private final SyncableFileSystemView completedCommitsFileSystemView;
  private final HoodieTableMetaClient tableMetaClient;

  /**
   * Create a file system view for the inflight commit that we are validating.
   *
   * @param metaClient                     meta client for resolving file paths under the base path
   * @param completedCommitsFileSystemView view containing only completed commits
   * @param filesWritten                   files written by the inflight instant
   * @param partitionToReplaceFileIds      fileIds replaced by the inflight instant, per partition
   * @param instantTime                    the inflight instant time being validated
   */
  public HoodieTablePreCommitFileSystemView(HoodieTableMetaClient metaClient,
                                            SyncableFileSystemView completedCommitsFileSystemView,
                                            List<HoodieWriteStat> filesWritten,
                                            Map<String, List<String>> partitionToReplaceFileIds,
                                            String instantTime) {
    this.completedCommitsFileSystemView = completedCommitsFileSystemView;
    this.filesWritten = filesWritten;
    this.partitionToReplaceFileIds = partitionToReplaceFileIds;
    this.preCommitInstantTime = instantTime;
    this.tableMetaClient = metaClient;
  }

  /**
   * Combine committed base files with the new files created/replaced by the
   * inflight commit for the given partition.
   *
   * @param partitionStr relative partition path
   * @return latest base files as seen by the inflight commit
   */
  public final Stream<HoodieBaseFile> getLatestBaseFiles(String partitionStr) {
    // fileIds replaced by the current inflight commit in this partition
    List<String> replacedFileIdsForPartition =
        partitionToReplaceFileIds.getOrDefault(partitionStr, Collections.emptyList());

    // new files written by the current inflight commit, keyed by fileId
    Map<String, HoodieBaseFile> newFilesWrittenForPartition = filesWritten.stream()
        .filter(file -> partitionStr.equals(file.getPartitionPath()))
        .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat ->
            new HoodieBaseFile(new Path(tableMetaClient.getBasePath(), writeStat.getPath()).toString())));

    Stream<HoodieBaseFile> committedBaseFiles =
        this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr);
    Stream<HoodieBaseFile> baseFilesForCommittedFileIds = committedBaseFiles
        // drop files replaced by the current inflight commit
        .filter(baseFile -> !replacedFileIdsForPartition.contains(baseFile.getFileId()))
        // if the inflight commit produced a newer version of a committed fileId, prefer it;
        // remove() also drains the map so only brand-new fileIds remain afterwards
        .map(baseFile -> {
          HoodieBaseFile fileIdNewVersionExists = newFilesWrittenForPartition.remove(baseFile.getFileId());
          return Option.ofNullable(fileIdNewVersionExists).orElse(baseFile);
        });

    // whatever is left in the map are files for fileIds not present in completed commits
    Stream<HoodieBaseFile> baseFilesWithNewFileIds = newFilesWrittenForPartition.values().stream();
    return Stream.concat(baseFilesForCommittedFileIds, baseFilesWithNewFileIds);
  }
}
a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieValidationException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieValidationException.java new file mode 100644 index 000000000..04b696d30 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieValidationException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +/** + *

+ * Exception thrown for validation failures. + *

+ */ +public class HoodieValidationException extends HoodieException { + + public HoodieValidationException(String msg, Throwable t) { + super(msg, t); + } + + public HoodieValidationException(String msg) { + super(msg); + } +}