1
0

[HUDI-2573] Fixing double locking with multi-writers (#3827)

- There are two code paths where we are taking double locking. This was added as part of adding data table locks to update the metadata table. Fixing those flows to avoid taking locks if a parent transaction has already acquired a lock.
This commit is contained in:
Sivabalan Narayanan
2021-10-29 12:14:39 -04:00
committed by GitHub
parent 69ee790a47
commit 29574af239
18 changed files with 281 additions and 61 deletions

View File

@@ -501,7 +501,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
} else {
// Do not reuse instantTime for clean as metadata table requires all changes to have unique instant timestamps.
LOG.info("Auto cleaning is enabled. Running cleaner now");
clean();
clean(true);
}
}
}
@@ -570,16 +570,22 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
SavepointHelpers.validateSavepointRestore(table, savepointTime);
}
@Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
return rollback(commitInstantTime, false);
}
/**
* @deprecated
* Rollback the inflight record changes with the given commit time. This
* will be removed in future in favor of {@link AbstractHoodieWriteClient#restoreToInstant(String)}
*
* @param commitInstantTime Instant time of the commit
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @throws HoodieRollbackException if rollback cannot be performed successfully
*/
@Deprecated
public boolean rollback(final String commitInstantTime) throws HoodieRollbackException {
public boolean rollback(final String commitInstantTime, boolean skipLocking) throws HoodieRollbackException {
LOG.info("Begin rollback of instant " + commitInstantTime);
final String rollbackInstantTime = HoodieActiveTimeline.createNewInstantTime();
final Timer.Context timerContext = this.metrics.getRollbackCtx();
@@ -590,10 +596,12 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
.findFirst());
if (commitInstantOpt.isPresent()) {
LOG.info("Scheduling Rollback at instant time :" + rollbackInstantTime);
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime, commitInstantOpt.get(), false);
Option<HoodieRollbackPlan> rollbackPlanOption = table.scheduleRollback(context, rollbackInstantTime,
commitInstantOpt.get(), false);
if (rollbackPlanOption.isPresent()) {
// execute rollback
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true);
HoodieRollbackMetadata rollbackMetadata = table.rollback(context, rollbackInstantTime, commitInstantOpt.get(), true,
skipLocking);
if (timerContext != null) {
long durationInMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateRollbackMetrics(durationInMs, rollbackMetadata.getTotalFilesDeleted());
@@ -644,7 +652,19 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
* cleaned)
*/
public HoodieCleanMetadata clean(String cleanInstantTime) throws HoodieIOException {
return clean(cleanInstantTime, true);
return clean(cleanInstantTime, true, false);
}
/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
* cleaned)
* @param cleanInstantTime instant time for clean.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @return instance of {@link HoodieCleanMetadata}.
*/
public HoodieCleanMetadata clean(String cleanInstantTime, boolean skipLocking) throws HoodieIOException {
return clean(cleanInstantTime, true, skipLocking);
}
/**
@@ -653,8 +673,11 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
* cleaned). This API provides the flexibility to schedule clean instant asynchronously via
* {@link AbstractHoodieWriteClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
* of clean.
* @param cleanInstantTime instant time for clean.
* @param scheduleInline true if needs to be scheduled inline. false otherwise.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
*/
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline, boolean skipLocking) throws HoodieIOException {
if (scheduleInline) {
scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
}
@@ -662,8 +685,8 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
final Timer.Context timerContext = metrics.getCleanCtx();
LOG.info("Cleaned failed attempts if any");
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime);
HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites(skipLocking));
HoodieCleanMetadata metadata = createTable(config, hadoopConf).clean(context, cleanInstantTime, skipLocking);
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
@@ -675,7 +698,17 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
}
public HoodieCleanMetadata clean() {
return clean(HoodieActiveTimeline.createNewInstantTime());
return clean(false);
}
/**
* Triggers clean for the table. This cleans up any stale/old files/data lying around (either on file storage or index storage)
* based on the configurations and CleaningPolicy used.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
* @return instance of {@link HoodieCleanMetadata}.
*/
public HoodieCleanMetadata clean(boolean skipLocking) {
return clean(HoodieActiveTimeline.createNewInstantTime(), skipLocking);
}
/**
@@ -797,20 +830,29 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
* Rollback all failed writes.
*/
public Boolean rollbackFailedWrites() {
return rollbackFailedWrites(false);
}
/**
* Rollback all failed writes.
* @param skipLocking if this is triggered by another parent transaction, locking can be skipped.
*/
public Boolean rollbackFailedWrites(boolean skipLocking) {
HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy());
rollbackFailedWrites(instantsToRollback);
List<String> instantsToRollback = getInstantsToRollback(table.getMetaClient(), config.getFailedWritesCleanPolicy(),
Option.empty());
rollbackFailedWrites(instantsToRollback, skipLocking);
return true;
}
protected void rollbackFailedWrites(List<String> instantsToRollback) {
protected void rollbackFailedWrites(List<String> instantsToRollback, boolean skipLocking) {
for (String instant : instantsToRollback) {
if (HoodieTimeline.compareTimestamps(instant, HoodieTimeline.LESSER_THAN_OR_EQUALS,
HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
rollbackFailedBootstrap();
break;
} else {
rollback(instant);
rollback(instant, skipLocking);
}
}
// Delete any heartbeat files for already rolled back commits
@@ -822,11 +864,17 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
}
}
protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy) {
protected List<String> getInstantsToRollback(HoodieTableMetaClient metaClient, HoodieFailedWritesCleaningPolicy cleaningPolicy, Option<String> curInstantTime) {
Stream<HoodieInstant> inflightInstantsStream = getInflightTimelineExcludeCompactionAndClustering(metaClient)
.getReverseOrderedInstants();
if (cleaningPolicy.isEager()) {
return inflightInstantsStream.map(HoodieInstant::getTimestamp).collect(Collectors.toList());
return inflightInstantsStream.map(HoodieInstant::getTimestamp).filter(entry -> {
if (curInstantTime.isPresent()) {
return !entry.equals(curInstantTime.get());
} else {
return true;
}
}).collect(Collectors.toList());
} else if (cleaningPolicy.isLazy()) {
return inflightInstantsStream.filter(instant -> {
try {
@@ -975,7 +1023,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable<T, I, K, O> table) {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
table.scheduleRollback(context, commitTime, inflightInstant, false);
table.rollback(context, commitTime, inflightInstant, false);
table.rollback(context, commitTime, inflightInstant, false, false);
table.getActiveTimeline().revertReplaceCommitInflightToRequested(inflightInstant);
}

View File

@@ -424,7 +424,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
*
* @return information on cleaned file slices
*/
public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime);
public abstract HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking);
/**
* Schedule rollback for the instant time.
@@ -452,7 +452,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
public abstract HoodieRollbackMetadata rollback(HoodieEngineContext context,
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants);
boolean deleteInstants,
boolean skipLocking);
/**
* Create a savepoint at the specified instant, so that the table can be restored
@@ -480,7 +481,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
public void rollbackInflightCompaction(HoodieInstant inflightInstant) {
String commitTime = HoodieActiveTimeline.createNewInstantTime();
scheduleRollback(context, commitTime, inflightInstant, false);
rollback(context, commitTime, inflightInstant, false);
rollback(context, commitTime, inflightInstant, false, false);
getActiveTimeline().revertCompactionInflightToRequested(inflightInstant);
}

View File

@@ -60,10 +60,16 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
private static final long serialVersionUID = 1L;
private static final Logger LOG = LogManager.getLogger(CleanActionExecutor.class);
private final TransactionManager txnManager;
private final boolean skipLocking;
public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
this(context, config, table, instantTime, false);
}
public CleanActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime, boolean skipLocking) {
super(context, config, table, instantTime);
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
this.skipLocking = skipLocking;
}
static Boolean deleteFileAndGetResult(FileSystem fs, String deletePathStr) throws IOException {
@@ -214,11 +220,17 @@ public class CleanActionExecutor<T extends HoodieRecordPayload, I, K, O> extends
* @param cleanMetadata instance of {@link HoodieCleanMetadata} to be applied to metadata.
*/
private void writeMetadata(HoodieCleanMetadata cleanMetadata) {
try {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
writeTableMetadata(cleanMetadata);
} finally {
this.txnManager.endTransaction();
if (config.isMetadataTableEnabled()) {
try {
if (!skipLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
}
writeTableMetadata(cleanMetadata);
} finally {
if (!skipLocking) {
this.txnManager.endTransaction();
}
}
}
}

View File

@@ -58,6 +58,7 @@ public class CopyOnWriteRestoreActionExecutor<T extends HoodieRecordPayload, I,
instantToRollback,
true,
true,
false,
false);
return rollbackActionExecutor.execute();
}

View File

@@ -62,6 +62,7 @@ public class MergeOnReadRestoreActionExecutor<T extends HoodieRecordPayload, I,
instantToRollback,
true,
true,
false,
false);
// TODO : Get file status and create a rollback stat and file

View File

@@ -59,15 +59,17 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
protected final boolean skipTimelinePublish;
protected final boolean useMarkerBasedStrategy;
private final TransactionManager txnManager;
private final boolean skipLocking;
public BaseRollbackActionExecutor(HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant instantToRollback,
boolean deleteInstants) {
boolean deleteInstants,
boolean skipLocking) {
this(context, config, table, instantTime, instantToRollback, deleteInstants,
false, config.shouldRollbackUsingMarkers());
false, config.shouldRollbackUsingMarkers(), skipLocking);
}
public BaseRollbackActionExecutor(HoodieEngineContext context,
@@ -77,7 +79,8 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
HoodieInstant instantToRollback,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime);
this.instantToRollback = instantToRollback;
this.deleteInstants = deleteInstants;
@@ -87,6 +90,7 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
ValidationUtils.checkArgument(!instantToRollback.isCompleted(),
"Cannot use marker based rollback strategy on completed instant:" + instantToRollback);
}
this.skipLocking = skipLocking;
this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
}
@@ -265,11 +269,17 @@ public abstract class BaseRollbackActionExecutor<T extends HoodieRecordPayload,
* @param rollbackMetadata instance of {@link HoodieRollbackMetadata} to be applied to metadata.
*/
private void writeToMetadata(HoodieRollbackMetadata rollbackMetadata) {
try {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
writeTableMetadata(rollbackMetadata);
} finally {
this.txnManager.endTransaction();
if (config.isMetadataTableEnabled()) {
try {
if (!skipLocking) {
this.txnManager.beginTransaction(Option.empty(), Option.empty());
}
writeTableMetadata(rollbackMetadata);
} finally {
if (!skipLocking) {
this.txnManager.endTransaction();
}
}
}
}

View File

@@ -43,8 +43,9 @@ public class CopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
boolean deleteInstants,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking);
}
public CopyOnWriteRollbackActionExecutor(HoodieEngineContext context,
@@ -54,8 +55,9 @@ public class CopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
}
@Override

