1
0

[HUDI-3064][HUDI-3054] FileSystemBasedLockProviderTestClass tryLock fix and TestHoodieClientMultiWriter test fixes (#4384)

- Made FileSystemBasedLockProviderTestClass thread safe and fixed the
   tryLock retry logic.

 - Made TestHoodieClientMultiWriter#testHoodieClientBasicMultiWriter
   deterministic in verifying the HoodieWriteConflictException.
This commit is contained in:
Manoj Govindassamy
2021-12-19 10:31:02 -08:00
committed by GitHub
parent 03f71ef1a2
commit 4a48f99a59
2 changed files with 121 additions and 62 deletions

View File

@@ -36,38 +36,37 @@ import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_NUM_R
import static org.apache.hudi.common.config.LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY;
/**
 * This lock provider is used for testing purposes only. It provides a simple file system based lock
 * using the filesystem's atomic create operation. This lock does not support cleaning/expiring the lock
 * after a failed write. Must not be used in production environments.
 */
public class FileSystemBasedLockProviderTestClass implements LockProvider<String>, Serializable {
private static final String LOCK_NAME = "acquired";
private static final String LOCK = "lock";
private String lockPath;
private final int retryMaxCount;
private final int retryWaitTimeMs;
private transient FileSystem fs;
private transient Path lockFile;
protected LockConfiguration lockConfiguration;
public FileSystemBasedLockProviderTestClass(final LockConfiguration lockConfiguration, final Configuration configuration) {
this.lockConfiguration = lockConfiguration;
this.lockPath = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
this.fs = FSUtils.getFs(this.lockPath, configuration);
}
public void acquireLock() {
try {
fs.create(new Path(lockPath + "/" + LOCK_NAME), false).close();
} catch (IOException e) {
throw new HoodieIOException("Failed to acquire lock", e);
}
final String lockDirectory = lockConfiguration.getConfig().getString(FILESYSTEM_LOCK_PATH_PROP_KEY);
this.retryWaitTimeMs = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY);
this.retryMaxCount = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY);
this.lockFile = new Path(lockDirectory + "/" + LOCK);
this.fs = FSUtils.getFs(this.lockFile.toString(), configuration);
}
@Override
public void close() {
try {
fs.delete(new Path(lockPath + "/" + LOCK_NAME), true);
} catch (IOException e) {
throw new HoodieLockException("Unable to release lock", e);
synchronized (LOCK) {
try {
fs.delete(this.lockFile, true);
} catch (IOException e) {
throw new HoodieLockException("Unable to release lock: " + getLock(), e);
}
}
}
@@ -75,39 +74,45 @@ public class FileSystemBasedLockProviderTestClass implements LockProvider<String
public boolean tryLock(long time, TimeUnit unit) {
try {
int numRetries = 0;
while (fs.exists(new Path(lockPath + "/" + LOCK_NAME))
&& (numRetries++ <= lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY))) {
Thread.sleep(lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY));
}
synchronized (LOCK_NAME) {
if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) {
return false;
synchronized (LOCK) {
while (fs.exists(this.lockFile)) {
LOCK.wait(retryWaitTimeMs);
numRetries++;
if (numRetries > retryMaxCount) {
return false;
}
}
acquireLock();
return fs.exists(this.lockFile);
}
return true;
} catch (IOException | InterruptedException e) {
throw new HoodieLockException("Failed to acquire lock", e);
throw new HoodieLockException("Failed to acquire lock: " + getLock(), e);
}
}
@Override
public void unlock() {
try {
if (fs.exists(new Path(lockPath + "/" + LOCK_NAME))) {
fs.delete(new Path(lockPath + "/" + LOCK_NAME), true);
synchronized (LOCK) {
try {
if (fs.exists(this.lockFile)) {
fs.delete(this.lockFile, true);
}
} catch (IOException io) {
throw new HoodieIOException("Unable to delete lock " + getLock() + "on disk", io);
}
} catch (IOException io) {
throw new HoodieIOException("Unable to delete lock on disk", io);
}
}
@Override
public String getLock() {
return this.lockFile.toString();
}
private void acquireLock() {
try {
return fs.listStatus(new Path(lockPath))[0].getPath().toString();
} catch (Exception e) {
throw new HoodieLockException("Failed to retrieve lock status from lock path " + lockPath);
fs.create(this.lockFile, false).close();
} catch (IOException e) {
throw new HoodieIOException("Failed to acquire lock: " + getLock(), e);
}
}
}

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -40,7 +41,6 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.testutils.HoodieClientTestBase;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -53,6 +53,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -61,6 +62,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -96,7 +98,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250");
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10");
HoodieWriteConfig cfg = getConfigBuilder()
HoodieWriteConfig writeConfig = getConfigBuilder()
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
@@ -104,41 +106,64 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.withMarkersType(MarkerType.DIRECT.name())
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(FileSystemBasedLockProviderTestClass.class)
.build()).withAutoCommit(false).withProperties(properties).build();
// Create the first commit
createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200);
ExecutorService executors = Executors.newFixedThreadPool(2);
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
AtomicBoolean writer1Conflict = new AtomicBoolean(false);
AtomicBoolean writer2Conflict = new AtomicBoolean(false);
createCommitWithInserts(writeConfig, getHoodieWriteClient(writeConfig), "000", "001", 200);
final int threadCount = 2;
final ExecutorService executors = Executors.newFixedThreadPool(2);
final SparkRDDWriteClient client1 = getHoodieWriteClient(writeConfig);
final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig);
final CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
final AtomicBoolean writer1Completed = new AtomicBoolean(false);
final AtomicBoolean writer2Completed = new AtomicBoolean(false);
Future future1 = executors.submit(() -> {
String newCommitTime = "004";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e1) {
assertTrue(e1 instanceof HoodieWriteConflictException);
writer1Conflict.set(true);
final String nextCommitTime = "002";
final JavaRDD<WriteStatus> writeStatusList = startCommitForUpdate(writeConfig, client1, nextCommitTime, 100);
// Wait for the 2nd writer to start the commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
// Commit the update before the 2nd writer
assertDoesNotThrow(() -> {
client1.commit(nextCommitTime, writeStatusList);
});
// Signal the 2nd writer to go ahead for his commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
writer1Completed.set(true);
} catch (Exception e) {
writer1Completed.set(false);
}
});
Future future2 = executors.submit(() -> {
String newCommitTime = "005";
int numRecords = 100;
String commitTimeBetweenPrevAndNew = "002";
try {
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
} catch (Exception e2) {
assertTrue(e2 instanceof HoodieWriteConflictException);
writer2Conflict.set(true);
final String nextCommitTime = "003";
// Wait for the 1st writer to make progress with the commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
final JavaRDD<WriteStatus> writeStatusList = startCommitForUpdate(writeConfig, client2, nextCommitTime, 100);
// Wait for the 1st writer to complete the commit
cyclicBarrier.await(60, TimeUnit.SECONDS);
assertThrows(HoodieWriteConflictException.class, () -> {
client2.commit(nextCommitTime, writeStatusList);
});
writer2Completed.set(true);
} catch (Exception e) {
writer2Completed.set(false);
}
});
future1.get();
future2.get();
Assertions.assertTrue(writer1Conflict.get() || writer2Conflict.get(), "Either of writer1 or writer2 should have failed "
+ "with conflict");
Assertions.assertFalse(writer1Conflict.get() && writer2Conflict.get(), "Both writer1 and writer2 should not result "
+ "in conflict");
// Both writers must have completed successfully; writer2's expected conflict is already asserted above via assertThrows.
assertTrue(writer1Completed.get() && writer2Completed.get());
}
@Test
@@ -443,4 +468,33 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
numRecords, 200, 2);
client.commit(newCommitTime, result);
}
/**
 * Starts a commit on the given write client and performs an upsert of freshly
 * generated unique-update records under that commit time.
 *
 * @param writeConfig   - Write config used to wrap the record generator
 * @param writeClient   - Write client that starts the commit and performs the upsert
 * @param newCommitTime - Commit time for the update
 * @param numRecords    - Number of records to update
 * @return RDD of write statuses produced by the upsert (verified error-free)
 * @throws Exception
 */
private JavaRDD<WriteStatus> startCommitForUpdate(HoodieWriteConfig writeConfig, SparkRDDWriteClient writeClient,
    String newCommitTime, int numRecords) throws Exception {
  // Open the new commit on the client.
  writeClient.startCommitWithTime(newCommitTime);

  // Generate unique update records for this commit time.
  final Function2<List<HoodieRecord>, String, Integer> updateGenerator =
      generateWrapRecordsFn(false, writeConfig, dataGen::generateUniqueUpdates);
  final List<HoodieRecord> updates = updateGenerator.apply(newCommitTime, numRecords);
  final JavaRDD<HoodieRecord> updateRdd = jsc.parallelize(updates, 1);

  // Upsert the updates and verify none of the write statuses carry errors.
  final JavaRDD<WriteStatus> writeStatuses = writeClient.upsert(updateRdd, newCommitTime);
  assertNoWriteErrors(writeStatuses.collect());
  return writeStatuses;
}
}