diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 04676579b..11931c133 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -99,6 +99,20 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
   private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
 
+  /**
+   * HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
+   * multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
+   *
+   * Given Hudi commit protocol, these are generally unsafe operations and users need to handle failure scenarios. It
+   * only works with COW table. Hudi 0.5.x had stopped this behavior.
+   *
+   * Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
+   * (disabled by default) which will allow this old behavior.
+   */
+  private static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
+      "_.hoodie.allow.multi.write.on.same.instant";
+  private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
+
   private ConsistencyGuardConfig consistencyGuardConfig;
 
   // Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
@@ -194,6 +208,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_DELETE_PROP));
   }
 
+  public boolean shouldAllowMultiWriteOnSameInstant() {
+    return Boolean.parseBoolean(props.getProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT));
+  }
+
   public String getWriteStatusClassName() {
     return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
   }
@@ -723,6 +741,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withAllowMultiWriteOnSameInstant(boolean allow) {
+      props.setProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT, String.valueOf(allow));
+      return this;
+    }
+
     public HoodieWriteConfig build() {
       // Check for mandatory properties
       setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
@@ -738,6 +761,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           DEFAULT_COMBINE_BEFORE_UPSERT);
       setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP,
           DEFAULT_COMBINE_BEFORE_DELETE);
+      setDefaultOnCondition(props, !props.containsKey(ALLOW_MULTI_WRITE_ON_SAME_INSTANT),
+          ALLOW_MULTI_WRITE_ON_SAME_INSTANT, DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT);
       setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL,
           DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
       setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
@@ -778,7 +803,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       // Ensure Layout Version is good
       new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
 
-
       // Build WriteConfig at the end
       HoodieWriteConfig config = new HoodieWriteConfig(props);
       Objects.requireNonNull(config.getBasePath());
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index a846de813..0717fd2f4 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -136,7 +136,8 @@ public abstract class BaseCommitActionExecutor>
       String commitActionType = table.getMetaClient().getCommitActionType();
       HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
       activeTimeline.transitionRequestedToInflight(requested,
-          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+          Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
+          config.shouldAllowMultiWriteOnSameInstant());
     } catch (IOException io) {
       throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
     }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
index 4755664b4..782b9aaa3 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertHelper.java
@@ -73,7 +73,8 @@ public class BulkInsertHelper> {
         IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
 
     table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
-        table.getMetaClient().getCommitActionType(), instantTime), Option.empty());
+        table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
+        config.shouldAllowMultiWriteOnSameInstant());
 
     JavaRDD writeStatusRDD = repartitionedRecords
         .mapPartitionsWithIndex(new BulkInsertMapFunction(instantTime, config, table, fileIDPrefixes), true)
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 223c39db4..b50c4cbae 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -988,6 +988,44 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
     return Pair.of(markerFilePath, result);
   }
 
+  @Test
+  public void testMultiOperationsPerCommit() throws IOException {
+    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
+        .withAllowMultiWriteOnSameInstant(true)
+        .build();
+    HoodieWriteClient client = getHoodieWriteClient(cfg);
+    String firstInstantTime = "0000";
+    client.startCommitWithTime(firstInstantTime);
+    int numRecords = 200;
+    JavaRDD writeRecords = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1);
+    JavaRDD result = client.bulkInsert(writeRecords, firstInstantTime);
+    assertTrue(client.commit(firstInstantTime, result), "Commit should succeed");
+    assertTrue(HoodieTestUtils.doesCommitExist(basePath, firstInstantTime),
+        "After explicit commit, commit file should be created");
+
+    // Check the entire dataset has all records still
+    String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
+    for (int i = 0; i < fullPartitionPaths.length; i++) {
+      fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
+    }
+    assertEquals(numRecords,
+        HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
+        "Must contain " + numRecords + " records");
+
+    String nextInstantTime = "0001";
+    client.startCommitWithTime(nextInstantTime);
+    JavaRDD updateRecords = jsc.parallelize(dataGen.generateUpdates(nextInstantTime, numRecords), 1);
+    JavaRDD insertRecords = jsc.parallelize(dataGen.generateInserts(nextInstantTime, numRecords), 1);
+    JavaRDD inserts = client.bulkInsert(insertRecords, nextInstantTime);
+    JavaRDD upserts = client.upsert(updateRecords, nextInstantTime);
+    assertTrue(client.commit(nextInstantTime, inserts.union(upserts)), "Commit should succeed");
+    assertTrue(HoodieTestUtils.doesCommitExist(basePath, nextInstantTime),
+        "After explicit commit, commit file should be created");
+    int totalRecords = 2 * numRecords;
+    assertEquals(totalRecords, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
+        "Must contain " + totalRecords + " records");
+  }
+
   /**
    * Build Hoodie Write Config for small data file sizes.
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index c17309d57..36e2b3de1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -305,11 +305,16 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data) {
+    transitionState(fromInstant, toInstant, data, false);
+  }
+
+  private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data,
+      boolean allowRedundantTransitions) {
     ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
     try {
       if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
         // Re-create the .inflight file by opening a new file and write the commit metadata in
-        createFileInMetaPath(fromInstant.getFileName(), data, false);
+        createFileInMetaPath(fromInstant.getFileName(), data, allowRedundantTransitions);
         Path fromInstantPath = new Path(metaClient.getMetaPath(), fromInstant.getFileName());
         Path toInstantPath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
         boolean success = metaClient.getFs().rename(fromInstantPath, toInstantPath);
@@ -322,7 +327,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
         ValidationUtils.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
             fromInstant.getFileName())));
         // Use Write Once to create Target File
-        createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
+        if (allowRedundantTransitions) {
+          createFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
+        } else {
+          createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
+        }
         LOG.info("Create new file for toInstant ?" + new Path(metaClient.getMetaPath(), toInstant.getFileName()));
       }
     } catch (IOException e) {
@@ -365,9 +374,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
   }
 
   public void transitionRequestedToInflight(HoodieInstant requested, Option content) {
+    transitionRequestedToInflight(requested, content, false);
+  }
+
+  public void transitionRequestedToInflight(HoodieInstant requested, Option content,
+      boolean allowRedundantTransitions) {
     HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp());
     ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
-    transitionState(requested, inflight, content);
+    transitionState(requested, inflight, content, allowRedundantTransitions);
   }
 
   public void saveToCompactionRequested(HoodieInstant instant, Option content) {