From aaf5727495bc547889b815160ddf855b1799a79d Mon Sep 17 00:00:00 2001 From: Sagar Sumit Date: Wed, 5 Jan 2022 03:02:05 +0530 Subject: [PATCH] [HUDI-2774] Handle duplicate instants when fetching pending clustering plans (#4118) --- .../hudi/common/util/ClusteringUtils.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java index 0d790be84..6687e583a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ClusteringUtils.java @@ -34,6 +34,7 @@ import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -124,7 +125,16 @@ public class ClusteringUtils { // get all filegroups in the plan getFileGroupEntriesInClusteringPlan(clusteringPlan.getLeft(), clusteringPlan.getRight())); - Map resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + Map resultMap; + try { + resultMap = resultStream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } catch (Exception e) { + if (e instanceof IllegalStateException && e.getMessage().contains("Duplicate key")) { + throw new HoodieException("Found duplicate file groups pending clustering. If you're running deltastreamer in continuous mode, consider adding delay using --min-sync-interval-seconds. " + + "Or consider setting write concurrency mode to optimistic_concurrency_control.", e); + } + throw new HoodieException("Error getting all file groups in pending clustering", e); + } LOG.info("Found " + resultMap.size() + " files in pending clustering operations"); return resultMap; } @@ -166,22 +176,20 @@ public class ClusteringUtils { .setStrategyClassName(strategyClassName).setStrategyParams(strategyParams) .build(); - HoodieClusteringPlan plan = HoodieClusteringPlan.newBuilder() + return HoodieClusteringPlan.newBuilder() .setInputGroups(clusteringGroups) .setExtraMetadata(extraMetadata) .setStrategy(strategy) .build(); - - return plan; } private static List getFileSliceInfo(List slices) { - return slices.stream().map(slice -> new HoodieSliceInfo().newBuilder() + return slices.stream().map(slice -> HoodieSliceInfo.newBuilder() .setPartitionPath(slice.getPartitionPath()) .setFileId(slice.getFileId()) .setDataFilePath(slice.getBaseFile().map(BaseFile::getPath).orElse(null)) .setDeltaFilePaths(slice.getLogFiles().map(f -> f.getPath().getName()).collect(Collectors.toList())) - .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(bbf -> bbf.getPath()).orElse(null)).orElse(null)) + .setBootstrapFilePath(slice.getBaseFile().map(bf -> bf.getBootstrapBaseFile().map(BaseFile::getPath).orElse(null)).orElse(null)) .build()).collect(Collectors.toList()); }