View File

@@ -43,8 +43,9 @@ public class MergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
super(context, config, table, instantTime, commitInstant, deleteInstants);
boolean deleteInstants,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking);
}
public MergeOnReadRollbackActionExecutor(HoodieEngineContext context,
@@ -54,8 +55,9 @@ public class MergeOnReadRollbackActionExecutor<T extends HoodieRecordPayload, I,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy);
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(context, config, table, instantTime, commitInstant, deleteInstants, skipTimelinePublish, useMarkerBasedStrategy, skipLocking);
}
@Override

View File

@@ -309,13 +309,14 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) {
return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
}
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
boolean deleteInstants, boolean skipLocking) {
return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
}
@Override

View File

@@ -119,8 +119,10 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
}
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
boolean deleteInstants, boolean skipLocking) {
return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants,
skipLocking).execute();
}
}

View File

@@ -192,7 +192,7 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
@Override
public HoodieCleanMetadata clean(HoodieEngineContext context,
String cleanInstantTime) {
String cleanInstantTime, boolean skipLocking) {
return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
}
@@ -200,9 +200,10 @@ public class HoodieJavaCopyOnWriteTable<T extends HoodieRecordPayload> extends H
public HoodieRollbackMetadata rollback(HoodieEngineContext context,
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
boolean deleteInstants,
boolean skipLocking) {
return new CopyOnWriteRollbackActionExecutor(
context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
}
@Override

View File

@@ -424,7 +424,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
this.txnManager.beginTransaction();
try {
// Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER));
this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)), true);
new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime);
@@ -434,6 +434,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
} else {
upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
}
metaClient.reloadActiveTimeline();
}
metaClient.validateTableProperties(config.getProps(), operationType);
return getTableAndInitCtx(metaClient, operationType, instantTime);

