[HUDI-2129] StreamerUtil.medianInstantTime should return a valid date time string (#3221)
This commit is contained in:
@@ -366,7 +366,7 @@ public class StreamWriteOperatorCoordinator
|
||||
try {
|
||||
return this.context.sendEvent(CommitAckEvent.getInstance(), taskID);
|
||||
} catch (TaskNotRunningException e) {
|
||||
throw new HoodieException("Error while sending commit ack event to task [" + taskID + "] error", e);
|
||||
throw new HoodieException("Error while sending commit ack event to task [" + taskID + "]", e);
|
||||
}
|
||||
}).toArray(CompletableFuture<?>[]::new);
|
||||
try {
|
||||
|
||||
@@ -99,7 +99,7 @@ public class CompactionUtil {
|
||||
.filterPendingCompactionTimeline()
|
||||
.filter(instant ->
|
||||
instant.getState() == HoodieInstant.State.INFLIGHT
|
||||
&& StreamerUtil.instantTimeDiff(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
|
||||
&& StreamerUtil.instantTimeDiffSeconds(curInstantTime, instant.getTimestamp()) >= deltaSeconds);
|
||||
inflightCompactionTimeline.getInstants().forEach(inflightInstant -> {
|
||||
LOG.info("Rollback the pending compaction instant: " + inflightInstant);
|
||||
table.rollback(table.getContext(), HoodieActiveTimeline.createNewInstantTime(), inflightInstant, true);
|
||||
|
||||
@@ -27,6 +27,8 @@ import org.apache.hudi.common.config.TypedProperties;
|
||||
import org.apache.hudi.common.engine.EngineType;
|
||||
import org.apache.hudi.common.fs.FSUtils;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||
import org.apache.hudi.common.util.ValidationUtils;
|
||||
import org.apache.hudi.config.HoodieCompactionConfig;
|
||||
import org.apache.hudi.config.HoodieMemoryConfig;
|
||||
import org.apache.hudi.config.HoodieStorageConfig;
|
||||
@@ -52,6 +54,8 @@ import org.slf4j.LoggerFactory;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.text.ParseException;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Properties;
|
||||
@@ -265,16 +269,28 @@ public class StreamerUtil {
|
||||
* Return the median instant time between the given two instant time.
|
||||
*/
|
||||
public static String medianInstantTime(String highVal, String lowVal) {
|
||||
long high = Long.parseLong(highVal);
|
||||
long low = Long.parseLong(lowVal);
|
||||
long median = low + (high - low) / 2;
|
||||
return String.valueOf(median);
|
||||
try {
|
||||
long high = HoodieActiveTimeline.COMMIT_FORMATTER.parse(highVal).getTime();
|
||||
long low = HoodieActiveTimeline.COMMIT_FORMATTER.parse(lowVal).getTime();
|
||||
ValidationUtils.checkArgument(high > low,
|
||||
"Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]");
|
||||
long median = low + (high - low) / 2;
|
||||
return HoodieActiveTimeline.COMMIT_FORMATTER.format(new Date(median));
|
||||
} catch (ParseException e) {
|
||||
throw new HoodieException("Get median instant time with interval [" + lowVal + ", " + highVal + "] error", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the time interval in seconds between the given instant time.
|
||||
*/
|
||||
public static long instantTimeDiff(String newInstantTime, String oldInstantTime) {
|
||||
return Long.parseLong(newInstantTime) - Long.parseLong(oldInstantTime);
|
||||
public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) {
|
||||
try {
|
||||
long newTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(newInstantTime).getTime();
|
||||
long oldTimestamp = HoodieActiveTimeline.COMMIT_FORMATTER.parse(oldInstantTime).getTime();
|
||||
return (newTimestamp - oldTimestamp) / 1000;
|
||||
} catch (ParseException e) {
|
||||
throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,25 +24,30 @@ import org.apache.hudi.configuration.FlinkOptions;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.is;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
* Test cases for {@link StreamerUtil}.
|
||||
*/
|
||||
public class TestStreamerUtil {
|
||||
|
||||
@TempDir
|
||||
File tempFile;
|
||||
|
||||
@Test
|
||||
public void testInitTableIfNotExists() throws IOException {
|
||||
void testInitTableIfNotExists() throws IOException {
|
||||
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
|
||||
|
||||
// Test for partitioned table.
|
||||
@@ -70,5 +75,25 @@ public class TestStreamerUtil {
|
||||
.build();
|
||||
assertFalse(metaClient2.getTableConfig().getPartitionColumns().isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testMedianInstantTime() {
|
||||
String higher = "20210705125921";
|
||||
String lower = "20210705125806";
|
||||
String median1 = StreamerUtil.medianInstantTime(higher, lower);
|
||||
assertThat(median1, is("20210705125843"));
|
||||
// test symmetry
|
||||
assertThrows(IllegalArgumentException.class,
|
||||
() -> StreamerUtil.medianInstantTime(lower, higher),
|
||||
"The first argument should have newer instant time");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testInstantTimeDiff() {
|
||||
String higher = "20210705125921";
|
||||
String lower = "20210705125806";
|
||||
long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower);
|
||||
assertThat(diff, is(75L));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user