[HUDI-2774] Handle duplicate instants when fetching pending clustering plans (#4118)
This commit is contained in:
@@ -34,6 +34,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieException;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
@@ -124,7 +125,16 @@ public class ClusteringUtils {
|
||||
// get all filegroups in the plan
|
||||
getFileGroupEntriesInClusteringPlan(clusteringPlan.getLeft(), clusteringPlan.getRight()));
|
||||
|
||||
Map<HoodieFileGroupId, HoodieInstant> resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
Map<HoodieFileGroupId, HoodieInstant> resultMap;
|
||||
try {
|
||||
resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
} catch (Exception e) {
|
||||
if (e instanceof IllegalStateException && e.getMessage().contains("Duplicate key")) {
|
||||
throw new HoodieException("Found duplicate file groups pending clustering. If you're running deltastreamer in continuous mode, consider adding delay using --min-sync-interval-seconds. "
|
||||
+ "Or consider setting write concurrency mode to optimistic_concurrency_control.", e);
|
||||
}
|
||||
throw new HoodieException("Error getting all file groups in pending clustering", e);
|
||||
}
|
||||
LOG.info("Found " + resultMap.size() + " files in pending clustering operations");
|
||||
return resultMap;
|
||||
}
|
||||
@@ -166,22 +176,20 @@ public class ClusteringUtils {
|
||||
.setStrategyClassName(strategyClassName).setStrategyParams(strategyParams)
|
||||
.build();
|
||||
|
||||
HoodieClusteringPlan plan = HoodieClusteringPlan.newBuilder()
|
||||
return HoodieClusteringPlan.newBuilder()
|
||||
.setInputGroups(clusteringGroups)
|
||||
.setExtraMetadata(extraMetadata)
|
||||
.setStrategy(strategy)
|
||||
.build();
|
||||
|
||||
return plan;
|
||||
}
|
||||
|
||||
private static List<HoodieSliceInfo> getFileSliceInfo(List<FileSlice> slices) {
|
||||
return slices.stream().map(slice -> new HoodieSliceInfo().newBuilder()
|
||||
return slices.stream().map(slice -> HoodieSliceInfo.newBuilder()
|
||||
.setPartitionPath(slice.getPartitionPath())
|
||||
.setFileId(slice.getFileId())
|
||||
.setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse(null))
|
||||
.setDeltaFilePaths(slice.getLogFiles().map(f -> f.getPath().getName()).collect(Collectors.toList()))
|
||||
.setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(bbf -> bbf.getPath()).orElse(null)).orElse(null))
|
||||
.setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null)).orElse(null))
|
||||
.build()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user