View File

@@ -256,13 +256,15 @@ public class HoodieSparkCopyOnWriteTable<T extends HoodieRecordPayload>
}
@Override
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) {
return new CleanActionExecutor(context, config, this, cleanInstantTime, skipLocking).execute();
}
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant, boolean deleteInstants) {
return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
boolean deleteInstants, boolean skipLocking) {
return new CopyOnWriteRollbackActionExecutor((HoodieSparkEngineContext) context, config, this, rollbackInstantTime, commitInstant,
deleteInstants, skipLocking).execute();
}
@Override

View File

@@ -159,8 +159,9 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
public HoodieRollbackMetadata rollback(HoodieEngineContext context,
String rollbackInstantTime,
HoodieInstant commitInstant,
boolean deleteInstants) {
return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants).execute();
boolean deleteInstants,
boolean skipLocking) {
return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
}
@Override

View File

@@ -264,7 +264,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
@EnumSource(HoodieTableType.class)
public void testInsertUpsertCluster(HoodieTableType tableType) throws Exception {
init(tableType);
doWriteOperation(testTable,"0000001", INSERT);
doWriteOperation(testTable, "0000001", INSERT);
doWriteOperation(testTable, "0000002");
doClusterAndValidate(testTable, "0000003");
if (tableType == MERGE_ON_READ) {
@@ -638,6 +638,51 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
validateMetadata(writeClients[0]);
}
/**
* Tests that when inline cleaning is enabled and with auto commit set to true, there is no double locking,
* because auto clean is triggered within post-commit, which already happens within a lock.
*
* @throws Exception
*/
@Test
public void testMultiWriterForDoubleLocking() throws Exception {
init(HoodieTableType.COPY_ON_WRITE);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(true, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(true).retainCommits(4).build())
.withAutoCommit(false)
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
.withProperties(properties)
.build();
SparkRDDWriteClient writeClient = new SparkRDDWriteClient(engineContext, writeConfig);
String partitionPath = dataGen.getPartitionPaths()[0];
for (int j = 0; j < 6; j++) {
String newCommitTime = "000000" + j;
List<HoodieRecord> records = dataGen.generateInsertsForPartition(newCommitTime, 100, partitionPath);
writeClient.startCommitWithTime(newCommitTime);
JavaRDD writeStatuses = writeClient.insert(jsc.parallelize(records, 1), newCommitTime);
writeClient.commit(newCommitTime, writeStatuses);
}
// Ensure all commits were synced to the Metadata Table
HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(metadataTableBasePath).build();
LOG.warn("total commits in metadata table " + metadataMetaClient.getActiveTimeline().getCommitsTimeline().countInstants());
// 6 commits and 2 cleaner commits.
assertEquals(metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().countInstants(), 8);
assertTrue(metadataMetaClient.getActiveTimeline().getCommitTimeline().filterCompletedInstants().countInstants() <= 1);
// Validation
validateMetadata(writeClient);
}
/**
* Let's say a clustering commit succeeded in the metadata table, but failed before committing to the data table.
* Next time, when clustering kicks in, Hudi will roll back the pending clustering and re-attempt the clustering with the same instant time.
@@ -924,6 +969,92 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
}
/**
* When the table needs to be upgraded and multi-writer is enabled, Hudi rolls back partial commits. The upgrade itself happens
* within a lock, and hence the rollback should not lock again.
*
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testRollbackDuringUpgradeForDoubleLocking() throws IOException, InterruptedException {
init(HoodieTableType.COPY_ON_WRITE, false);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
// Perform a commit. This should bootstrap the metadata table with latest version.
List<HoodieRecord> records;
JavaRDD<WriteStatus> writeStatuses;
String commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
Properties properties = new Properties();
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
HoodieWriteConfig writeConfig = getWriteConfigBuilder(false, true, false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
.withProperties(properties)
.build();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
records = dataGen.generateInserts(commitTimestamp, 5);
client.startCommitWithTime(commitTimestamp);
writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
client.commit(commitTimestamp, writeStatuses);
}
// Metadata table should have been bootstrapped
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
FileStatus oldStatus = fs.getFileStatus(new Path(metadataTableBasePath));
// trigger partial commit
metaClient.reloadActiveTimeline();
commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
records = dataGen.generateInserts(commitTimestamp, 5);
client.startCommitWithTime(commitTimestamp);
writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
}
// set hoodie.table.version to 2 in hoodie.properties file
changeTableVersion(HoodieTableVersion.TWO);
writeConfig = getWriteConfigBuilder(true, true, false)
.withRollbackUsingMarkers(false)
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class).build())
.withProperties(properties)
.build();
// With next commit the table should be deleted (as part of upgrade) and partial commit should be rolled back.
metaClient.reloadActiveTimeline();
commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
records = dataGen.generateInserts(commitTimestamp, 5);
client.startCommitWithTime(commitTimestamp);
writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
assertNoWriteErrors(writeStatuses.collect());
}
assertFalse(fs.exists(new Path(metadataTableBasePath)), "Metadata table should not exist");
// With next commit the table should be re-bootstrapped (currently in the constructor. To be changed)
commitTimestamp = HoodieActiveTimeline.createNewInstantTime();
try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, writeConfig)) {
records = dataGen.generateInserts(commitTimestamp, 5);
client.startCommitWithTime(commitTimestamp);
writeStatuses = client.insert(jsc.parallelize(records, 1), commitTimestamp);
assertNoWriteErrors(writeStatuses.collect());
}
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
initMetaClient();
assertEquals(metaClient.getTableConfig().getTableVersion().versionCode(), HoodieTableVersion.THREE.versionCode());
assertTrue(fs.exists(new Path(metadataTableBasePath)), "Metadata table should exist");
FileStatus newStatus = fs.getFileStatus(new Path(metadataTableBasePath));
assertTrue(oldStatus.getModificationTime() < newStatus.getModificationTime());
}
/**
* Test various error scenarios.
*/

View File

@@ -1308,7 +1308,7 @@ public class TestCleaner extends HoodieClientTestBase {
metaClient.reloadActiveTimeline();
HoodieInstant rollbackInstant = new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, "000");
table.scheduleRollback(context, "001", rollbackInstant, false);
table.rollback(context, "001", rollbackInstant, true);
table.rollback(context, "001", rollbackInstant, true, false);
final int numTempFilesAfter = testTable.listAllFilesInTempFolder().length;
assertEquals(0, numTempFilesAfter, "All temp files are deleted.");
}

