1
0

[HUDI-2712] Fixing a bug with rollback of partially failed commit which has new partitions (#3947)

This commit is contained in:
Sivabalan Narayanan
2021-11-15 22:36:03 -05:00
committed by GitHub
parent 38b6934352
commit bff8769ed4
5 changed files with 80 additions and 23 deletions

View File

@@ -60,7 +60,7 @@ public class ListingBasedRollbackStrategy implements BaseRollbackPlanActionExecu
List<ListingBasedRollbackRequest> rollbackRequests = null;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context,
table.getMetaClient().getBasePath(), config);
table.getMetaClient().getBasePath());
} else {
rollbackRequests = RollbackUtils
.generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);

View File

@@ -18,10 +18,6 @@
package org.apache.hudi.table.action.rollback;
import org.apache.hadoop.fs.FileStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.hudi.avro.model.HoodieRollbackPlan;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -42,6 +38,10 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.fs.FileStatus;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -56,7 +56,8 @@ public class RollbackUtils {
/**
* Get Latest version of Rollback plan corresponding to a clean instant.
* @param metaClient Hoodie Table Meta Client
*
* @param metaClient Hoodie Table Meta Client
* @param rollbackInstant Instant referring to rollback action
* @return Rollback plan corresponding to rollback instant
* @throws IOException
@@ -106,12 +107,10 @@ public class RollbackUtils {
* Generate all rollback requests that needs rolling back this action without actually performing rollback for COW table type.
* @param engineContext instance of {@link HoodieEngineContext} to use.
* @param basePath base path of interest.
* @param config instance of {@link HoodieWriteConfig} to use.
* @return {@link List} of {@link ListingBasedRollbackRequest}s thus collected.
*/
public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext,
String basePath, HoodieWriteConfig config) {
return FSUtils.getAllPartitionPaths(engineContext, config.getMetadataConfig(), basePath).stream()
public static List<ListingBasedRollbackRequest> generateRollbackRequestsByListingCOW(HoodieEngineContext engineContext, String basePath) {
return FSUtils.getAllPartitionPaths(engineContext, basePath, false, false).stream()
.map(ListingBasedRollbackRequest::createRollbackRequestWithDeleteDataAndLogFilesAction)
.collect(Collectors.toList());
}
@@ -127,7 +126,7 @@ public class RollbackUtils {
public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
String commit = instantToRollback.getTimestamp();
HoodieWriteConfig config = table.getConfig();
List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
List<String> partitions = FSUtils.getAllPartitionPaths(context, table.getMetaClient().getBasePath(), false, false);
if (partitions.isEmpty()) {
return new ArrayList<>();
}

View File

@@ -102,7 +102,7 @@ public class ZeroToOneUpgradeHandler implements UpgradeHandler {
// generate rollback stats
List<ListingBasedRollbackRequest> rollbackRequests;
if (table.getMetaClient().getTableType() == HoodieTableType.COPY_ON_WRITE) {
rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath(), table.getConfig());
rollbackRequests = RollbackUtils.generateRollbackRequestsByListingCOW(context, table.getMetaClient().getBasePath());
} else {
rollbackRequests = RollbackUtils.generateRollbackRequestsUsingFileListingMOR(commitInstantOpt.get(), table, context);
}

View File

@@ -82,7 +82,6 @@ import org.apache.hadoop.util.Time;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -93,6 +92,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -307,9 +307,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
/**
* Test rollback of various table operations sync to Metadata Table correctly.
*/
//@ParameterizedTest
//@EnumSource(HoodieTableType.class)
@Disabled
@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testRollbackOperations(HoodieTableType tableType) throws Exception {
init(tableType);
doWriteInsertAndUpsert(testTable);
@@ -1087,18 +1086,72 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
}
/**
* Tests rollback of a commit which has new partitions which is not present in hudi table prior to the commit being rolledback.
*
* @throws Exception
*/
@Test
public void testRollbackOfPartiallyFailedCommitWithNewPartitions() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
true)) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
List<HoodieRecord> upsertRecords = new ArrayList<>();
for (HoodieRecord entry : records) {
if (entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)
|| entry.getPartitionPath().equals(HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH)) {
upsertRecords.add(entry);
}
}
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(upsertRecords, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
records = dataGen.generateInserts(newCommitTime, 20);
writeStatuses = client.insert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
// instant so that only the inflight is left over.
String commitInstantFileName = HoodieTimeline.makeCommitFileName(newCommitTime);
assertTrue(fs.delete(new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME,
commitInstantFileName), false));
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
true)) {
String newCommitTime = client.startCommit();
// Next insert
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
List<WriteStatus> writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
}
}
/**
* Test various error scenarios.
*/
//@Test
@Disabled
@Test
public void testErrorCases() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// TESTCASE: If commit on the metadata table succeeds but fails on the dataset, then on next init the metadata table
// should be rolled back to last valid commit.
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
true)) {
String newCommitTime = HoodieActiveTimeline.createNewInstantTime();
client.startCommitWithTime(newCommitTime);
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 10);
@@ -1111,6 +1164,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
records = dataGen.generateInserts(newCommitTime, 5);
writeStatuses = client.bulkInsert(jsc.parallelize(records, 1), newCommitTime).collect();
assertNoWriteErrors(writeStatuses);
validateMetadata(client);
// There is no way to simulate failed commit on the main dataset, hence we simply delete the completed
// instant so that only the inflight is left over.
@@ -1119,7 +1173,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
commitInstantFileName), false));
}
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, getWriteConfig(true, true), true)) {
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext,
getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, true, false, true, false).build(),
true)) {
String newCommitTime = client.startCommit();
// Next insert
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 5);

View File

@@ -87,7 +87,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
initMetaClient(tableType);
initTestDataGenerator();
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics, enableFullScan).build();
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics,
enableFullScan, true).build();
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
}
@@ -265,11 +266,11 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics) {
return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true);
return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true, true);
}
protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
boolean enableMetrics, boolean enableFullScan) {
boolean enableMetrics, boolean enableFullScan, boolean useRollbackUsingMarkers) {
Properties properties = new Properties();
properties.put(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key(), SimpleKeyGenerator.class.getName());
return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
@@ -292,6 +293,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
.withExecutorMetrics(true).build())
.withMetricsGraphiteConfig(HoodieMetricsGraphiteConfig.newBuilder()
.usePrefix("unit-test").build())
.withRollbackUsingMarkers(useRollbackUsingMarkers)
.withProperties(properties);
}