[HUDI-3642] Handle NPE due to empty requested replacecommit metadata (#5090)
This commit is contained in:
@@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hudi.client.transaction;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||
import org.apache.hudi.client.utils.MetadataConversionUtils;
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
@@ -27,15 +26,18 @@ import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.util.CommitUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
|
||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
|
||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
|
||||
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
|
||||
import static org.apache.hudi.common.util.CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord;
|
||||
|
||||
/**
|
||||
* This class is used to hold all information used to identify how to resolve conflicts between instants.
|
||||
@@ -52,7 +54,7 @@ public class ConcurrentOperation {
|
||||
private final String instantTime;
|
||||
private Set<String> mutatedFileIds = Collections.EMPTY_SET;
|
||||
|
||||
public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClient) throws IOException {
|
||||
public ConcurrentOperation(HoodieInstant instant, HoodieTableMetaClient metaClient) throws IOException {
|
||||
this.metadataWrapper = new HoodieMetadataWrapper(MetadataConversionUtils.createMetaWrapper(instant, metaClient));
|
||||
this.commitMetadataOption = Option.empty();
|
||||
this.actionState = instant.getState().name();
|
||||
@@ -106,24 +108,37 @@ public class ConcurrentOperation {
|
||||
break;
|
||||
case COMMIT_ACTION:
|
||||
case DELTA_COMMIT_ACTION:
|
||||
this.mutatedFileIds = CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
|
||||
this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata()
|
||||
.getPartitionToWriteStats()).keySet();
|
||||
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
|
||||
break;
|
||||
case REPLACE_COMMIT_ACTION:
|
||||
if (instant.isCompleted()) {
|
||||
this.mutatedFileIds = CommitUtils.getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(
|
||||
this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(
|
||||
this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getPartitionToWriteStats()).keySet();
|
||||
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieReplaceCommitMetadata().getOperationType());
|
||||
} else {
|
||||
// we need to have different handling for requested and inflight replacecommit because
|
||||
// for requested replacecommit, clustering will generate a plan and HoodieRequestedReplaceMetadata will not be empty, but insert_overwrite/insert_overwrite_table could have empty content
|
||||
// for inflight replacecommit, clustering will have no content in metadata, but insert_overwrite/insert_overwrite_table will have some commit metadata
|
||||
HoodieRequestedReplaceMetadata requestedReplaceMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieRequestedReplaceMetadata();
|
||||
this.mutatedFileIds = requestedReplaceMetadata
|
||||
.getClusteringPlan().getInputGroups()
|
||||
.stream()
|
||||
.flatMap(ig -> ig.getSlices().stream())
|
||||
.map(file -> file.getFileId())
|
||||
.collect(Collectors.toSet());
|
||||
this.operationType = WriteOperationType.CLUSTER;
|
||||
org.apache.hudi.avro.model.HoodieCommitMetadata inflightCommitMetadata = this.metadataWrapper.getMetadataFromTimeline().getHoodieInflightReplaceMetadata();
|
||||
if (instant.isRequested()) {
|
||||
if (requestedReplaceMetadata != null) {
|
||||
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
|
||||
this.operationType = WriteOperationType.CLUSTER;
|
||||
}
|
||||
} else {
|
||||
if (inflightCommitMetadata != null) {
|
||||
this.mutatedFileIds = getFileIdWithoutSuffixAndRelativePathsFromSpecificRecord(inflightCommitMetadata.getPartitionToWriteStats()).keySet();
|
||||
this.operationType = WriteOperationType.fromValue(this.metadataWrapper.getMetadataFromTimeline().getHoodieCommitMetadata().getOperationType());
|
||||
} else if (requestedReplaceMetadata != null) {
|
||||
// inflight replacecommit metadata is empty due to clustering, read fileIds from requested replacecommit
|
||||
this.mutatedFileIds = getFileIdsFromRequestedReplaceMetadata(requestedReplaceMetadata);
|
||||
this.operationType = WriteOperationType.CLUSTER;
|
||||
}
|
||||
// NOTE: it cannot be the case that instant is inflight, and both the requested and inflight replacecommit metadata are empty
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@@ -142,6 +157,15 @@ public class ConcurrentOperation {
|
||||
}
|
||||
}
|
||||
|
||||
private static Set<String> getFileIdsFromRequestedReplaceMetadata(HoodieRequestedReplaceMetadata requestedReplaceMetadata) {
|
||||
return requestedReplaceMetadata
|
||||
.getClusteringPlan().getInputGroups()
|
||||
.stream()
|
||||
.flatMap(ig -> ig.getSlices().stream())
|
||||
.map(file -> file.getFileId())
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "{"
|
||||
|
||||
@@ -97,7 +97,7 @@ public class CommitUtils {
|
||||
String commitActionType,
|
||||
WriteOperationType operationType) {
|
||||
final HoodieCommitMetadata commitMetadata;
|
||||
if (commitActionType == HoodieTimeline.REPLACE_COMMIT_ACTION) {
|
||||
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(commitActionType)) {
|
||||
HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
|
||||
replaceMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
|
||||
commitMetadata = replaceMetadata;
|
||||
|
||||
Reference in New Issue
Block a user