View File

@@ -88,7 +88,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, false);
HoodieRollbackPlan rollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true);
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, table.getConfig(), table, "003", needRollBackInstant, true,
false);
List<HoodieRollbackStat> hoodieRollbackStats = copyOnWriteRollbackActionExecutor.executeRollback(rollbackPlan);
// assert hoodieRollbackStats
@@ -169,7 +170,8 @@ public class TestCopyOnWriteRollbackActionExecutor extends HoodieClientRollbackT
BaseRollbackPlanActionExecutor copyOnWriteRollbackPlanActionExecutor =
new BaseRollbackPlanActionExecutor(context, table.getConfig(), table, "003", commitInstant, false);
HoodieRollbackPlan hoodieRollbackPlan = (HoodieRollbackPlan) copyOnWriteRollbackPlanActionExecutor.execute().get();
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false);
CopyOnWriteRollbackActionExecutor copyOnWriteRollbackActionExecutor = new CopyOnWriteRollbackActionExecutor(context, cfg, table, "003", commitInstant, false,
false);
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = copyOnWriteRollbackActionExecutor.execute().getPartitionMetadata();
//3. assert the rollback stat

View File

@@ -99,7 +99,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
table,
"003",
rollBackInstant,
true);
true,
false);
//3. assert the rollback stat
Map<String, HoodieRollbackPartitionMetadata> rollbackMetadata = mergeOnReadRollbackActionExecutor.execute().getPartitionMetadata();
assertEquals(2, rollbackMetadata.size());
@@ -148,7 +149,8 @@ public class TestMergeOnReadRollbackActionExecutor extends HoodieClientRollbackT
rollBackInstant,
true,
true,
true).execute();
true,
false).execute();
});
}