1
0

[HUDI-2559] Converting commit timestamp format to millisecs (#4024)

- Adds support for generating commit timestamps with millisecs granularity. 
- Older commit timestamps (in secs granularity) will be suffixed with 999 and parsed with millisecs format.
This commit is contained in:
Sivabalan Narayanan
2021-11-22 11:44:38 -05:00
committed by GitHub
parent 89452063b4
commit fc9ca6a07a
19 changed files with 132 additions and 66 deletions

View File

@@ -51,7 +51,7 @@ public class CommitUtil {
public static String getTimeDaysAgo(int numberOfDays) { public static String getTimeDaysAgo(int numberOfDays) {
Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant()); Date date = Date.from(ZonedDateTime.now().minusDays(numberOfDays).toInstant());
return HoodieActiveTimeline.formatInstantTime(date); return HoodieActiveTimeline.formatDate(date);
} }
/** /**
@@ -61,8 +61,8 @@ public class CommitUtil {
* b) hours: -1, returns 20200202010000 * b) hours: -1, returns 20200202010000
*/ */
public static String addHours(String compactionCommitTime, int hours) throws ParseException { public static String addHours(String compactionCommitTime, int hours) throws ParseException {
Instant instant = HoodieActiveTimeline.parseInstantTime(compactionCommitTime).toInstant(); Instant instant = HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).toInstant();
ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault()); ZonedDateTime commitDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
return HoodieActiveTimeline.formatInstantTime(Date.from(commitDateTime.plusHours(hours).toInstant())); return HoodieActiveTimeline.formatDate(Date.from(commitDateTime.plusHours(hours).toInstant()));
} }
} }

View File

@@ -233,7 +233,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I
if (writeTimer != null) { if (writeTimer != null) {
long durationInMs = metrics.getDurationInMs(writeTimer.stop()); long durationInMs = metrics.getDurationInMs(writeTimer.stop());
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(instantTime).getTime(), durationInMs, metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime(), durationInMs,
metadata, actionType); metadata, actionType);
writeTimer = null; writeTimer = null;
} }

View File

