1
0

[HUDI-2792] Configure metadata payload consistency check (#4035)

- Relax metadata payload consistency check to consider spark task failures with spurious deletes
This commit is contained in:
Sivabalan Narayanan
2021-11-24 21:56:31 -05:00
committed by GitHub
parent 83f8ed2ae3
commit a9bd20804b
9 changed files with 125 additions and 43 deletions

View File

@@ -45,7 +45,6 @@ import org.apache.hudi.exception.HoodieClusteringException;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.SparkHoodieIndexFactory;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metrics.DistributedRegistry;
import org.apache.hudi.table.BulkInsertPartitioner;
@@ -314,9 +313,9 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
// commit to data table after committing to metadata table.
finalizeWrite(table, compactionCommitTime, writeStats);
writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
// commit to data table after committing to metadata table.
LOG.info("Committing Compaction " + compactionCommitTime + ". Finished with result " + metadata);
CompactHelpers.getInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
WriteMarkersFactory.get(config.getMarkersType(), table, compactionCommitTime)
@@ -386,8 +385,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
throw new HoodieClusteringException("Clustering failed to write to files:"
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(s -> s.getFileId()).collect(Collectors.joining(",")));
}
writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime));
finalizeWrite(table, clusteringCommitTime, writeStats);
writeTableMetadataForTableServices(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, clusteringCommitTime));
try {
// try to save statistics info to hudi
if (config.isDataSkippingEnabled() && config.isLayoutOptimizationEnabled() && !config.getClusteringSortColumns().isEmpty()) {
@@ -415,8 +414,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}
private void writeTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
private void writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
try {
this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
@@ -497,8 +496,6 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
HoodieTable table = createTable(config, hadoopConf);
TransactionUtils.resolveWriteConflictIfAny(table, this.txnManager.getCurrentTransactionOwner(),
Option.of(metadata), config, txnManager.getLastCompletedTransactionOwner());
table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter)w).update(metadata, inflightInstant.getTimestamp(),
table.isTableServiceAction(inflightInstant.getAction())));
}
@Override

View File

@@ -96,6 +96,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -121,6 +122,7 @@ import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("functional")
@@ -224,7 +226,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
@ParameterizedTest
@MethodSource("bootstrapAndTableOperationTestArgs")
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
init(tableType, true, enableFullScan, false);
init(tableType, true, enableFullScan, false, false);
doWriteInsertAndUpsert(testTable);
// trigger an upsert
@@ -482,7 +484,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception {
init(tableType, true, true, true);
init(tableType, true, true, true, false);
long baseCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
for (int i = 1; i < 25; i += 7) {
long commitTime1 = getNextCommitTime(baseCommitTime);
@@ -541,6 +543,34 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
}
/**
* Tests the metadata payload spurious deletes.
* Lets say a commit was applied to metadata table, and later was explicitly got rolledback. Due to spark task failures, there could be more files in rollback
* metadata when compared to the original commit metadata. When payload consistency check is enabled, it will throw exception. If not, it will succeed.
* @throws Exception
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMetadataPayloadSpuriousDeletes(boolean ignoreSpuriousDeletes) throws Exception {
tableType = COPY_ON_WRITE;
init(tableType, true, true, false, ignoreSpuriousDeletes);
doWriteInsertAndUpsert(testTable);
// trigger an upsert
doWriteOperationAndValidate(testTable, "0000003");
// trigger a commit and rollback
doWriteOperation(testTable, "0000004");
// add extra files in rollback to check for payload consistency
Map<String, List<String>> extraFiles = new HashMap<>();
extraFiles.put("p1", Collections.singletonList("f10"));
extraFiles.put("p2", Collections.singletonList("f12"));
testTable.doRollbackWithExtraFiles("0000004", "0000005", extraFiles);
if (!ignoreSpuriousDeletes) {
assertThrows(HoodieMetadataException.class, () -> validateMetadata(testTable));
} else {
validateMetadata(testTable);
}
}
/**
* Test several table operations with restore. This test uses SparkRDDWriteClient.
@@ -1101,7 +1131,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(),
true)) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
@@ -1132,7 +1162,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(),
true)) {
String newCommitTime = client.startCommit();
// Next insert
@@ -1154,7 +1184,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
// TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
// should be rolled back to last valid commit.
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(),
true)) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
@@ -1178,7 +1208,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false, false).build(),
true)) {
String newCommitTime = client.startCommit();
// Next insert

View File

@@ -2079,7 +2079,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
private void testRollbackAfterConsistencyCheckFailureUsingFileList(boolean rollbackUsingMarkers, boolean enableOptimisticConsistencyGuard,
boolean populateMetaFields) throws Exception {
String instantTime = "000";
String instantTime = "00000000000010";
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
Properties properties = new Properties();

View File

@@ -75,10 +75,11 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
}
public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException {
init(tableType, enableMetadataTable, true, false);
init(tableType, enableMetadataTable, true, false, false);
}
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics) throws IOException {
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics, boolean
validateMetadataPayloadStateConsistency) throws IOException {
this.tableType = tableType;
initPath();
initSparkContexts("TestHoodieMetadata");
@@ -88,7 +89,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics,
enableFullScan, true).build();
enableFullScan, true, validateMetadataPayloadStateConsistency).build();
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
}
@@ -266,11 +267,12 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics) {
return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, true);
return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, true, false);
}
protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics, boolean enableFullScan, boolean useRollbackUsingMarkers) {
boolean enableMetrics, boolean enableFullScan, boolean useRollbackUsingMarkers,
boolean validateMetadataPayloadConsistency) {
Properties properties = new Properties();
properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
@@ -290,6 +292,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.enableFullScan(enableFullScan)
.enableMetrics(enableMetrics)
.withPopulateMetaFields(false)
.ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
.build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
.withExecutorMetrics(true).build())