From ac23d2587f58d2199535dea779925cec02304b2d Mon Sep 17 00:00:00 2001
From: Prashant Wason
Date: Tue, 1 Dec 2020 13:44:57 -0800
Subject: [PATCH] [HUDI-1357] Added a check to validate records are not lost during merges. (#2216)

- Turned off by default
---
 .../apache/hudi/config/HoodieWriteConfig.java | 15 ++++++
 .../org/apache/hudi/io/HoodieMergeHandle.java | 27 ++++++++++
 .../TestHoodieClientOnCopyOnWriteStorage.java | 49 +++++++++++++++++++
 .../apache/hudi/common/util/ParquetUtils.java | 17 +++++++
 .../hudi/io/storage/HoodieParquetReader.java  |  3 +-
 .../hudi/common/util/TestParquetUtils.java    | 13 +++++
 6 files changed, 122 insertions(+), 2 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 42d3e2b40..b06f994b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -117,6 +117,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
   public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;

+  // Data validation check performed during merges before actual commits
+  private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
+  private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
+
   /**
    * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
    * multiple write operations (upsert/buk-insert/...) to be executed within a single commit.
@@ -282,6 +286,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
   }

+  public boolean isMergeDataValidationCheckEnabled() {
+    return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
+  }
+
   /**
    * compaction properties.
   */
@@ -983,6 +991,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }

+    public Builder withMergeDataValidationCheckEnabled(boolean enabled) {
+      props.setProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED, String.valueOf(enabled));
+      return this;
+    }
+
     public Builder withProperties(Properties properties) {
       this.props.putAll(properties);
       return this;
@@ -1032,6 +1045,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
       setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE), BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
+      setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
+          MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);

       // Make sure the props is propagated
       setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index ad03023d7..cab7283f4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -34,8 +34,11 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieFileWriter;
 import org.apache.hudi.table.HoodieTable;

@@ -292,6 +295,8 @@ public class HoodieMergeHandle extends H
       runtimeStats.setTotalUpsertTime(timer.endTimer());
       stat.setRuntimeStats(runtimeStats);

+      performMergeDataValidationCheck(writeStatus);
+
       LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
           stat.getFileId(), runtimeStats.getTotalUpsertTime()));

@@ -301,6 +306,28 @@ public class HoodieMergeHandle extends H
     }
   }

+  public void performMergeDataValidationCheck(WriteStatus writeStatus) {
+    if (!config.isMergeDataValidationCheckEnabled()) {
+      return;
+    }
+
+    long oldNumWrites = 0;
+    try {
+      HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
+      oldNumWrites = reader.getTotalRecords();
+    } catch (IOException e) {
+      throw new HoodieUpsertException("Failed to check for merge data validation", e);
+    }
+
+    if ((writeStatus.getStat().getNumWrites() + writeStatus.getStat().getNumDeletes()) < oldNumWrites) {
+      throw new HoodieCorruptedDataException(
+          String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)",
+              writeStatus.getFileId(), writeStatus.getPartitionPath(),
+              instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(),
+              FSUtils.getCommitTime(oldFilePath.toString()), oldNumWrites));
+    }
+  }
+
   public Path getOldFilePath() {
     return oldFilePath;
   }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index bbb40488b..d0fda81be 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -47,10 +47,12 @@ import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieCorruptedDataException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndex.IndexType;
+import org.apache.hudi.io.HoodieMergeHandle;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
@@ -376,6 +378,53 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
         instants.get(3));
     assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
         instants.get(4));
+
+    final HoodieWriteConfig cfg = hoodieWriteConfig;
+    final String instantTime = "007";
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    String basePathStr = basePath;
+    HoodieTable table = getHoodieTable(metaClient, cfg);
+    jsc.parallelize(Arrays.asList(1)).map(e -> {
+      HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+          .fromBytes(metaClient.getActiveTimeline().getInstantDetails(
+              metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
+              HoodieCommitMetadata.class);
+      String filePath = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
+          .map(ee -> ee.getPath()).orElse(null);
+      String partitionPath = commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
+          .map(ee -> ee.getPartitionPath()).orElse(null);
+      Path parquetFilePath = new Path(basePathStr, filePath);
+      HoodieBaseFile baseFile = new HoodieBaseFile(parquetFilePath.toString());
+
+      try {
+        HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
+            partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
+        WriteStatus writeStatus = new WriteStatus(false, 0.0);
+        writeStatus.setStat(new HoodieWriteStat());
+        writeStatus.getStat().setNumWrites(0);
+        handle.performMergeDataValidationCheck(writeStatus);
+      } catch (HoodieCorruptedDataException e1) {
+        fail("Exception not expected because merge validation check is disabled");
+      }
+
+      try {
+        final String newInstantTime = "006";
+        cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
+        HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build();
+        HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
+            partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
+        WriteStatus writeStatus = new WriteStatus(false, 0.0);
+        writeStatus.setStat(new HoodieWriteStat());
+        writeStatus.getStat().setNumWrites(0);
+        handle.performMergeDataValidationCheck(writeStatus);
+        fail("The above line should have thrown an exception");
+      } catch (HoodieCorruptedDataException e2) {
+        // expected
+      }
+      return true;
+    }).collect();
   }

   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
index eb5e2b5fa..dc444aa21 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ParquetUtils.java
@@ -39,6 +39,7 @@ import org.apache.parquet.avro.AvroReadSupport;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;

@@ -261,6 +262,22 @@ public class ParquetUtils {
     return records;
   }

+  /**
+   * Returns the number of records in the parquet file.
+   *
+   * @param conf Configuration
+   * @param parquetFilePath path of the file
+   */
+  public static long getRowCount(Configuration conf, Path parquetFilePath) {
+    ParquetMetadata footer;
+    long rowCount = 0;
+    footer = readMetadata(conf, parquetFilePath);
+    for (BlockMetaData b : footer.getBlocks()) {
+      rowCount += b.getRowCount();
+    }
+    return rowCount;
+  }
+
   static class RecordKeysFilterFunction implements Function {

     private final Set candidateKeys;
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
index 107f50318..feacbda54 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetReader.java
@@ -74,7 +74,6 @@ public class HoodieParquetReader implements HoodieFileR

   @Override
   public long getTotalRecords() {
-    // TODO Auto-generated method stub
-    return 0;
+    return ParquetUtils.getRowCount(conf, path);
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
index 9496f0195..2bcbcbdab 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestParquetUtils.java
@@ -36,6 +36,7 @@ import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -147,6 +148,18 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
     }
   }

+  @Test
+  public void testReadCounts() throws Exception {
+    String filePath = basePath + "/test.parquet";
+    List<String> rowKeys = new ArrayList<>();
+    for (int i = 0; i < 123; i++) {
+      rowKeys.add(UUID.randomUUID().toString());
+    }
+    writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
+
+    assertEquals(123, ParquetUtils.getRowCount(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)));
+  }
+
   private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception {
     writeParquetFile(typeCode, filePath, rowKeys, HoodieAvroUtils.getRecordKeySchema(), false, "");
   }
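
A minimal usage sketch of how this check could be switched on once the patch is applied. It relies only on the builder method withMergeDataValidationCheckEnabled and the property hoodie.merge.data.validation.enabled introduced above; the base path, table name, and class name below are placeholder values, not part of the patch.

    import org.apache.hudi.config.HoodieWriteConfig;

    public class MergeValidationConfigExample {
      public static void main(String[] args) {
        // The check is off by default (DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false").
        // When enabled, HoodieMergeHandle#performMergeDataValidationCheck fails the merge with
        // HoodieCorruptedDataException if numWrites + numDeletes of the newly written base file
        // is lower than the record count of the old base file.
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hoodie/sample_table")        // placeholder base path
            .forTable("sample_table")                    // placeholder table name
            .withMergeDataValidationCheckEnabled(true)   // builder method added in this patch
            .build();

        System.out.println(writeConfig.isMergeDataValidationCheckEnabled()); // prints: true

        // Equivalent raw-property form:
        //   hoodie.merge.data.validation.enabled=true
      }
    }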