@@ -184,7 +184,7 @@ public class ScheduleCompactionActionExecutor<T extends HoodieRecordPayload, I,
private Long parsedToSeconds(String time) { private Long parsedToSeconds(String time) {
long timestamp; long timestamp;
try { try {
timestamp = HoodieActiveTimeline.parseInstantTime(time).getTime() / 1000; timestamp = HoodieActiveTimeline.parseDateFromInstantTime(time).getTime() / 1000;
} catch (ParseException e) { } catch (ParseException e) {
throw new HoodieCompactionException(e.getMessage(), e); throw new HoodieCompactionException(e.getMessage(), e);
} }

View File

@@ -371,7 +371,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
if (compactionTimer != null) { if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try { try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(), metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) { } catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "

View File

@@ -324,7 +324,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
if (compactionTimer != null) { if (compactionTimer != null) {
long durationInMs = metrics.getDurationInMs(compactionTimer.stop()); long durationInMs = metrics.getDurationInMs(compactionTimer.stop());
try { try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(compactionCommitTime).getTime(), metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(compactionCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION); durationInMs, metadata, HoodieActiveTimeline.COMPACTION_ACTION);
} catch (ParseException e) { } catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "
@@ -405,7 +405,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
if (clusteringTimer != null) { if (clusteringTimer != null) {
long durationInMs = metrics.getDurationInMs(clusteringTimer.stop()); long durationInMs = metrics.getDurationInMs(clusteringTimer.stop());
try { try {
metrics.updateCommitMetrics(HoodieActiveTimeline.parseInstantTime(clusteringCommitTime).getTime(), metrics.updateCommitMetrics(HoodieActiveTimeline.parseDateFromInstantTime(clusteringCommitTime).getTime(),
durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); durationInMs, metadata, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
} catch (ParseException e) { } catch (ParseException e) {
throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction " throw new HoodieCommitException("Commit time is not of valid format. Failed to commit compaction "

View File

@@ -567,7 +567,7 @@ public class SparkHoodieHBaseIndex<T extends HoodieRecordPayload<T>>
BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) { BufferedMutator mutator = hbaseConnection.getBufferedMutator(TableName.valueOf(tableName))) {
final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS); final RateLimiter limiter = RateLimiter.create(multiPutBatchSize, TimeUnit.SECONDS);
Long rollbackTime = HoodieActiveTimeline.parseInstantTime(instantTime).getTime(); Long rollbackTime = HoodieActiveTimeline.parseDateFromInstantTime(instantTime).getTime();
Long currentTime = new Date().getTime(); Long currentTime = new Date().getTime();
Scan scan = new Scan(); Scan scan = new Scan();
scan.addFamily(SYSTEM_COLUMN_FAMILY); scan.addFamily(SYSTEM_COLUMN_FAMILY);

View File

@@ -72,10 +72,10 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
protected HoodieTableMetaClient metaClient; protected HoodieTableMetaClient metaClient;
/** /**
* Parse the timestamp of an Instant and return a {@code SimpleDateFormat}. * Parse the timestamp of an Instant and return a {@code Date}.
*/ */
public static Date parseInstantTime(String timestamp) throws ParseException { public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
return HoodieInstantTimeGenerator.parseInstantTime(timestamp); return HoodieInstantTimeGenerator.parseDateFromInstantTime(timestamp);
} }
/** /**
@@ -88,8 +88,8 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
/** /**
* Format the Date to a String representing the timestamp of a Hoodie Instant. * Format the Date to a String representing the timestamp of a Hoodie Instant.
*/ */
public static String formatInstantTime(Date timestamp) { public static String formatDate(Date timestamp) {
return HoodieInstantTimeGenerator.formatInstantTime(timestamp); return HoodieInstantTimeGenerator.formatDate(timestamp);
} }
/** /**
@@ -100,6 +100,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
return HoodieInstantTimeGenerator.createNewInstantTime(0); return HoodieInstantTimeGenerator.createNewInstantTime(0);
} }
/** /**
* Returns next instant time that adds N milliseconds to current time. * Returns next instant time that adds N milliseconds to current time.
* Ensures each instant time is at least 1 second apart since we create instant times at second granularity

View File

@@ -23,7 +23,9 @@ import java.time.Instant;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException; import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor; import java.time.temporal.TemporalAccessor;
import java.util.Date; import java.util.Date;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@@ -33,14 +35,27 @@ import java.util.concurrent.atomic.AtomicReference;
*/ */
public class HoodieInstantTimeGenerator { public class HoodieInstantTimeGenerator {
// Format of the timestamp used for an Instant // Format of the timestamp used for an Instant
private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss"; public static final String SECS_INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
private static final int INSTANT_TIMESTAMP_FORMAT_LENGTH = INSTANT_TIMESTAMP_FORMAT.length(); public static final int SECS_INSTANT_ID_LENGTH = SECS_INSTANT_TIMESTAMP_FORMAT.length();
public static final String MILLIS_INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmssSSS";
public static final int MILLIS_INSTANT_ID_LENGTH = MILLIS_INSTANT_TIMESTAMP_FORMAT.length();
public static final int MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH = MILLIS_INSTANT_TIMESTAMP_FORMAT.length();
// Formatter to generate Instant timestamps // Formatter to generate Instant timestamps
private static DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT); // Unfortunately millisecond format is not parsable as is (https://bugs.openjdk.java.net/browse/JDK-8031085), hence we have to use appendValue()
private static DateTimeFormatter MILLIS_INSTANT_TIME_FORMATTER = new DateTimeFormatterBuilder().appendPattern(SECS_INSTANT_TIMESTAMP_FORMAT)
.appendValue(ChronoField.MILLI_OF_SECOND, 3).toFormatter();
private static final String MILLIS_GRANULARITY_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
private static DateTimeFormatter MILLIS_GRANULARITY_DATE_FORMATTER = DateTimeFormatter.ofPattern(MILLIS_GRANULARITY_DATE_FORMAT);
// The last Instant timestamp generated // The last Instant timestamp generated
private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE)); private static AtomicReference<String> lastInstantTime = new AtomicReference<>(String.valueOf(Integer.MIN_VALUE));
private static final String ALL_ZERO_TIMESTAMP = "00000000000000"; private static final String ALL_ZERO_TIMESTAMP = "00000000000000";
// The default number of milliseconds that we add if they are not present
// We prefer the max timestamp as it mimics the current behavior with second granularity
// when performing comparisons such as LESS_THAN_OR_EQUAL_TO
private static final String DEFAULT_MILLIS_EXT = "999";
/** /**
* Returns next instant time that adds N milliseconds to the current time. * Returns next instant time that adds N milliseconds to the current time.
* Ensures each instant time is at least 1 second apart since we create instant times at second granularity * Ensures each instant time is at least 1 second apart since we create instant times at second granularity
@@ -52,36 +67,65 @@ public class HoodieInstantTimeGenerator {
String newCommitTime; String newCommitTime;
do { do {
Date d = new Date(System.currentTimeMillis() + milliseconds); Date d = new Date(System.currentTimeMillis() + milliseconds);
newCommitTime = INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d)); newCommitTime = MILLIS_INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(d));
} while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal)); } while (HoodieTimeline.compareTimestamps(newCommitTime, HoodieActiveTimeline.LESSER_THAN_OR_EQUALS, oldVal));
return newCommitTime; return newCommitTime;
}); });
} }
public static Date parseInstantTime(String timestamp) throws ParseException { public static Date parseDateFromInstantTime(String timestamp) throws ParseException {
try { try {
LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER); // Enables backwards compatibility with non-millisecond granularity instants
String timestampInMillis = timestamp;
if (isSecondGranularity(timestamp)) {
// Add milliseconds to the instant in order to parse successfully
timestampInMillis = timestamp + DEFAULT_MILLIS_EXT;
} else if (timestamp.length() > MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH) {
// compaction and cleaning in metadata have a special format; handle it by trimming extra chars and treating it with ms granularity
timestampInMillis = timestamp.substring(0, MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH);
}
LocalDateTime dt = LocalDateTime.parse(timestampInMillis, MILLIS_INSTANT_TIME_FORMATTER);
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant()); return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
} catch (DateTimeParseException e) { } catch (DateTimeParseException e) {
// Special handling for all zero timestamp which is not parsable by DateTimeFormatter // Special handling for all zero timestamp which is not parsable by DateTimeFormatter
if (timestamp.equals(ALL_ZERO_TIMESTAMP)) { if (timestamp.equals(ALL_ZERO_TIMESTAMP)) {
return new Date(0); return new Date(0);
} }
// compaction and cleaning in metadata have a special format; handle it by trimming extra chars and treating it with secs granularity
if (timestamp.length() > INSTANT_TIMESTAMP_FORMAT_LENGTH) {
LocalDateTime dt = LocalDateTime.parse(timestamp.substring(0, INSTANT_TIMESTAMP_FORMAT_LENGTH), INSTANT_TIME_FORMATTER);
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
}
throw e; throw e;
} }
} }
public static String formatInstantTime(Instant timestamp) { private static boolean isSecondGranularity(String instant) {
return INSTANT_TIME_FORMATTER.format(timestamp); return instant.length() == SECS_INSTANT_ID_LENGTH;
} }
public static String formatInstantTime(Date timestamp) { public static String formatInstantTime(Instant timestamp) {
return INSTANT_TIME_FORMATTER.format(convertDateToTemporalAccessor(timestamp)); return MILLIS_INSTANT_TIME_FORMATTER.format(timestamp);
}
public static String formatDate(Date timestamp) {
return getInstantFromTemporalAccessor(convertDateToTemporalAccessor(timestamp));
}
public static String getInstantFromTemporalAccessor(TemporalAccessor temporalAccessor) {
return MILLIS_INSTANT_TIME_FORMATTER.format(temporalAccessor);
}
/**
* Creates an instant string given a valid date-time string.
* @param dateString A date-time string in the format yyyy-MM-dd HH:mm:ss[:SSS]
* @return A timeline instant
* @throws ParseException If we cannot parse the date string
*/
public static String getInstantForDateString(String dateString) {
try {
return getInstantFromTemporalAccessor(LocalDateTime.parse(dateString, MILLIS_GRANULARITY_DATE_FORMATTER));
} catch (Exception e) {
// Attempt to add the milliseconds in order to complete parsing
return getInstantFromTemporalAccessor(LocalDateTime.parse(
String.format("%s:%s", dateString, DEFAULT_MILLIS_EXT), MILLIS_GRANULARITY_DATE_FORMATTER));
}
} }
private static TemporalAccessor convertDateToTemporalAccessor(Date d) { private static TemporalAccessor convertDateToTemporalAccessor(Date d) {

View File

@@ -79,14 +79,14 @@ public class TestFSUtils extends HoodieCommonTestHarness {
@Test @Test
public void testMakeDataFileName() { public void testMakeDataFileName() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); String instantTime = HoodieActiveTimeline.formatDate(new Date());
String fileName = UUID.randomUUID().toString(); String fileName = UUID.randomUUID().toString();
assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION); assertEquals(FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName), fileName + "_" + TEST_WRITE_TOKEN + "_" + instantTime + BASE_FILE_EXTENSION);
} }
@Test @Test
public void testMaskFileName() { public void testMaskFileName() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); String instantTime = HoodieActiveTimeline.formatDate(new Date());
int taskPartitionId = 2; int taskPartitionId = 2;
assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION); assertEquals(FSUtils.maskWithoutFileId(instantTime, taskPartitionId), "*_" + taskPartitionId + "_" + instantTime + BASE_FILE_EXTENSION);
} }
@@ -154,7 +154,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
@Test @Test
public void testGetCommitTime() { public void testGetCommitTime() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); String instantTime = HoodieActiveTimeline.formatDate(new Date());
String fileName = UUID.randomUUID().toString(); String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName); String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(instantTime, FSUtils.getCommitTime(fullFileName)); assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
@@ -165,7 +165,7 @@ public class TestFSUtils extends HoodieCommonTestHarness {
@Test @Test
public void testGetFileNameWithoutMeta() { public void testGetFileNameWithoutMeta() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); String instantTime = HoodieActiveTimeline.formatDate(new Date());
String fileName = UUID.randomUUID().toString(); String fileName = UUID.randomUUID().toString();
String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName); String fullFileName = FSUtils.makeDataFileName(instantTime, TEST_WRITE_TOKEN, fileName);
assertEquals(fileName, FSUtils.getFileId(fullFileName)); assertEquals(fileName, FSUtils.getFileId(fullFileName));

