From 6f5e661010a85f3426131111baff962a6d6ba91e Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Tue, 16 Nov 2021 13:46:34 +0800 Subject: [PATCH] [HUDI-2769] Fix StreamerUtil#medianInstantTime for very near instant time (#4005) --- .../src/main/java/org/apache/hudi/util/StreamerUtil.java | 8 +++++++- .../test/java/org/apache/hudi/utils/TestStreamerUtil.java | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 867621a66..ddbd24e35 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; @@ -414,7 +415,12 @@ public class StreamerUtil { ValidationUtils.checkArgument(high > low, "Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]"); long median = low + (high - low) / 2; - return low >= median ? Option.empty() : Option.of(HoodieActiveTimeline.formatInstantTime(new Date(median))); + final String instantTime = HoodieActiveTimeline.formatInstantTime(new Date(median)); + if (HoodieTimeline.compareTimestamps(lowVal, HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime) + || HoodieTimeline.compareTimestamps(highVal, HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime)) { + return Option.empty(); + } + return Option.of(instantTime); } catch (ParseException e) { throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e); } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index c05e5b056..b9e2b916d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -86,6 +86,8 @@ public class TestStreamerUtil { assertThrows(IllegalArgumentException.class, () -> StreamerUtil.medianInstantTime(lower, higher), "The first argument should have newer instant time"); + // test very near instant time + assertFalse(StreamerUtil.medianInstantTime("20211116115634", "20211116115633").isPresent()); } @Test