[HUDI-858] Allow multiple operations to be executed within a single commit (#1633)
This commit is contained in:
committed by
GitHub
parent
2600d2de8d
commit
e6f3bf10cf
@@ -99,6 +99,20 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
|
private static final String MAX_CONSISTENCY_CHECKS_PROP = "hoodie.consistency.check.max_checks";
|
||||||
private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
|
private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HUDI-858 : There are users who had been directly using RDD APIs and have relied on a behavior in 0.4.x to allow
|
||||||
|
* multiple write operations (upsert/bulk-insert/...) to be executed within a single commit.
|
||||||
|
*
|
||||||
|
* Given the Hudi commit protocol, these are generally unsafe operations and users need to handle failure scenarios. It
|
||||||
|
* only works with COW tables. Hudi 0.5.x stopped this behavior.
|
||||||
|
*
|
||||||
|
* Given the importance of supporting such cases for the user's migration to 0.5.x, we are proposing a safety flag
|
||||||
|
* (disabled by default) which will allow this old behavior.
|
||||||
|
*/
|
||||||
|
private static final String ALLOW_MULTI_WRITE_ON_SAME_INSTANT =
|
||||||
|
"_.hoodie.allow.multi.write.on.same.instant";
|
||||||
|
private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false";
|
||||||
|
|
||||||
private ConsistencyGuardConfig consistencyGuardConfig;
|
private ConsistencyGuardConfig consistencyGuardConfig;
|
||||||
|
|
||||||
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
|
// Hoodie Write Client transparently rewrites File System View config when embedded mode is enabled
|
||||||
@@ -194,6 +208,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_DELETE_PROP));
|
return Boolean.parseBoolean(props.getProperty(COMBINE_BEFORE_DELETE_PROP));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean shouldAllowMultiWriteOnSameInstant() {
|
||||||
|
return Boolean.parseBoolean(props.getProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT));
|
||||||
|
}
|
||||||
|
|
||||||
public String getWriteStatusClassName() {
|
public String getWriteStatusClassName() {
|
||||||
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
|
return props.getProperty(HOODIE_WRITE_STATUS_CLASS_PROP);
|
||||||
}
|
}
|
||||||
@@ -723,6 +741,11 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder withAllowMultiWriteOnSameInstant(boolean allow) {
|
||||||
|
props.setProperty(ALLOW_MULTI_WRITE_ON_SAME_INSTANT, String.valueOf(allow));
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public HoodieWriteConfig build() {
|
public HoodieWriteConfig build() {
|
||||||
// Check for mandatory properties
|
// Check for mandatory properties
|
||||||
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
|
setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), INSERT_PARALLELISM, DEFAULT_PARALLELISM);
|
||||||
@@ -738,6 +761,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
DEFAULT_COMBINE_BEFORE_UPSERT);
|
DEFAULT_COMBINE_BEFORE_UPSERT);
|
||||||
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP,
|
setDefaultOnCondition(props, !props.containsKey(COMBINE_BEFORE_DELETE_PROP), COMBINE_BEFORE_DELETE_PROP,
|
||||||
DEFAULT_COMBINE_BEFORE_DELETE);
|
DEFAULT_COMBINE_BEFORE_DELETE);
|
||||||
|
setDefaultOnCondition(props, !props.containsKey(ALLOW_MULTI_WRITE_ON_SAME_INSTANT),
|
||||||
|
ALLOW_MULTI_WRITE_ON_SAME_INSTANT, DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT);
|
||||||
setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL,
|
setDefaultOnCondition(props, !props.containsKey(WRITE_STATUS_STORAGE_LEVEL), WRITE_STATUS_STORAGE_LEVEL,
|
||||||
DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
|
DEFAULT_WRITE_STATUS_STORAGE_LEVEL);
|
||||||
setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
|
setDefaultOnCondition(props, !props.containsKey(HOODIE_AUTO_COMMIT_PROP), HOODIE_AUTO_COMMIT_PROP,
|
||||||
@@ -778,7 +803,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
|
|||||||
// Ensure Layout Version is good
|
// Ensure Layout Version is good
|
||||||
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
|
new TimelineLayoutVersion(Integer.parseInt(layoutVersion));
|
||||||
|
|
||||||
|
|
||||||
// Build WriteConfig at the end
|
// Build WriteConfig at the end
|
||||||
HoodieWriteConfig config = new HoodieWriteConfig(props);
|
HoodieWriteConfig config = new HoodieWriteConfig(props);
|
||||||
Objects.requireNonNull(config.getBasePath());
|
Objects.requireNonNull(config.getBasePath());
|
||||||
|
|||||||
@@ -136,7 +136,8 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>>
|
|||||||
String commitActionType = table.getMetaClient().getCommitActionType();
|
String commitActionType = table.getMetaClient().getCommitActionType();
|
||||||
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
|
HoodieInstant requested = new HoodieInstant(State.REQUESTED, commitActionType, instantTime);
|
||||||
activeTimeline.transitionRequestedToInflight(requested,
|
activeTimeline.transitionRequestedToInflight(requested,
|
||||||
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
|
||||||
|
config.shouldAllowMultiWriteOnSameInstant());
|
||||||
} catch (IOException io) {
|
} catch (IOException io) {
|
||||||
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
|
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", io);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -73,7 +73,8 @@ public class BulkInsertHelper<T extends HoodieRecordPayload<T>> {
|
|||||||
IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
|
IntStream.range(0, parallelism).mapToObj(i -> FSUtils.createNewFileIdPfx()).collect(Collectors.toList());
|
||||||
|
|
||||||
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
|
table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED,
|
||||||
table.getMetaClient().getCommitActionType(), instantTime), Option.empty());
|
table.getMetaClient().getCommitActionType(), instantTime), Option.empty(),
|
||||||
|
config.shouldAllowMultiWriteOnSameInstant());
|
||||||
|
|
||||||
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
|
JavaRDD<WriteStatus> writeStatusRDD = repartitionedRecords
|
||||||
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
|
.mapPartitionsWithIndex(new BulkInsertMapFunction<T>(instantTime, config, table, fileIDPrefixes), true)
|
||||||
|
|||||||
@@ -988,6 +988,44 @@ public class TestHoodieClientOnCopyOnWriteStorage extends TestHoodieClientBase {
|
|||||||
return Pair.of(markerFilePath, result);
|
return Pair.of(markerFilePath, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiOperationsPerCommit() throws IOException {
|
||||||
|
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
|
||||||
|
.withAllowMultiWriteOnSameInstant(true)
|
||||||
|
.build();
|
||||||
|
HoodieWriteClient client = getHoodieWriteClient(cfg);
|
||||||
|
String firstInstantTime = "0000";
|
||||||
|
client.startCommitWithTime(firstInstantTime);
|
||||||
|
int numRecords = 200;
|
||||||
|
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(dataGen.generateInserts(firstInstantTime, numRecords), 1);
|
||||||
|
JavaRDD<WriteStatus> result = client.bulkInsert(writeRecords, firstInstantTime);
|
||||||
|
assertTrue(client.commit(firstInstantTime, result), "Commit should succeed");
|
||||||
|
assertTrue(HoodieTestUtils.doesCommitExist(basePath, firstInstantTime),
|
||||||
|
"After explicit commit, commit file should be created");
|
||||||
|
|
||||||
|
// Check the entire dataset has all records still
|
||||||
|
String[] fullPartitionPaths = new String[dataGen.getPartitionPaths().length];
|
||||||
|
for (int i = 0; i < fullPartitionPaths.length; i++) {
|
||||||
|
fullPartitionPaths[i] = String.format("%s/%s/*", basePath, dataGen.getPartitionPaths()[i]);
|
||||||
|
}
|
||||||
|
assertEquals(numRecords,
|
||||||
|
HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
|
||||||
|
"Must contain " + numRecords + " records");
|
||||||
|
|
||||||
|
String nextInstantTime = "0001";
|
||||||
|
client.startCommitWithTime(nextInstantTime);
|
||||||
|
JavaRDD<HoodieRecord> updateRecords = jsc.parallelize(dataGen.generateUpdates(nextInstantTime, numRecords), 1);
|
||||||
|
JavaRDD<HoodieRecord> insertRecords = jsc.parallelize(dataGen.generateInserts(nextInstantTime, numRecords), 1);
|
||||||
|
JavaRDD<WriteStatus> inserts = client.bulkInsert(insertRecords, nextInstantTime);
|
||||||
|
JavaRDD<WriteStatus> upserts = client.upsert(updateRecords, nextInstantTime);
|
||||||
|
assertTrue(client.commit(nextInstantTime, inserts.union(upserts)), "Commit should succeed");
|
||||||
|
assertTrue(HoodieTestUtils.doesCommitExist(basePath, firstInstantTime),
|
||||||
|
"After explicit commit, commit file should be created");
|
||||||
|
int totalRecords = 2 * numRecords;
|
||||||
|
assertEquals(totalRecords, HoodieClientTestUtils.read(jsc, basePath, sqlContext, fs, fullPartitionPaths).count(),
|
||||||
|
"Must contain " + totalRecords + " records");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Build Hoodie Write Config for small data file sizes.
|
* Build Hoodie Write Config for small data file sizes.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -305,11 +305,16 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
|
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data) {
|
||||||
|
transitionState(fromInstant, toInstant, data, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option<byte[]> data,
|
||||||
|
boolean allowRedundantTransitions) {
|
||||||
ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
|
ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp()));
|
||||||
try {
|
try {
|
||||||
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
|
if (metaClient.getTimelineLayoutVersion().isNullVersion()) {
|
||||||
// Re-create the .inflight file by opening a new file and write the commit metadata in
|
// Re-create the .inflight file by opening a new file and write the commit metadata in
|
||||||
createFileInMetaPath(fromInstant.getFileName(), data, false);
|
createFileInMetaPath(fromInstant.getFileName(), data, allowRedundantTransitions);
|
||||||
Path fromInstantPath = new Path(metaClient.getMetaPath(), fromInstant.getFileName());
|
Path fromInstantPath = new Path(metaClient.getMetaPath(), fromInstant.getFileName());
|
||||||
Path toInstantPath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
|
Path toInstantPath = new Path(metaClient.getMetaPath(), toInstant.getFileName());
|
||||||
boolean success = metaClient.getFs().rename(fromInstantPath, toInstantPath);
|
boolean success = metaClient.getFs().rename(fromInstantPath, toInstantPath);
|
||||||
@@ -322,7 +327,11 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
|||||||
ValidationUtils.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
|
ValidationUtils.checkArgument(metaClient.getFs().exists(new Path(metaClient.getMetaPath(),
|
||||||
fromInstant.getFileName())));
|
fromInstant.getFileName())));
|
||||||
// Use Write Once to create Target File
|
// Use Write Once to create Target File
|
||||||
|
if (allowRedundantTransitions) {
|
||||||
|
createFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
|
||||||
|
} else {
|
||||||
createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
|
createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
|
||||||
|
}
|
||||||
LOG.info("Create new file for toInstant ?" + new Path(metaClient.getMetaPath(), toInstant.getFileName()));
|
LOG.info("Create new file for toInstant ?" + new Path(metaClient.getMetaPath(), toInstant.getFileName()));
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@@ -365,9 +374,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content) {
|
public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content) {
|
||||||
|
transitionRequestedToInflight(requested, content, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void transitionRequestedToInflight(HoodieInstant requested, Option<byte[]> content,
|
||||||
|
boolean allowRedundantTransitions) {
|
||||||
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp());
|
HoodieInstant inflight = new HoodieInstant(State.INFLIGHT, requested.getAction(), requested.getTimestamp());
|
||||||
ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
|
ValidationUtils.checkArgument(requested.isRequested(), "Instant " + requested + " in wrong state");
|
||||||
transitionState(requested, inflight, content);
|
transitionState(requested, inflight, content, allowRedundantTransitions);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> content) {
|
public void saveToCompactionRequested(HoodieInstant instant, Option<byte[]> content) {
|
||||||
|
|||||||
Reference in New Issue
Block a user