[HUDI-1357] Added a check to validate records are not lost during merges. (#2216)
- Turned off by default
This commit is contained in:
@@ -117,6 +117,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
// Upper bound on retries for the storage consistency check.
public static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
public static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;

// Data validation check performed during merges before actual commits
// When enabled, HoodieMergeHandle verifies after a merge that the records written
// (numWrites + numDeletes) are not fewer than the record count of the replaced base file.
private static final String MERGE_DATA_VALIDATION_CHECK_ENABLED = "hoodie.merge.data.validation.enabled";
// Disabled by default: the check re-reads the old base file's record count on every merge.
private static final String DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED = "false";
|
||||
|
||||
/**
|
||||
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
|
||||
* multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
|
||||
@@ -282,6 +286,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return BulkInsertSortMode.valueOf(sortMode.toUpperCase());
|
||||
}
|
||||
|
||||
public boolean isMergeDataValidationCheckEnabled() {
|
||||
return Boolean.parseBoolean(props.getProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED));
|
||||
}
|
||||
|
||||
/**
|
||||
* compaction properties.
|
||||
*/
|
||||
@@ -983,6 +991,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withMergeDataValidationCheckEnabled(boolean enabled) {
|
||||
props.setProperty(MERGE_DATA_VALIDATION_CHECK_ENABLED, String.valueOf(enabled));
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withProperties(Properties properties) {
|
||||
this.props.putAll(properties);
|
||||
return this;
|
||||
@@ -1032,6 +1045,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
||||
setDefaultOnCondition(props, !props.containsKey(AVRO_SCHEMA_VALIDATE), AVRO_SCHEMA_VALIDATE, DEFAULT_AVRO_SCHEMA_VALIDATE);
|
||||
setDefaultOnCondition(props, !props.containsKey(BULKINSERT_SORT_MODE),
|
||||
BULKINSERT_SORT_MODE, DEFAULT_BULKINSERT_SORT_MODE);
|
||||
setDefaultOnCondition(props, !props.containsKey(MERGE_DATA_VALIDATION_CHECK_ENABLED),
|
||||
MERGE_DATA_VALIDATION_CHECK_ENABLED, DEFAULT_MERGE_DATA_VALIDATION_CHECK_ENABLED);
|
||||
|
||||
// Make sure the props is propagated
|
||||
setDefaultOnCondition(props, !isIndexConfigSet, HoodieIndexConfig.newBuilder().fromProperties(props).build());
|
||||
|
||||
@@ -34,8 +34,11 @@ import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieCorruptedDataException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.exception.HoodieUpsertException;
|
||||
import org.apache.hudi.io.storage.HoodieFileReader;
|
||||
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
|
||||
import org.apache.hudi.io.storage.HoodieFileWriter;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
|
||||
@@ -292,6 +295,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
|
||||
runtimeStats.setTotalUpsertTime(timer.endTimer());
|
||||
stat.setRuntimeStats(runtimeStats);
|
||||
|
||||
performMergeDataValidationCheck(writeStatus);
|
||||
|
||||
LOG.info(String.format("MergeHandle for partitionPath %s fileID %s, took %d ms.", stat.getPartitionPath(),
|
||||
stat.getFileId(), runtimeStats.getTotalUpsertTime()));
|
||||
|
||||
@@ -301,6 +306,28 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
|
||||
}
|
||||
}
|
||||
|
||||
public void performMergeDataValidationCheck(WriteStatus writeStatus) {
|
||||
if (!config.isMergeDataValidationCheckEnabled()) {
|
||||
return;
|
||||
}
|
||||
|
||||
long oldNumWrites = 0;
|
||||
try {
|
||||
HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(hoodieTable.getHadoopConf(), oldFilePath);
|
||||
oldNumWrites = reader.getTotalRecords();
|
||||
} catch (IOException e) {
|
||||
throw new HoodieUpsertException("Failed to check for merge data validation", e);
|
||||
}
|
||||
|
||||
if ((writeStatus.getStat().getNumWrites() + writeStatus.getStat().getNumDeletes()) < oldNumWrites) {
|
||||
throw new HoodieCorruptedDataException(
|
||||
String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)",
|
||||
writeStatus.getFileId(), writeStatus.getPartitionPath(),
|
||||
instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(),
|
||||
FSUtils.getCommitTime(oldFilePath.toString()), oldNumWrites));
|
||||
}
|
||||
}
|
||||
|
||||
public Path getOldFilePath() {
|
||||
return oldFilePath;
|
||||
}
|
||||
|
||||
@@ -47,10 +47,12 @@ import org.apache.hudi.config.HoodieIndexConfig;
|
||||
import org.apache.hudi.config.HoodieStorageConfig;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
import org.apache.hudi.exception.HoodieCommitException;
|
||||
import org.apache.hudi.exception.HoodieCorruptedDataException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.hudi.exception.HoodieRollbackException;
|
||||
import org.apache.hudi.index.HoodieIndex;
|
||||
import org.apache.hudi.index.HoodieIndex.IndexType;
|
||||
import org.apache.hudi.io.HoodieMergeHandle;
|
||||
import org.apache.hudi.table.HoodieSparkTable;
|
||||
import org.apache.hudi.table.HoodieTable;
|
||||
import org.apache.hudi.table.MarkerFiles;
|
||||
@@ -376,6 +378,53 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
|
||||
instants.get(3));
|
||||
assertEquals(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "006"),
|
||||
instants.get(4));
|
||||
|
||||
final HoodieWriteConfig cfg = hoodieWriteConfig;
|
||||
final String instantTime = "007";
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
|
||||
String basePathStr = basePath;
|
||||
HoodieTable table = getHoodieTable(metaClient, cfg);
|
||||
jsc.parallelize(Arrays.asList(1)).map(e -> {
|
||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||
.fromBytes(metaClient.getActiveTimeline().getInstantDetails(
|
||||
metaClient.getCommitsTimeline().filterCompletedInstants().lastInstant().get()).get(),
|
||||
HoodieCommitMetadata.class);
|
||||
String filePath = commitMetadata.getPartitionToWriteStats().values().stream()
|
||||
.flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
|
||||
.map(ee -> ee.getPath()).orElse(null);
|
||||
String partitionPath = commitMetadata.getPartitionToWriteStats().values().stream()
|
||||
.flatMap(w -> w.stream()).filter(s -> s.getPath().endsWith(".parquet")).findAny()
|
||||
.map(ee -> ee.getPartitionPath()).orElse(null);
|
||||
Path parquetFilePath = new Path(basePathStr, filePath);
|
||||
HoodieBaseFile baseFile = new HoodieBaseFile(parquetFilePath.toString());
|
||||
|
||||
try {
|
||||
HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
|
||||
partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
|
||||
WriteStatus writeStatus = new WriteStatus(false, 0.0);
|
||||
writeStatus.setStat(new HoodieWriteStat());
|
||||
writeStatus.getStat().setNumWrites(0);
|
||||
handle.performMergeDataValidationCheck(writeStatus);
|
||||
} catch (HoodieCorruptedDataException e1) {
|
||||
fail("Exception not expected because merge validation check is disabled");
|
||||
}
|
||||
|
||||
try {
|
||||
final String newInstantTime = "006";
|
||||
cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
|
||||
HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build();
|
||||
HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
|
||||
partitionPath, FSUtils.getFileId(parquetFilePath.getName()), baseFile, new SparkTaskContextSupplier());
|
||||
WriteStatus writeStatus = new WriteStatus(false, 0.0);
|
||||
writeStatus.setStat(new HoodieWriteStat());
|
||||
writeStatus.getStat().setNumWrites(0);
|
||||
handle.performMergeDataValidationCheck(writeStatus);
|
||||
fail("The above line should have thrown an exception");
|
||||
} catch (HoodieCorruptedDataException e2) {
|
||||
// expected
|
||||
}
|
||||
return true;
|
||||
}).collect();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -39,6 +39,7 @@ import org.apache.parquet.avro.AvroReadSupport;
|
||||
import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
import org.apache.parquet.hadoop.ParquetFileReader;
|
||||
import org.apache.parquet.hadoop.ParquetReader;
|
||||
import org.apache.parquet.hadoop.metadata.BlockMetaData;
|
||||
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
|
||||
import org.apache.parquet.schema.MessageType;
|
||||
|
||||
@@ -261,6 +262,22 @@ public class ParquetUtils {
|
||||
return records;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the number of records in the parquet file.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param parquetFilePath path of the file
|
||||
*/
|
||||
public static long getRowCount(Configuration conf, Path parquetFilePath) {
|
||||
ParquetMetadata footer;
|
||||
long rowCount = 0;
|
||||
footer = readMetadata(conf, parquetFilePath);
|
||||
for (BlockMetaData b : footer.getBlocks()) {
|
||||
rowCount += b.getRowCount();
|
||||
}
|
||||
return rowCount;
|
||||
}
|
||||
|
||||
static class RecordKeysFilterFunction implements Function<String, Boolean> {
|
||||
|
||||
private final Set<String> candidateKeys;
|
||||
|
||||
@@ -74,7 +74,6 @@ public class HoodieParquetReader<R extends IndexedRecord> implements HoodieFileR
|
||||
|
||||
@Override
|
||||
public long getTotalRecords() {
|
||||
// TODO Auto-generated method stub
|
||||
return 0;
|
||||
return ParquetUtils.getRowCount(conf, path);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -36,6 +36,7 @@ import org.apache.parquet.avro.AvroSchemaConverter;
|
||||
import org.apache.parquet.hadoop.ParquetWriter;
|
||||
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
@@ -147,6 +148,18 @@ public class TestParquetUtils extends HoodieCommonTestHarness {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadCounts() throws Exception {
|
||||
String filePath = basePath + "/test.parquet";
|
||||
List<String> rowKeys = new ArrayList<>();
|
||||
for (int i = 0; i < 123; i++) {
|
||||
rowKeys.add(UUID.randomUUID().toString());
|
||||
}
|
||||
writeParquetFile(BloomFilterTypeCode.SIMPLE.name(), filePath, rowKeys);
|
||||
|
||||
assertEquals(123, ParquetUtils.getRowCount(HoodieTestUtils.getDefaultHadoopConf(), new Path(filePath)));
|
||||
}
|
||||
|
||||
private void writeParquetFile(String typeCode, String filePath, List<String> rowKeys) throws Exception {
|
||||
writeParquetFile(typeCode, filePath, rowKeys, HoodieAvroUtils.getRecordKeySchema(), false, "");
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user