From e6ee7bdb516a7eb03ee2cddb0545d46a26325590 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Mon, 5 Jul 2021 20:56:24 +0800 Subject: [PATCH] [HUDI-2129] StreamerUtil.medianInstantTime should return a valid date time string (#3221) --- .../sink/StreamWriteOperatorCoordinator.java | 2 +- .../org/apache/hudi/util/CompactionUtil.java | 2 +- .../org/apache/hudi/util/StreamerUtil.java | 28 +++++++++++++---- .../apache/hudi/utils/TestStreamerUtil.java | 31 +++++++++++++++++-- 4 files changed, 52 insertions(+), 11 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index cdcd4e626..9c11a68d0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -366,7 +366,7 @@ public class StreamWriteOperatorCoordinator try { return this.context.sendEvent(CommitAckEvent.getInstance(), taskID); } catch (TaskNotRunningException e) { - throw new HoodieException("Error while sending commit ack event to task [" + taskID + "] error", e); + throw new HoodieException("Error while sending commit ack event to task [" + taskID + "]", e); } }).toArray(CompletableFuture[]::new); try { diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java index e8927dc7f..df856742e 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/CompactionUtil.java @@ -99,7 +99,7 @@ public class CompactionUtil { .filterPendingCompactionTimeline() .filter(instant -> instant.getState() == HoodieInstant.State.INFLIGHT - && StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds); + && StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= 
deltaSeconds); inflightCompactionTimeline.getInstants().forEach(inflightInstant -> { LOG.info("Rollback the pending compaction instant: " + inflightInstant); table.rollback(table.getContext(), HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true); diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 4b94d26bd..d69481dc0 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -27,6 +27,8 @@ import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.hudi.config.HoodieStorageConfig; @@ -52,6 +54,8 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.IOException; import java.io.StringReader; +import java.text.ParseException; +import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Properties; @@ -265,16 +269,28 @@ public class StreamerUtil { * Return the median instant time between the given two instant time. 
*/ public static String medianInstantTime(String highVal, String lowVal) { - long high = Long.parseLong(highVal); - long low = Long.parseLong(lowVal); - long median = low + (high - low) / 2; - return String.valueOf(median); + try { + long high = HoodieActiveTimeline.COMMIT_FORMATTER.parse(highVal).getTime(); + long low = HoodieActiveTimeline.COMMIT_FORMATTER.parse(lowVal).getTime(); + ValidationUtils.checkArgument(high > low, + "Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]"); + long median = low + (high - low) / 2; + return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(median)); + } catch (ParseException e) { + throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e); + } } /** * Returns the time interval in seconds between the given instant time. */ - public static long instantTimeDiff(String newInstantTime, String oldInstantTime) { - return Long.parseLong(newInstantTime) - Long.parseLong(oldInstantTime); + public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) { + try { + long newTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(newInstantTime).getTime(); + long oldTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(oldInstantTime).getTime(); + return (newTimestamp - oldTimestamp) / 1000; + } catch (ParseException e) { + throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e); + } } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index a0072c28b..2f6037c3d 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -24,25 +24,30 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.StreamerUtil; import 
org.apache.flink.configuration.Configuration; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +/** + * Test cases for {@link StreamerUtil}. + */ public class TestStreamerUtil { @TempDir File tempFile; @Test - public void testInitTableIfNotExists() throws IOException { + void testInitTableIfNotExists() throws IOException { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); // Test for partitioned table. @@ -70,5 +75,25 @@ public class TestStreamerUtil { .build(); assertFalse(metaClient2.getTableConfig().getPartitionColumns().isPresent()); } + + @Test + void testMedianInstantTime() { + String higher = "20210705125921"; + String lower = "20210705125806"; + String median1 = StreamerUtil.medianInstantTime(higher, lower); + assertThat(median1, is("20210705125843")); + // reversed arguments (older instant passed as the high value) must be rejected + assertThrows(IllegalArgumentException.class, + () -> StreamerUtil.medianInstantTime(lower, higher), + "The first argument should have newer instant time"); + } + + @Test + void testInstantTimeDiff() { + String higher = "20210705125921"; + String lower = "20210705125806"; + long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower); + assertThat(diff, is(75L)); + } }