diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java index 7307dec26..89f4cfc77 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerWithMultiWriter.java @@ -85,7 +85,7 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona @ParameterizedTest @EnumSource(HoodieTableType.class) - void testUpsertsContinuousModeWithMultipleWriters(HoodieTableType tableType) throws Exception { + void testUpsertsContinuousModeWithMultipleWritersForConflicts(HoodieTableType tableType) throws Exception { // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts setUpTestTable(tableType); prepareInitialConfigs(fs(), basePath, "foo"); @@ -121,6 +121,20 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona // run ingestion & backfill in parallel, create conflict and fail one runJobsInParallel(tableBasePath, tableType, totalRecords, ingestionJob2, cfgIngestionJob, backfillJob, cfgBackfillJob, true, "batch1"); + } + + @ParameterizedTest + @EnumSource(HoodieTableType.class) + void testUpsertsContinuousModeWithMultipleWritersWithoutConflicts(HoodieTableType tableType) throws Exception { + // NOTE : Overriding the LockProvider to FileSystemBasedLockProviderTestClass since Zookeeper locks work in unit test but fail on Jenkins with connection timeouts + setUpTestTable(tableType); + prepareInitialConfigs(fs(), basePath, "foo"); + TypedProperties props = prepareMultiWriterProps(fs(), basePath, propsFilePath); + props.setProperty("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.FileSystemBasedLockProviderTestClass"); + props.setProperty("hoodie.write.lock.filesystem.path", tableBasePath); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY, "3"); + props.setProperty(LockConfiguration.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY, "5000"); + UtilitiesTestBase.Helpers.savePropsToDFS(props, fs(), propsFilePath); // create new ingestion & backfill job config to generate only INSERTS to avoid conflict props = prepareMultiWriterProps(fs(), basePath, propsFilePath); @@ -131,9 +145,9 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends SparkClientFunctiona HoodieDeltaStreamer.Config cfgBackfillJob2 = getDeltaStreamerConfig(tableBasePath, tableType.name(), WriteOperationType.INSERT, propsFilePath, Collections.singletonList(TestHoodieDeltaStreamer.TestIdentityTransformer.class.getName())); cfgBackfillJob2.continuousMode = false; - meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); - timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); - commitMetadata = HoodieCommitMetadata + HoodieTableMetaClient meta = HoodieTableMetaClient.builder().setConf(hadoopConf()).setBasePath(tableBasePath).build(); + HoodieTimeline timeline = meta.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(); + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata .fromBytes(timeline.getInstantDetails(timeline.firstInstant().get()).get(), HoodieCommitMetadata.class); cfgBackfillJob2.checkpoint = commitMetadata.getMetadata(CHECKPOINT_KEY); cfgBackfillJob2.configs.add(String.format("%s=%d", SourceConfigs.MAX_UNIQUE_RECORDS_PROP, totalRecords));