View File

@@ -36,7 +36,7 @@ public class TestHoodieWriteStat {
@Test @Test
public void testSetPaths() { public void testSetPaths() {
String instantTime = HoodieActiveTimeline.formatInstantTime(new Date()); String instantTime = HoodieActiveTimeline.formatDate(new Date());
String basePathString = "/data/tables/some-hoodie-table"; String basePathString = "/data/tables/some-hoodie-table";
String partitionPathString = "2017/12/31"; String partitionPathString = "2017/12/31";
String fileName = UUID.randomUUID().toString(); String fileName = UUID.randomUUID().toString();

View File

@@ -444,7 +444,7 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
} }
// All zero timestamp can be parsed // All zero timestamp can be parsed
HoodieActiveTimeline.parseInstantTime("00000000000000"); HoodieActiveTimeline.parseDateFromInstantTime("00000000000000");
// Multiple thread test // Multiple thread test
final int numChecks = 100000; final int numChecks = 100000;
@@ -455,9 +455,9 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
for (int idx = 0; idx < numThreads; ++idx) { for (int idx = 0; idx < numThreads; ++idx) {
futures.add(executorService.submit(() -> { futures.add(executorService.submit(() -> {
Date date = new Date(System.currentTimeMillis() + (int)(Math.random() * numThreads) * milliSecondsInYear); Date date = new Date(System.currentTimeMillis() + (int)(Math.random() * numThreads) * milliSecondsInYear);
final String expectedFormat = HoodieActiveTimeline.formatInstantTime(date); final String expectedFormat = HoodieActiveTimeline.formatDate(date);
for (int tidx = 0; tidx < numChecks; ++tidx) { for (int tidx = 0; tidx < numChecks; ++tidx) {
final String curFormat = HoodieActiveTimeline.formatInstantTime(date); final String curFormat = HoodieActiveTimeline.formatDate(date);
if (!curFormat.equals(expectedFormat)) { if (!curFormat.equals(expectedFormat)) {
throw new HoodieException("Format error: expected=" + expectedFormat + ", curFormat=" + curFormat); throw new HoodieException("Format error: expected=" + expectedFormat + ", curFormat=" + curFormat);
} }
@@ -476,16 +476,37 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
@Test @Test
public void testMetadataCompactionInstantDateParsing() throws ParseException { public void testMetadataCompactionInstantDateParsing() throws ParseException {
// default second granularity instant ID // default second granularity instant ID
String secondGranularityInstant = "20210101120101"; String secondGranularityInstant = "20210101120101123";
Date defaultSecsGranularityDate = HoodieActiveTimeline.parseInstantTime(secondGranularityInstant); Date defaultSecsGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(secondGranularityInstant);
// metadata table compaction/cleaning : ms granularity instant ID // metadata table compaction/cleaning : ms granularity instant ID
String compactionInstant = secondGranularityInstant + "001"; String compactionInstant = secondGranularityInstant + "001";
Date msGranularityDate = HoodieActiveTimeline.parseInstantTime(compactionInstant); Date defaultMsGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(compactionInstant);
assertEquals(0, msGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 0"); assertEquals(0, defaultMsGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 0");
assertTrue(HoodieTimeline.compareTimestamps(secondGranularityInstant, HoodieTimeline.LESSER_THAN, compactionInstant)); assertTrue(HoodieTimeline.compareTimestamps(secondGranularityInstant, HoodieTimeline.LESSER_THAN, compactionInstant));
assertTrue(HoodieTimeline.compareTimestamps(compactionInstant, HoodieTimeline.GREATER_THAN, secondGranularityInstant)); assertTrue(HoodieTimeline.compareTimestamps(compactionInstant, HoodieTimeline.GREATER_THAN, secondGranularityInstant));
} }
@Test
public void testMillisGranularityInstantDateParsing() throws ParseException {
// Old second granularity instant ID
String secondGranularityInstant = "20210101120101";
Date defaultMsGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(secondGranularityInstant);
// New ms granularity instant ID
String specificMsGranularityInstant = secondGranularityInstant + "009";
Date msGranularityDate = HoodieActiveTimeline.parseDateFromInstantTime(specificMsGranularityInstant);
assertEquals(999, defaultMsGranularityDate.getTime() % 1000, "Expected the ms part to be 999");
assertEquals(9, msGranularityDate.getTime() % 1000, "Expected the ms part to be 9");
// Ensure that any date math which expects second granularity still works
String laterDateInstant = "20210101120111"; // + 10 seconds from original instant
assertEquals(
10,
HoodieActiveTimeline.parseDateFromInstantTime(laterDateInstant).getTime() / 1000
- HoodieActiveTimeline.parseDateFromInstantTime(secondGranularityInstant).getTime() / 1000,
"Expected the difference between later instant and previous instant to be 10 seconds"
);
}
/** /**
* Returns an exhaustive list of all possible HoodieInstant. * Returns an exhaustive list of all possible HoodieInstant.
* @return list of HoodieInstant * @return list of HoodieInstant

View File

@@ -148,7 +148,7 @@ public class HoodieTestTable {
} }
public static String makeNewCommitTime(Instant dateTime) { public static String makeNewCommitTime(Instant dateTime) {
return HoodieActiveTimeline.formatInstantTime(Date.from(dateTime)); return HoodieActiveTimeline.formatDate(Date.from(dateTime));
} }
public static List<String> makeIncrementalCommitTimes(int num) { public static List<String> makeIncrementalCommitTimes(int num) {

View File

@@ -403,12 +403,12 @@ public class StreamerUtil {
*/ */
public static Option<String> medianInstantTime(String highVal, String lowVal) { public static Option<String> medianInstantTime(String highVal, String lowVal) {
try { try {
long high = HoodieActiveTimeline.parseInstantTime(highVal).getTime(); long high = HoodieActiveTimeline.parseDateFromInstantTime(highVal).getTime();
long low = HoodieActiveTimeline.parseInstantTime(lowVal).getTime(); long low = HoodieActiveTimeline.parseDateFromInstantTime(lowVal).getTime();
ValidationUtils.checkArgument(high > low, ValidationUtils.checkArgument(high > low,
"Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]"); "Instant [" + highVal + "] should have newer timestamp than instant [" + lowVal + "]");
long median = low + (high - low) / 2; long median = low + (high - low) / 2;
final String instantTime = HoodieActiveTimeline.formatInstantTime(new Date(median)); final String instantTime = HoodieActiveTimeline.formatDate(new Date(median));
if (HoodieTimeline.compareTimestamps(lowVal, HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime) if (HoodieTimeline.compareTimestamps(lowVal, HoodieTimeline.GREATER_THAN_OR_EQUALS, instantTime)
|| HoodieTimeline.compareTimestamps(highVal, HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime)) { || HoodieTimeline.compareTimestamps(highVal, HoodieTimeline.LESSER_THAN_OR_EQUALS, instantTime)) {
return Option.empty(); return Option.empty();
@@ -424,8 +424,8 @@ public class StreamerUtil {
*/ */
public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) { public static long instantTimeDiffSeconds(String newInstantTime, String oldInstantTime) {
try { try {
long newTimestamp = HoodieActiveTimeline.parseInstantTime(newInstantTime).getTime(); long newTimestamp = HoodieActiveTimeline.parseDateFromInstantTime(newInstantTime).getTime();
long oldTimestamp = HoodieActiveTimeline.parseInstantTime(oldInstantTime).getTime(); long oldTimestamp = HoodieActiveTimeline.parseDateFromInstantTime(oldInstantTime).getTime();
return (newTimestamp - oldTimestamp) / 1000; return (newTimestamp - oldTimestamp) / 1000;
} catch (ParseException e) { } catch (ParseException e) {
throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e); throw new HoodieException("Get instant time diff with interval [" + oldInstantTime + ", " + newInstantTime + "] error", e);

View File

@@ -80,8 +80,9 @@ public class TestStreamerUtil {
void testMedianInstantTime() { void testMedianInstantTime() {
String higher = "20210705125921"; String higher = "20210705125921";
String lower = "20210705125806"; String lower = "20210705125806";
String expectedMedianInstant = "20210705125844499";
String median1 = StreamerUtil.medianInstantTime(higher, lower).get(); String median1 = StreamerUtil.medianInstantTime(higher, lower).get();
assertThat(median1, is("20210705125843")); assertThat(median1, is(expectedMedianInstant));
// test symmetry // test symmetry
assertThrows(IllegalArgumentException.class, assertThrows(IllegalArgumentException.class,
() -> StreamerUtil.medianInstantTime(lower, higher), () -> StreamerUtil.medianInstantTime(lower, higher),

View File

@@ -20,7 +20,6 @@ package org.apache.spark.sql.hudi
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import java.net.URI import java.net.URI
import java.util.{Date, Locale, Properties} import java.util.{Date, Locale, Properties}
import org.apache.hadoop.conf.Configuration import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.Path
@@ -31,8 +30,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstantTimeGenerator}
import org.apache.spark.SPARK_VERSION import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql.{Column, DataFrame, SparkSession} import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.TableIdentifier
@@ -50,7 +48,6 @@ import java.text.SimpleDateFormat
import scala.collection.immutable.Map import scala.collection.immutable.Map
object HoodieSqlUtils extends SparkAdapterSupport { object HoodieSqlUtils extends SparkAdapterSupport {
private val defaultDateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
private val defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd") private val defaultDateFormat = new SimpleDateFormat("yyyy-MM-dd")
def isHoodieTable(table: CatalogTable): Boolean = { def isHoodieTable(table: CatalogTable): Boolean = {
@@ -293,13 +290,15 @@ object HoodieSqlUtils extends SparkAdapterSupport {
* 3、yyyyMMddHHmmss * 3、yyyyMMddHHmmss
*/ */
def formatQueryInstant(queryInstant: String): String = { def formatQueryInstant(queryInstant: String): String = {
if (queryInstant.length == 19) { // for yyyy-MM-dd HH:mm:ss val instantLength = queryInstant.length
HoodieActiveTimeline.formatInstantTime(defaultDateTimeFormat.parse(queryInstant)) if (instantLength == 19 || instantLength == 23) { // for yyyy-MM-dd HH:mm:ss[.SSS]
} else if (queryInstant.length == 14) { // for yyyyMMddHHmmss HoodieInstantTimeGenerator.getInstantForDateString(queryInstant)
HoodieActiveTimeline.parseInstantTime(queryInstant) // validate the format } else if (instantLength == HoodieInstantTimeGenerator.SECS_INSTANT_TIMESTAMP_FORMAT
|| instantLength == HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH) { // for yyyyMMddHHmmss[SSS]
HoodieActiveTimeline.parseDateFromInstantTime(queryInstant) // validate the format
queryInstant queryInstant
} else if (queryInstant.length == 10) { // for yyyy-MM-dd } else if (instantLength == 10) { // for yyyy-MM-dd
HoodieActiveTimeline.formatInstantTime(defaultDateFormat.parse(queryInstant)) HoodieActiveTimeline.formatDate(defaultDateFormat.parse(queryInstant))
} else { } else {
throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant," throw new IllegalArgumentException(s"Unsupported query instant time format: $queryInstant,"
+ s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'") + s"Supported time format are: 'yyyy-MM-dd: HH:mm:ss' or 'yyyy-MM-dd' or 'yyyyMMddHHmmss'")

View File

@@ -179,10 +179,10 @@ class HoodieStreamSource(
startOffset match { startOffset match {
case INIT_OFFSET => startOffset.commitTime case INIT_OFFSET => startOffset.commitTime
case HoodieSourceOffset(commitTime) => case HoodieSourceOffset(commitTime) =>
val time = HoodieActiveTimeline.parseInstantTime(commitTime).getTime val time = HoodieActiveTimeline.parseDateFromInstantTime(commitTime).getTime
// As we consume the data between (start, end], start is not included, // As we consume the data between (start, end], start is not included,
// so we +1s to the start commit time here. // so we +1s to the start commit time here.
HoodieActiveTimeline.formatInstantTime(new Date(time + 1000)) HoodieActiveTimeline.formatDate(new Date(time + 1000))
case _=> throw new IllegalStateException("UnKnow offset type.") case _=> throw new IllegalStateException("UnKnow offset type.")
} }
} }

View File

@@ -218,13 +218,13 @@ class TestTimeTravelQuery extends HoodieClientTestBase {
} }
private def defaultDateTimeFormat(queryInstant: String): String = { private def defaultDateTimeFormat(queryInstant: String): String = {
val date = HoodieActiveTimeline.parseInstantTime(queryInstant) val date = HoodieActiveTimeline.parseDateFromInstantTime(queryInstant)
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
format.format(date) format.format(date)
} }
private def defaultDateFormat(queryInstant: String): String = { private def defaultDateFormat(queryInstant: String): String = {
val date = HoodieActiveTimeline.parseInstantTime(queryInstant) val date = HoodieActiveTimeline.parseDateFromInstantTime(queryInstant)
val format = new SimpleDateFormat("yyyy-MM-dd") val format = new SimpleDateFormat("yyyy-MM-dd")
format.format(date) format.format(date)
} }

View File

@@ -241,7 +241,7 @@ public class HoodieClusteringJob {
HoodieTimeline inflightHoodieTimeline = table.getActiveTimeline().filterPendingReplaceTimeline().filterInflights(); HoodieTimeline inflightHoodieTimeline = table.getActiveTimeline().filterPendingReplaceTimeline().filterInflights();
if (!inflightHoodieTimeline.empty()) { if (!inflightHoodieTimeline.empty()) {
HoodieInstant inflightClusteringInstant = inflightHoodieTimeline.lastInstant().get(); HoodieInstant inflightClusteringInstant = inflightHoodieTimeline.lastInstant().get();
Date clusteringStartTime = HoodieActiveTimeline.parseInstantTime(inflightClusteringInstant.getTimestamp()); Date clusteringStartTime = HoodieActiveTimeline.parseDateFromInstantTime(inflightClusteringInstant.getTimestamp());
if (clusteringStartTime.getTime() + cfg.maxProcessingTimeMs < System.currentTimeMillis()) { if (clusteringStartTime.getTime() + cfg.maxProcessingTimeMs < System.currentTimeMillis()) {
// if there has failed clustering, then we will use the failed clustering instant-time to trigger next clustering action which will rollback and clustering. // if there has failed clustering, then we will use the failed clustering instant-time to trigger next clustering action which will rollback and clustering.
LOG.info("Found failed clustering instant at : " + inflightClusteringInstant + "; Will rollback the failed clustering and re-trigger again."); LOG.info("Found failed clustering instant at : " + inflightClusteringInstant + "; Will rollback the failed clustering and re-trigger again.");

View File

@@ -230,7 +230,7 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se
public List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException { public List<GenericRecord> createInsertRecords(Path srcFolder) throws ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet"); Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime = HoodieActiveTimeline.parseInstantTime("20170203000000").getTime() / 1000; long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>(); List<GenericRecord> records = new ArrayList<GenericRecord>();
for (long recordNum = 0; recordNum < 96; recordNum++) { for (long recordNum = 0; recordNum < 96; recordNum++) {
records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-" + recordNum, records.add(HoodieTestDataGenerator.generateGenericRecord(Long.toString(recordNum), "0", "rider-" + recordNum,
@@ -247,7 +247,7 @@ public class TestHDFSParquetImporter extends FunctionalTestHarness implements Se
public List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException { public List<GenericRecord> createUpsertRecords(Path srcFolder) throws ParseException, IOException {
Path srcFile = new Path(srcFolder.toString(), "file1.parquet"); Path srcFile = new Path(srcFolder.toString(), "file1.parquet");
long startTime = HoodieActiveTimeline.parseInstantTime("20170203000000").getTime() / 1000; long startTime = HoodieActiveTimeline.parseDateFromInstantTime("20170203000000").getTime() / 1000;
List<GenericRecord> records = new ArrayList<GenericRecord>(); List<GenericRecord> records = new ArrayList<GenericRecord>();
// 10 for update // 10 for update
for (long recordNum = 0; recordNum < 11; recordNum++) { for (long recordNum = 0; recordNum < 11; recordNum++) {