[HUDI-3252] Avoid creating empty requestedReplaceCommit in the startCommit method (#4515)
This commit is contained in:
@@ -828,8 +828,13 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
|
|||||||
if (config.getFailedWritesCleanPolicy().isLazy()) {
|
if (config.getFailedWritesCleanPolicy().isLazy()) {
|
||||||
this.heartbeatClient.start(instantTime);
|
this.heartbeatClient.start(instantTime);
|
||||||
}
|
}
|
||||||
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
|
|
||||||
instantTime));
|
if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
|
||||||
|
metaClient.getActiveTimeline().createRequestedReplaceCommit(instantTime, actionType);
|
||||||
|
} else {
|
||||||
|
metaClient.getActiveTimeline().createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, actionType,
|
||||||
|
instantTime));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -134,7 +134,7 @@ public class MetadataConversionUtils {
|
|||||||
return Option.of(HoodieCommitMetadata.fromBytes(inflightContent.get(), HoodieCommitMetadata.class));
|
return Option.of(HoodieCommitMetadata.fromBytes(inflightContent.get(), HoodieCommitMetadata.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant) throws IOException {
|
private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant instant) throws IOException {
|
||||||
Option<byte[]> requestedContent = metaClient.getActiveTimeline().getInstantDetails(instant);
|
Option<byte[]> requestedContent = metaClient.getActiveTimeline().getInstantDetails(instant);
|
||||||
if (!requestedContent.isPresent() || requestedContent.get().length == 0) {
|
if (!requestedContent.isPresent() || requestedContent.get().length == 0) {
|
||||||
// requested commit files can be empty in some certain cases, e.g. insert_overwrite or insert_overwrite_table.
|
// requested commit files can be empty in some certain cases, e.g. insert_overwrite or insert_overwrite_table.
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.table.timeline;
|
package org.apache.hudi.common.table.timeline;
|
||||||
|
|
||||||
|
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||||
import org.apache.hudi.common.util.FileIOUtils;
|
import org.apache.hudi.common.util.FileIOUtils;
|
||||||
@@ -156,6 +157,18 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
|
|||||||
createFileInMetaPath(instant.getFileName(), Option.empty(), false);
|
createFileInMetaPath(instant.getFileName(), Option.empty(), false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void createRequestedReplaceCommit(String instantTime, String actionType) {
|
||||||
|
try {
|
||||||
|
HoodieInstant instant = new HoodieInstant(State.REQUESTED, actionType, instantTime);
|
||||||
|
LOG.info("Creating a new instant " + instant);
|
||||||
|
// Create the request replace file
|
||||||
|
createFileInMetaPath(instant.getFileName(),
|
||||||
|
TimelineMetadataUtils.serializeRequestedReplaceMetadata(new HoodieRequestedReplaceMetadata()), false);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Error create requested replace commit ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void saveAsComplete(HoodieInstant instant, Option<byte[]> data) {
|
public void saveAsComplete(HoodieInstant instant, Option<byte[]> data) {
|
||||||
LOG.info("Marking instant complete " + instant);
|
LOG.info("Marking instant complete " + instant);
|
||||||
ValidationUtils.checkArgument(instant.isInflight(),
|
ValidationUtils.checkArgument(instant.isInflight(),
|
||||||
|
|||||||
@@ -77,7 +77,7 @@ public class ClusteringUtils {
|
|||||||
* @return
|
* @return
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) throws IOException {
|
private static Option<HoodieRequestedReplaceMetadata> getRequestedReplaceMetadata(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) throws IOException {
|
||||||
final HoodieInstant requestedInstant;
|
final HoodieInstant requestedInstant;
|
||||||
if (!pendingReplaceInstant.isRequested()) {
|
if (!pendingReplaceInstant.isRequested()) {
|
||||||
// inflight replacecommit files don't have clustering plan.
|
// inflight replacecommit files don't have clustering plan.
|
||||||
|
|||||||
Reference in New Issue
Block a user