[HUDI-3054] Fixing default lock configs for FileSystemBasedLock and fixing a flaky test (#4374)
This commit is contained in:
committed by
GitHub
parent
dc40397fa9
commit
77abb5ccb9
@@ -40,6 +40,7 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
|
||||
import org.apache.hudi.testutils.HoodieClientTestBase;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.EnumSource;
|
||||
@@ -56,6 +57,7 @@ import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hudi.common.config.LockConfiguration.FILESYSTEM_LOCK_PATH_PROP_KEY;
|
||||
@@ -90,6 +92,10 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
}
|
||||
Properties properties = new Properties();
|
||||
properties.setProperty(FILESYSTEM_LOCK_PATH_PROP_KEY, basePath + "/.hoodie/.locks");
|
||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10");
|
||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250");
|
||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250");
|
||||
properties.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10");
|
||||
HoodieWriteConfig cfg = getConfigBuilder()
|
||||
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
|
||||
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).withAutoClean(false).build())
|
||||
@@ -103,6 +109,8 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
ExecutorService executors = Executors.newFixedThreadPool(2);
|
||||
SparkRDDWriteClient client1 = getHoodieWriteClient(cfg);
|
||||
SparkRDDWriteClient client2 = getHoodieWriteClient(cfg);
|
||||
AtomicBoolean writer1Conflict = new AtomicBoolean(false);
|
||||
AtomicBoolean writer2Conflict = new AtomicBoolean(false);
|
||||
Future future1 = executors.submit(() -> {
|
||||
String newCommitTime = "004";
|
||||
int numRecords = 100;
|
||||
@@ -111,7 +119,7 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
createCommitWithUpserts(cfg, client1, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
|
||||
} catch (Exception e1) {
|
||||
assertTrue(e1 instanceof HoodieWriteConflictException);
|
||||
throw new RuntimeException(e1);
|
||||
writer1Conflict.set(true);
|
||||
}
|
||||
});
|
||||
Future future2 = executors.submit(() -> {
|
||||
@@ -122,11 +130,15 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
|
||||
createCommitWithUpserts(cfg, client2, "002", commitTimeBetweenPrevAndNew, newCommitTime, numRecords);
|
||||
} catch (Exception e2) {
|
||||
assertTrue(e2 instanceof HoodieWriteConflictException);
|
||||
throw new RuntimeException(e2);
|
||||
writer2Conflict.set(true);
|
||||
}
|
||||
});
|
||||
future1.get();
|
||||
future2.get();
|
||||
Assertions.assertTrue(writer1Conflict.get() || writer2Conflict.get(), "Either of writer1 or writer2 should have failed "
|
||||
+ "with conflict");
|
||||
Assertions.assertFalse(writer1Conflict.get() && writer2Conflict.get(), "Both writer1 and writer2 should not result "
|
||||
+ "in conflict");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -30,14 +30,14 @@ public class LockConfiguration implements Serializable {
|
||||
public static final String LOCK_PREFIX = "hoodie.write.lock.";
|
||||
|
||||
public static final String LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "wait_time_ms_between_retry";
|
||||
public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(5000L);
|
||||
public static final String DEFAULT_LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS = String.valueOf(1000L);
|
||||
|
||||
public static final String LOCK_ACQUIRE_RETRY_MAX_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "max_wait_time_ms_between_retry";
|
||||
|
||||
public static final String LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY = LOCK_PREFIX + "client.wait_time_ms_between_retry";
|
||||
|
||||
public static final String LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY = LOCK_PREFIX + "num_retries";
|
||||
public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(3);
|
||||
public static final String DEFAULT_LOCK_ACQUIRE_NUM_RETRIES = String.valueOf(15);
|
||||
|
||||
public static final String LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY = LOCK_PREFIX + "client.num_retries";
|
||||
|
||||
|
||||
@@ -92,8 +92,10 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
||||
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
|
||||
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10");
|
||||
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
|
||||
|
||||
HoodieDeltaStreamer.Config cfgIngestionJob = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.UPSERT,
|
||||
@@ -132,8 +134,10 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
||||
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
|
||||
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "10");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "250");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,"250");
|
||||
props.setProperty(LockConfiguration.LOCK_ACQUIRE_NUM_RETRIES_PROP_KEY,"10");
|
||||
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
|
||||
|
||||
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
|
||||
|
||||
Reference in New Issue
Block a user