[HUDI-3043] De-coupling multi writer tests (#4362)
This commit is contained in:
committed by
GitHub
parent
7784249e55
commit
47852446e8
@@ -85,7 +85,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
|||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(HoodieTableType.class)
|
@EnumSource(HoodieTableType.class)
|
||||||
void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception {
|
void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception {
|
||||||
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
||||||
setUpTestTable(tableType);
|
setUpTestTable(tableType);
|
||||||
prepareInitialConfigs(fs(), basePath, "foo");
|
prepareInitialConfigs(fs(), basePath, "foo");
|
||||||
@@ -121,6 +121,20 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
|||||||
// run ingestion & backfill in parallel, create conflict and fail one
|
// run ingestion & backfill in parallel, create conflict and fail one
|
||||||
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
|
runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2,
|
||||||
cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch1");
|
cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@EnumSource(HoodieTableType.class)
|
||||||
|
void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception {
|
||||||
|
// NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts
|
||||||
|
setUpTestTable(tableType);
|
||||||
|
prepareInitialConfigs(fs(), basePath, "foo");
|
||||||
|
TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||||
|
props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass");
|
||||||
|
props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath);
|
||||||
|
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3");
|
||||||
|
props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000");
|
||||||
|
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath);
|
||||||
|
|
||||||
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
|
// create new ingestion & backfill job config to generate only INSERTS to avoid conflict
|
||||||
props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
props = prepareMultiWriterProps(fs(), basePath, propsFilePath);
|
||||||
@@ -131,9 +145,9 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona
|
|||||||
HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
|
HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT,
|
||||||
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
|
propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName()));
|
||||||
cfgBackfillJob2.continuousMode = false;
|
cfgBackfillJob2.continuousMode = false;
|
||||||
meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
|
HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build();
|
||||||
timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||||
commitMetadata = HoodieCommitMetadata
|
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
||||||
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
|
.fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class);
|
||||||
cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
|
cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY);
|
||||||
cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));
|
||||||
|
|||||||
Reference in New Issue
Block a user