[HUDI-2744] Fix parsing of metadadata table compaction timestamp when metrics are enabled (#3976)
This commit is contained in:
committed by
GitHub
parent
3c4319729c
commit
53d2d6ae24
@@ -223,7 +223,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("bootstrapAndTableOperationTestArgs")
|
@MethodSource("bootstrapAndTableOperationTestArgs")
|
||||||
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
|
public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
|
||||||
init(tableType, true, enableFullScan);
|
init(tableType, true, enableFullScan, false);
|
||||||
doWriteInsertAndUpsert(testTable);
|
doWriteInsertAndUpsert(testTable);
|
||||||
|
|
||||||
// trigger an upsert
|
// trigger an upsert
|
||||||
@@ -462,27 +462,43 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
|
|||||||
validateMetadata(testTable, emptyList(), true);
|
validateMetadata(testTable, emptyList(), true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetches next commit time in seconds from current one.
|
||||||
|
*
|
||||||
|
* @param curCommitTime current commit time.
|
||||||
|
* @return the next valid commit time.
|
||||||
|
*/
|
||||||
|
private Long getNextCommitTime(long curCommitTime) {
|
||||||
|
if ((curCommitTime + 1) % 1000000000000L >= 60) { // max seconds is 60 and hence
|
||||||
|
return Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
|
||||||
|
} else {
|
||||||
|
return curCommitTime + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@EnumSource(HoodieTableType.class)
|
@EnumSource(HoodieTableType.class)
|
||||||
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception {
|
public void testMetadataBootstrapLargeCommitList(HoodieTableType tableType) throws Exception {
|
||||||
init(tableType);
|
init(tableType, true, true, true);
|
||||||
|
long baseCommitTime = Long.parseLong(HoodieActiveTimeline.createNewInstantTime());
|
||||||
for (int i = 1; i < 25; i += 7) {
|
for (int i = 1; i < 25; i += 7) {
|
||||||
String commitTime1 = ((i > 9) ? ("00000") : ("000000")) + i;
|
long commitTime1 = getNextCommitTime(baseCommitTime);
|
||||||
String commitTime2 = ((i > 9) ? ("00000") : ("000000")) + (i + 1);
|
long commitTime2 = getNextCommitTime(commitTime1);
|
||||||
String commitTime3 = ((i > 9) ? ("00000") : ("000000")) + (i + 2);
|
long commitTime3 = getNextCommitTime(commitTime2);
|
||||||
String commitTime4 = ((i > 9) ? ("00000") : ("000000")) + (i + 3);
|
long commitTime4 = getNextCommitTime(commitTime3);
|
||||||
String commitTime5 = ((i > 9) ? ("00000") : ("000000")) + (i + 4);
|
long commitTime5 = getNextCommitTime(commitTime4);
|
||||||
String commitTime6 = ((i > 9) ? ("00000") : ("000000")) + (i + 5);
|
long commitTime6 = getNextCommitTime(commitTime5);
|
||||||
String commitTime7 = ((i > 9) ? ("00000") : ("000000")) + (i + 6);
|
long commitTime7 = getNextCommitTime(commitTime6);
|
||||||
doWriteOperation(testTable, commitTime1, INSERT);
|
baseCommitTime = commitTime7;
|
||||||
doWriteOperation(testTable, commitTime2);
|
doWriteOperation(testTable, Long.toString(commitTime1), INSERT);
|
||||||
doClean(testTable, commitTime3, Arrays.asList(commitTime1));
|
doWriteOperation(testTable, Long.toString(commitTime2));
|
||||||
doWriteOperation(testTable, commitTime4);
|
doClean(testTable, Long.toString(commitTime3), Arrays.asList(Long.toString(commitTime1)));
|
||||||
|
doWriteOperation(testTable, Long.toString(commitTime4));
|
||||||
if (tableType == MERGE_ON_READ) {
|
if (tableType == MERGE_ON_READ) {
|
||||||
doCompaction(testTable, commitTime5);
|
doCompaction(testTable, Long.toString(commitTime5));
|
||||||
}
|
}
|
||||||
doWriteOperation(testTable, commitTime6);
|
doWriteOperation(testTable, Long.toString(commitTime6));
|
||||||
doRollback(testTable, commitTime6, commitTime7);
|
doRollback(testTable, Long.toString(commitTime6), Long.toString(commitTime7));
|
||||||
}
|
}
|
||||||
validateMetadata(testTable, emptyList(), true);
|
validateMetadata(testTable, emptyList(), true);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,10 +75,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException {
|
public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException {
|
||||||
init(tableType, enableMetadataTable, true);
|
init(tableType, enableMetadataTable, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan) throws IOException {
|
public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan, boolean enableMetrics) throws IOException {
|
||||||
this.tableType = tableType;
|
this.tableType = tableType;
|
||||||
initPath();
|
initPath();
|
||||||
initSparkContexts("TestHoodieMetadata");
|
initSparkContexts("TestHoodieMetadata");
|
||||||
@@ -87,8 +87,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
|
|||||||
initMetaClient(tableType);
|
initMetaClient(tableType);
|
||||||
initTestDataGenerator();
|
initTestDataGenerator();
|
||||||
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
|
metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
|
||||||
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, false,
|
writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, enableMetrics, enableFullScan).build();
|
||||||
enableFullScan).build();
|
|
||||||
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
|
initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.common.table.timeline;
|
package org.apache.hudi.common.table.timeline;
|
||||||
|
|
||||||
|
import java.text.ParseException;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.time.LocalDateTime;
|
import java.time.LocalDateTime;
|
||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
@@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
|
|||||||
public class HoodieInstantTimeGenerator {
|
public class HoodieInstantTimeGenerator {
|
||||||
// Format of the timestamp used for an Instant
|
// Format of the timestamp used for an Instant
|
||||||
private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
|
private static final String INSTANT_TIMESTAMP_FORMAT = "yyyyMMddHHmmss";
|
||||||
|
private static final int INSTANT_TIMESTAMP_FORMAT_LENGTH = INSTANT_TIMESTAMP_FORMAT.length();
|
||||||
// Formatter to generate Instant timestamps
|
// Formatter to generate Instant timestamps
|
||||||
private static DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT);
|
private static DateTimeFormatter INSTANT_TIME_FORMATTER = DateTimeFormatter.ofPattern(INSTANT_TIMESTAMP_FORMAT);
|
||||||
// The last Instant timestamp generated
|
// The last Instant timestamp generated
|
||||||
@@ -56,7 +58,7 @@ public class HoodieInstantTimeGenerator {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Date parseInstantTime(String timestamp) {
|
public static Date parseInstantTime(String timestamp) throws ParseException {
|
||||||
try {
|
try {
|
||||||
LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER);
|
LocalDateTime dt = LocalDateTime.parse(timestamp, INSTANT_TIME_FORMATTER);
|
||||||
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
|
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
|
||||||
@@ -65,7 +67,11 @@ public class HoodieInstantTimeGenerator {
|
|||||||
if (timestamp.equals(ALL_ZERO_TIMESTAMP)) {
|
if (timestamp.equals(ALL_ZERO_TIMESTAMP)) {
|
||||||
return new Date(0);
|
return new Date(0);
|
||||||
}
|
}
|
||||||
|
// compaction and cleaning in metadata has special format. handling it by trimming extra chars and treating it with secs granularity
|
||||||
|
if (timestamp.length() > INSTANT_TIMESTAMP_FORMAT_LENGTH) {
|
||||||
|
LocalDateTime dt = LocalDateTime.parse(timestamp.substring(0, INSTANT_TIMESTAMP_FORMAT_LENGTH), INSTANT_TIME_FORMATTER);
|
||||||
|
return Date.from(dt.atZone(ZoneId.systemDefault()).toInstant());
|
||||||
|
}
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -184,8 +184,9 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
|
|||||||
HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
|
HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
|
||||||
// Retrieve record from base file
|
// Retrieve record from base file
|
||||||
if (baseFileReader != null) {
|
if (baseFileReader != null) {
|
||||||
HoodieTimer readTimer = new HoodieTimer().startTimer();
|
HoodieTimer readTimer = new HoodieTimer();
|
||||||
for (String key : keys) {
|
for (String key : keys) {
|
||||||
|
readTimer.startTimer();
|
||||||
Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
|
Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
|
||||||
if (baseRecord.isPresent()) {
|
if (baseRecord.isPresent()) {
|
||||||
hoodieRecord = metadataTableConfig.populateMetaFields()
|
hoodieRecord = metadataTableConfig.populateMetaFields()
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.BeforeEach;
|
|||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.text.ParseException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
@@ -472,6 +473,19 @@ public class TestHoodieActiveTimeline extends HoodieCommonTestHarness {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMetadataCompactionInstantDateParsing() throws ParseException {
|
||||||
|
// default second granularity instant ID
|
||||||
|
String secondGranularityInstant = "20210101120101";
|
||||||
|
Date defaultSecsGranularityDate = HoodieActiveTimeline.parseInstantTime(secondGranularityInstant);
|
||||||
|
// metadata table compaction/cleaning : ms granularity instant ID
|
||||||
|
String compactionInstant = secondGranularityInstant + "001";
|
||||||
|
Date msGranularityDate = HoodieActiveTimeline.parseInstantTime(compactionInstant);
|
||||||
|
assertEquals(0, msGranularityDate.getTime() - defaultSecsGranularityDate.getTime(), "Expected the ms part to be 0");
|
||||||
|
assertTrue(HoodieTimeline.compareTimestamps(secondGranularityInstant, HoodieTimeline.LESSER_THAN, compactionInstant));
|
||||||
|
assertTrue(HoodieTimeline.compareTimestamps(compactionInstant, HoodieTimeline.GREATER_THAN, secondGranularityInstant));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an exhaustive list of all possible HoodieInstant.
|
* Returns an exhaustive list of all possible HoodieInstant.
|
||||||
* @return list of HoodieInstant
|
* @return list of HoodieInstant
|
||||||
|
|||||||
Reference in New Issue
Block a user