diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 56db18576..271f8a378 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -464,7 +464,7 @@ public abstract class BaseHoodieWriteClient
       table, HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
-    if (config.areAnyTableServicesInline()) {
+    if (config.areAnyTableServicesExecutedInline() || config.areAnyTableServicesScheduledInline()) {
       if (config.isMetadataTableEnabled()) {
         table.getHoodieView().sync();
       }
@@ -472,19 +472,35 @@ public abstract class BaseHoodieWriteClient
-  protected Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
-    Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
+  protected Option<String> inlineCompaction(Option<Map<String, String>> extraMetadata) {
+    Option<String> compactionInstantTimeOpt = inlineScheduleCompaction(extraMetadata);
     compactionInstantTimeOpt.ifPresent(compactInstantTime -> {
       // inline compaction should auto commit as the user is never given control
       compact(compactInstantTime, true);
@@ -1015,6 +1032,15 @@ public abstract class BaseHoodieWriteClient
+  /**
+   * Schedules compaction inline.
+   * @param extraMetadata extra metadata to be used.
+   * @return compaction instant if scheduled.
+   */
+  protected Option<String> inlineScheduleCompaction(Option<Map<String, String>> extraMetadata) {
+    return scheduleCompaction(extraMetadata);
+  }
+
   /**
    * Schedules a new clustering instant.
    * @param extraMetadata Extra Metadata to be stored
@@ -1116,9 +1142,10 @@ public abstract class BaseHoodieWriteClient
-  protected Option<String> inlineCluster(Option<Map<String, String>> extraMetadata) {
-    Option<String> clusteringInstantOpt = scheduleClustering(extraMetadata);
+  protected Option<String> inlineClustering(Option<Map<String, String>> extraMetadata) {
+    Option<String> clusteringInstantOpt = inlineScheduleClustering(extraMetadata);
     clusteringInstantOpt.ifPresent(clusteringInstant -> {
       // inline cluster should auto commit as the user is never given control
       cluster(clusteringInstant, true);
@@ -1126,6 +1153,15 @@ public abstract class BaseHoodieWriteClient
+  /**
+   * Schedules clustering inline.
+   * @param extraMetadata extra metadata to be used.
+   * @return clustering instant if scheduled.
+   */
+  protected Option<String> inlineScheduleClustering(Option<Map<String, String>> extraMetadata) {
+    return scheduleClustering(extraMetadata);
+  }
+
   protected void rollbackInflightClustering(HoodieInstant inflightInstant, HoodieTable table) {
     String commitTime = HoodieActiveTimeline.createNewInstantTime();
     table.scheduleRollback(context, commitTime, inflightInstant, false, config.shouldRollbackUsingMarkers());
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
index e4a3c2868..b576c5b6c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.util.TypeUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
@@ -177,6 +178,16 @@ public class HoodieClusteringConfig extends HoodieConfig {
       .withDocumentation("Determines how to handle updates, deletes to file groups that are under clustering."
+ " Default strategy just rejects the update"); + public static final ConfigProperty SCHEDULE_INLINE_CLUSTERING = ConfigProperty + .key("hoodie.clustering.schedule.inline") + .defaultValue("false") + .withDocumentation("When set to true, clustering service will be attempted for inline scheduling after each write. Users have to ensure " + + "they have a separate job to run async clustering(execution) for the one scheduled by this writer. Users can choose to set both " + + "`hoodie.clustering.inline` and `hoodie.clustering.schedule.inline` to false and have both scheduling and execution triggered by any async process, on which " + + "case `hoodie.clustering.async.enabled` is expected to be set to true. But if `hoodie.clustering.inline` is set to false, and `hoodie.clustering.schedule.inline` " + + "is set to true, regular writers will schedule clustering inline, but users are expected to trigger async job for execution. If `hoodie.clustering.inline` is set " + + "to true, regular writers will do both scheduling and execution inline for clustering"); + public static final ConfigProperty ASYNC_CLUSTERING_ENABLE = ConfigProperty .key("hoodie.clustering.async.enabled") .defaultValue("false") @@ -505,6 +516,11 @@ public class HoodieClusteringConfig extends HoodieConfig { return this; } + public Builder withScheduleInlineClustering(Boolean scheduleInlineClustering) { + clusteringConfig.setValue(SCHEDULE_INLINE_CLUSTERING, String.valueOf(scheduleInlineClustering)); + return this; + } + public Builder withInlineClusteringNumCommits(int numCommits) { clusteringConfig.setValue(INLINE_CLUSTERING_MAX_COMMITS, String.valueOf(numCommits)); return this; @@ -562,6 +578,12 @@ public class HoodieClusteringConfig extends HoodieConfig { clusteringConfig.setDefaultValue( EXECUTION_STRATEGY_CLASS_NAME, getDefaultExecutionStrategyClassName(engineType)); clusteringConfig.setDefaults(HoodieClusteringConfig.class.getName()); + + boolean inlineCluster = clusteringConfig.getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING); + boolean inlineClusterSchedule = clusteringConfig.getBoolean(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING); + ValidationUtils.checkArgument(!(inlineCluster && inlineClusterSchedule), String.format("Either of inline clustering (%s) or " + + "schedule inline clustering (%s) can be enabled. Both can't be set to true at the same time. %s,%s", HoodieClusteringConfig.INLINE_CLUSTERING.key(), + HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING.key(), inlineCluster, inlineClusterSchedule)); return clusteringConfig; } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java index 4bc470ba5..ee900fb36 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java @@ -90,6 +90,16 @@ public class HoodieCompactionConfig extends HoodieConfig { .withDocumentation("When set to true, compaction service is triggered after each write. While being " + " simpler operationally, this adds extra latency on the write path."); + public static final ConfigProperty SCHEDULE_INLINE_COMPACT = ConfigProperty + .key("hoodie.compact.schedule.inline") + .defaultValue("false") + .withDocumentation("When set to true, compaction service will be attempted for inline scheduling after each write. 
Users have to ensure " + + "they have a separate job to run async compaction(execution) for the one scheduled by this writer. Users can choose to set both " + + "`hoodie.compact.inline` and `hoodie.compact.schedule.inline` to false and have both scheduling and execution triggered by any async process. " + + "But if `hoodie.compact.inline` is set to false, and `hoodie.compact.schedule.inline` is set to true, regular writers will schedule compaction inline, " + + "but users are expected to trigger async job for execution. If `hoodie.compact.inline` is set to true, regular writers will do both scheduling and " + + "execution inline for compaction"); + public static final ConfigProperty INLINE_COMPACT_NUM_DELTA_COMMITS = ConfigProperty .key("hoodie.compact.inline.max.delta.commits") .defaultValue("5") @@ -537,6 +547,11 @@ public class HoodieCompactionConfig extends HoodieConfig { return this; } + public Builder withScheduleInlineCompaction(Boolean scheduleAsyncCompaction) { + compactionConfig.setValue(SCHEDULE_INLINE_COMPACT, String.valueOf(scheduleAsyncCompaction)); + return this; + } + public Builder withInlineCompactionTriggerStrategy(CompactionTriggerStrategy compactionTriggerStrategy) { compactionConfig.setValue(INLINE_COMPACT_TRIGGER_STRATEGY, compactionTriggerStrategy.name()); return this; @@ -700,6 +715,12 @@ public class HoodieCompactionConfig extends HoodieConfig { + "missing data from few instants.", HoodieCompactionConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep, HoodieCompactionConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained)); + + boolean inlineCompact = compactionConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT); + boolean inlineCompactSchedule = compactionConfig.getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT); + ValidationUtils.checkArgument(!(inlineCompact && inlineCompactSchedule), String.format("Either of inline compaction (%s) or " + + "schedule inline compaction (%s) can be enabled. Both can't be set to true at the same time. 
%s, %s", HoodieCompactionConfig.INLINE_COMPACT.key(), + HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), inlineCompact, inlineCompactSchedule)); return compactionConfig; } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 8a1d04677..1cdad6c56 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -1140,6 +1140,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieCompactionConfig.INLINE_COMPACT); } + public boolean scheduleInlineCompaction() { + return getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT); + } + public CompactionTriggerStrategy getInlineCompactTriggerStrategy() { return CompactionTriggerStrategy.valueOf(getString(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY)); } @@ -1180,6 +1184,10 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING); } + public boolean scheduleInlineClustering() { + return getBoolean(HoodieClusteringConfig.SCHEDULE_INLINE_CLUSTERING); + } + public boolean isAsyncClusteringEnabled() { return getBoolean(HoodieClusteringConfig.ASYNC_CLUSTERING_ENABLE); } @@ -1859,11 +1867,11 @@ public class HoodieWriteConfig extends HoodieConfig { } /** - * Are any table services configured to run inline? + * Are any table services configured to run inline for both scheduling and execution? * * @return True if any table services are configured to run inline, false otherwise. */ - public Boolean areAnyTableServicesInline() { + public Boolean areAnyTableServicesExecutedInline() { return inlineClusteringEnabled() || inlineCompactionEnabled() || isAutoClean(); } @@ -1876,6 +1884,10 @@ public class HoodieWriteConfig extends HoodieConfig { return isAsyncClusteringEnabled() || !inlineCompactionEnabled() || isAsyncClean(); } + public Boolean areAnyTableServicesScheduledInline() { + return scheduleInlineCompaction() || scheduleInlineClustering(); + } + public String getPreCommitValidators() { return getString(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java index 0713b99b1..2c3ae98c6 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java @@ -167,7 +167,7 @@ public class TestHoodieWriteConfig { } }); assertFalse(writeConfig.areAnyTableServicesAsync()); - assertTrue(writeConfig.areAnyTableServicesInline()); + assertTrue(writeConfig.areAnyTableServicesExecutedInline()); assertEquals(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.defaultValue(), writeConfig.getLockProviderClass()); // 5. 
     // 5. User override for the lock provider should always take the precedence
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 514007494..7334f4cb6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1376,6 +1376,40 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     testInsertAndClustering(clusteringConfig, populateMetaFields, true, false, SqlQueryEqualityPreCommitValidator.class.getName(), COUNT_SQL_QUERY_FOR_VALIDATION, "");
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInlineScheduleClustering(boolean scheduleInlineClustering) throws IOException {
+    testInsertTwoBatches(true);
+
+    // setup clustering config.
+    HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder().withClusteringMaxNumGroups(10)
+        .withClusteringTargetPartitions(0).withInlineClusteringNumCommits(1).withInlineClustering(false).withScheduleInlineClustering(scheduleInlineClustering)
+        .withPreserveHoodieCommitMetadata(true).build();
+
+    HoodieWriteConfig config = getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withAutoCommit(false)
+        .withClusteringConfig(clusteringConfig)
+        .withProps(getPropertiesForKeyGen()).build();
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+    dataGen = new HoodieTestDataGenerator(new String[] {"2015/03/16"});
+    String commitTime1 = HoodieActiveTimeline.createNewInstantTime();
+    List<HoodieRecord> records1 = dataGen.generateInserts(commitTime1, 200);
+    client.startCommitWithTime(commitTime1);
+    JavaRDD<HoodieRecord> insertRecordsRDD1 = jsc.parallelize(records1, 2);
+    JavaRDD<WriteStatus> statuses = client.upsert(insertRecordsRDD1, commitTime1);
+    List<WriteStatus> statusList = statuses.collect();
+    assertNoWriteErrors(statusList);
+    client.commit(commitTime1, statuses);
+
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();
+    List<Pair<HoodieInstant, HoodieClusteringPlan>> pendingClusteringPlans =
+        ClusteringUtils.getAllPendingClusteringPlans(metaClient).collect(Collectors.toList());
+    if (scheduleInlineClustering) {
+      assertEquals(1, pendingClusteringPlans.size());
+    } else {
+      assertEquals(0, pendingClusteringPlans.size());
+    }
+  }
+
   @ParameterizedTest
   @MethodSource("populateMetaFieldsAndPreserveMetadataParams")
   public void testClusteringWithSortColumns(boolean populateMetaFields, boolean preserveCommitMetadata) throws Exception {
@@ -1529,7 +1563,6 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
     Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>> allRecords = testInsertTwoBatches(populateMetaFields);
     testClustering(clusteringConfig, populateMetaFields, completeClustering, assertSameFileIds, validatorClasses, sqlQueryForEqualityValidation, sqlQueryForSingleResultValidation, allRecords);
     return allRecords.getLeft().getLeft();
-
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
index a0ec0de37..5438fbcfc 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableClustering.java
@@ -132,7 +132,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
       newCommitTime = "003";
       client.startCommitWithTime(newCommitTime);
       records = dataGen.generateUpdates(newCommitTime, 100);
-      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
+      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
     }
 
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
@@ -201,7 +201,7 @@ class TestHoodieSparkMergeOnReadTableClustering extends SparkClientFunctionalTes
       newCommitTime = "003";
       client.startCommitWithTime(newCommitTime);
       records = dataGen.generateUpdates(newCommitTime, 100);
-      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
+      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
     }
 
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
index c80374b64..272208558 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableIncrementalRead.java
@@ -118,7 +118,7 @@ public class TestHoodieSparkMergeOnReadTableIncrementalRead extends SparkClientF
     String updateTime = "004";
     client.startCommitWithTime(updateTime);
     List<HoodieRecord> records004 = dataGen.generateUpdates(updateTime, 100);
-    updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime);
+    updateRecordsInMORTable(metaClient, records004, client, cfg, updateTime, false);
 
     // verify RO incremental reads - only one base file shows up because updates go into log files
     incrementalROFiles = getROIncrementalFiles(partitionPath, false);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
index dac32b7bb..63f6e4654 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableInsertUpdateDelete.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -105,7 +106,7 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
       newCommitTime = "004";
       client.startCommitWithTime(newCommitTime);
      records = dataGen.generateUpdates(newCommitTime, 100);
-      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime);
+      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, false);
 
       String compactionCommitTime = client.scheduleCompaction(Option.empty()).get().toString();
       client.compact(compactionCommitTime);
@@ -134,6 +135,48 @@ public class TestHoodieSparkMergeOnReadTableInsertUpdateDelete extends SparkClie
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testInlineScheduleCompaction(boolean scheduleInlineCompaction) throws Exception {
+    HoodieFileFormat fileFormat = HoodieFileFormat.PARQUET;
+    Properties properties = new Properties();
+    properties.setProperty(HoodieTableConfig.BASE_FILE_FORMAT.key(), fileFormat.toString());
+    HoodieTableMetaClient metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, properties);
+
+    HoodieWriteConfig cfg = getConfigBuilder(false)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
+            .withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(2).withPreserveCommitMetadata(true).withScheduleInlineCompaction(scheduleInlineCompaction).build())
+        .build();
+    try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+
+      HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+      /*
+       * Write 1 (only inserts)
+       */
+      String newCommitTime = "001";
+      client.startCommitWithTime(newCommitTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
+      Stream<HoodieBaseFile> dataFiles = insertRecordsToMORTable(metaClient, records, client, cfg, newCommitTime, true);
+      assertTrue(dataFiles.findAny().isPresent(), "should list the base files we wrote in the delta commit");
+
+      /*
+       * Write 2 (updates)
+       */
+      newCommitTime = "004";
+      client.startCommitWithTime(newCommitTime);
+      records = dataGen.generateUpdates(newCommitTime, 100);
+      updateRecordsInMORTable(metaClient, records, client, cfg, newCommitTime, true);
+
+      // verify that a pending compaction shows up on the timeline only when inline scheduling is enabled
+      if (scheduleInlineCompaction) {
+        assertEquals(1, metaClient.reloadActiveTimeline().getAllCommitsTimeline().filterPendingCompactionTimeline().countInstants());
+      } else {
+        assertEquals(0, metaClient.reloadActiveTimeline().getAllCommitsTimeline().filterPendingCompactionTimeline().countInstants());
+      }
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testSimpleInsertUpdateAndDelete(boolean populateMetaFields) throws Exception {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index fb19a6325..94e080cae 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -214,12 +214,22 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
   }
 
   protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
-                                                          SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
+                                                           SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
+    return insertRecordsToMORTable(metaClient, records, client, cfg, commitTime, false);
+  }
+
+  protected Stream<HoodieBaseFile> insertRecordsToMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records,
+                                                           SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime,
+                                                           boolean doExplicitCommit) throws IOException {
     HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
 
     JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
 
-    List<WriteStatus> statuses = client.insert(writeRecords, commitTime).collect();
+    JavaRDD<WriteStatus> statusesRdd = client.insert(writeRecords, commitTime);
+    List<WriteStatus> statuses = statusesRdd.collect();
     assertNoWriteErrors(statuses);
+    if (doExplicitCommit) {
+      client.commit(commitTime, statusesRdd);
+    }
 
     assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(reloadedMetaClient.getFs(),
         new Path(reloadedMetaClient.getBasePath(), status.getStat().getPath())));
 
     HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), reloadedMetaClient);
@@ -243,6 +253,11 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
   }
 
   protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime) throws IOException {
+    updateRecordsInMORTable(metaClient, records, client, cfg, commitTime, true);
+  }
+
+  protected void updateRecordsInMORTable(HoodieTableMetaClient metaClient, List<HoodieRecord> records, SparkRDDWriteClient client, HoodieWriteConfig cfg, String commitTime,
+                                         boolean doExplicitCommit) throws IOException {
     HoodieTableMetaClient reloadedMetaClient = HoodieTableMetaClient.reload(metaClient);
 
     Map<String, HoodieRecord> recordsMap = new HashMap<>();
@@ -252,9 +267,13 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
       }
     }
 
-    List<WriteStatus> statuses = client.upsert(jsc().parallelize(records, 1), commitTime).collect();
+    JavaRDD<WriteStatus> statusesRdd = client.upsert(jsc().parallelize(records, 1), commitTime);
+    List<WriteStatus> statuses = statusesRdd.collect();
     // Verify there are no errors
     assertNoWriteErrors(statuses);
+    if (doExplicitCommit) {
+      client.commit(commitTime, statusesRdd);
+    }
 
     assertFileSizesEqual(statuses, status -> FSUtils.getFileSize(reloadedMetaClient.getFs(),
         new Path(reloadedMetaClient.getBasePath(), status.getStat().getPath())));
 
     Option<HoodieInstant> deltaCommit = reloadedMetaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
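
Reviewer note (not part of the patch): the new knobs split table-service scheduling from execution, so a regular writer can plan compaction/clustering inline while a separate async job executes the plans. A minimal sketch of a writer-side config in that mode follows; the class name, method name, and base path are illustrative placeholders, while the builder methods and config keys come from the changes above.

  // Hypothetical writer-side configuration: schedule compaction and clustering
  // inline after each write, but leave execution to a separate async job.
  import org.apache.hudi.config.HoodieClusteringConfig;
  import org.apache.hudi.config.HoodieCompactionConfig;
  import org.apache.hudi.config.HoodieWriteConfig;

  public class ScheduleInlineConfigExample {
    public static HoodieWriteConfig scheduleOnlyWriterConfig(String basePath) {
      return HoodieWriteConfig.newBuilder()
          .withPath(basePath)
          // hoodie.compact.inline=false, hoodie.compact.schedule.inline=true
          .withCompactionConfig(HoodieCompactionConfig.newBuilder()
              .withInlineCompaction(false)
              .withScheduleInlineCompaction(true)
              .build())
          // hoodie.clustering.inline=false, hoodie.clustering.schedule.inline=true
          .withClusteringConfig(HoodieClusteringConfig.newBuilder()
              .withInlineClustering(false)
              .withScheduleInlineClustering(true)
              .build())
          .build();
      // Setting both *.inline and *.schedule.inline to true fails fast in the
      // builders' build() via the ValidationUtils.checkArgument calls added above.
    }
  }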