[HUDI-1661] Exclude clustering commits from getExtraMetadataFromLatest API (#2632)
This commit is contained in:
@@ -22,9 +22,12 @@ import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
|||||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||||
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
@@ -43,6 +46,7 @@ import java.util.stream.Stream;
|
|||||||
* 2) Incremental reads - InputFormats can use this API to query
|
* 2) Incremental reads - InputFormats can use this API to query
|
||||||
*/
|
*/
|
||||||
public class TimelineUtils {
|
public class TimelineUtils {
|
||||||
|
private static final Logger LOG = LogManager.getLogger(TimelineUtils.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns partitions that have new data strictly after commitTime.
|
* Returns partitions that have new data strictly after commitTime.
|
||||||
@@ -117,10 +121,24 @@ public class TimelineUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get extra metadata for specified key from latest commit/deltacommit instant.
|
* Get extra metadata for specified key from latest commit/deltacommit/replacecommit(eg. insert_overwrite) instant.
|
||||||
*/
|
*/
|
||||||
public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) {
|
public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) {
|
||||||
return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().findFirst().map(instant ->
|
return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
|
||||||
|
// exclude clustering commits for returning user stored extra metadata
|
||||||
|
.filter(instant -> !isClusteringCommit(metaClient, instant))
|
||||||
|
.findFirst().map(instant ->
|
||||||
|
getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get extra metadata for specified key from latest commit/deltacommit/replacecommit instant including internal commits
|
||||||
|
* such as clustering.
|
||||||
|
*/
|
||||||
|
public static Option<String> getExtraMetadataFromLatestIncludeClustering(HoodieTableMetaClient metaClient, String extraMetadataKey) {
|
||||||
|
return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants()
|
||||||
|
.findFirst().map(instant ->
|
||||||
getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
|
getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,6 +152,7 @@ public class TimelineUtils {
|
|||||||
|
|
||||||
private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) {
|
private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) {
|
||||||
try {
|
try {
|
||||||
|
LOG.info("reading checkpoint info for:" + instant + " key: " + extraMetadataKey);
|
||||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
|
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
|
||||||
metaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
metaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
||||||
|
|
||||||
@@ -142,4 +161,20 @@ public class TimelineUtils {
|
|||||||
throw new HoodieIOException("Unable to parse instant metadata " + instant, e);
|
throw new HoodieIOException("Unable to parse instant metadata " + instant, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static boolean isClusteringCommit(HoodieTableMetaClient metaClient, HoodieInstant instant) {
|
||||||
|
try {
|
||||||
|
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
|
||||||
|
// replacecommit is used for multiple operations: insert_overwrite/cluster etc.
|
||||||
|
// Check operation type to see if this instant is related to clustering.
|
||||||
|
HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
|
||||||
|
metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
|
||||||
|
return WriteOperationType.CLUSTER.equals(replaceMetadata.getOperationType());
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Unable to read instant information: " + instant + " for " + metaClient.getBasePath(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
|
|||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||||
|
import org.apache.hudi.common.model.WriteOperationType;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
@@ -73,7 +74,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
|||||||
activeTimeline.createNewInstant(instant1);
|
activeTimeline.createNewInstant(instant1);
|
||||||
// create replace metadata only with replaced file Ids (no new files created)
|
// create replace metadata only with replaced file Ids (no new files created)
|
||||||
activeTimeline.saveAsComplete(instant1,
|
activeTimeline.saveAsComplete(instant1,
|
||||||
Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, newFilePartition, 0, Collections.emptyMap())));
|
Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2,
|
||||||
|
newFilePartition, 0, Collections.emptyMap(), WriteOperationType.CLUSTER)));
|
||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10));
|
List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10));
|
||||||
@@ -85,7 +87,8 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
|||||||
activeTimeline.createNewInstant(instant2);
|
activeTimeline.createNewInstant(instant2);
|
||||||
// create replace metadata only with replaced file Ids (no new files created)
|
// create replace metadata only with replaced file Ids (no new files created)
|
||||||
activeTimeline.saveAsComplete(instant2,
|
activeTimeline.saveAsComplete(instant2,
|
||||||
Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0, newFilePartition, 3, Collections.emptyMap())));
|
Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0,
|
||||||
|
newFilePartition, 3, Collections.emptyMap(), WriteOperationType.CLUSTER)));
|
||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
|
partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
|
||||||
assertEquals(1, partitions.size());
|
assertEquals(1, partitions.size());
|
||||||
@@ -211,16 +214,42 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
|||||||
metaClient.reloadActiveTimeline();
|
metaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
// verify modified partitions included cleaned data
|
// verify modified partitions included cleaned data
|
||||||
Option<String> extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey);
|
verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1, false);
|
||||||
assertTrue(extraLatestValue.isPresent());
|
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent());
|
||||||
assertEquals(extraMetadataValue1, extraLatestValue.get());
|
|
||||||
|
// verify adding clustering commit doesnt change behavior of getExtraMetadataFromLatest
|
||||||
|
String ts2 = "2";
|
||||||
|
HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts2);
|
||||||
|
activeTimeline.createNewInstant(instant2);
|
||||||
|
String newValueForMetadata = "newValue2";
|
||||||
|
extraMetadata.put(extraMetadataKey, newValueForMetadata);
|
||||||
|
activeTimeline.saveAsComplete(instant2,
|
||||||
|
Option.of(getReplaceCommitMetadata(basePath, ts2, "p2", 0,
|
||||||
|
"p2", 3, extraMetadata, WriteOperationType.CLUSTER)));
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
|
verifyExtraMetadataLatestValue(extraMetadataKey, extraMetadataValue1, false);
|
||||||
|
verifyExtraMetadataLatestValue(extraMetadataKey, newValueForMetadata, true);
|
||||||
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent());
|
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent());
|
||||||
|
|
||||||
Map<String, Option<String>> extraMetadataEntries = TimelineUtils.getAllExtraMetadataForKey(metaClient, extraMetadataKey);
|
Map<String, Option<String>> extraMetadataEntries = TimelineUtils.getAllExtraMetadataForKey(metaClient, extraMetadataKey);
|
||||||
assertEquals(2, extraMetadataEntries.size());
|
assertEquals(3, extraMetadataEntries.size());
|
||||||
assertFalse(extraMetadataEntries.get("0").isPresent());
|
assertFalse(extraMetadataEntries.get("0").isPresent());
|
||||||
assertTrue(extraMetadataEntries.get("1").isPresent());
|
assertTrue(extraMetadataEntries.get("1").isPresent());
|
||||||
assertEquals(extraMetadataValue1, extraMetadataEntries.get("1").get());
|
assertEquals(extraMetadataValue1, extraMetadataEntries.get("1").get());
|
||||||
|
assertTrue(extraMetadataEntries.get("2").isPresent());
|
||||||
|
assertEquals(newValueForMetadata, extraMetadataEntries.get("2").get());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyExtraMetadataLatestValue(String extraMetadataKey, String expected, boolean includeClustering) {
|
||||||
|
final Option<String> extraLatestValue;
|
||||||
|
if (includeClustering) {
|
||||||
|
extraLatestValue = TimelineUtils.getExtraMetadataFromLatestIncludeClustering(metaClient, extraMetadataKey);
|
||||||
|
} else {
|
||||||
|
extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey);
|
||||||
|
}
|
||||||
|
assertTrue(extraLatestValue.isPresent());
|
||||||
|
assertEquals(expected, extraLatestValue.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count, String actionType) throws IOException {
|
private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count, String actionType) throws IOException {
|
||||||
@@ -265,9 +294,11 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount,
|
private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount,
|
||||||
String newFilePartition, int newFileCount, Map<String, String> extraMetadata)
|
String newFilePartition, int newFileCount, Map<String, String> extraMetadata,
|
||||||
|
WriteOperationType operationType)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata();
|
HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata();
|
||||||
|
commit.setOperationType(operationType);
|
||||||
for (int i = 1; i <= newFileCount; i++) {
|
for (int i = 1; i <= newFileCount; i++) {
|
||||||
HoodieWriteStat stat = new HoodieWriteStat();
|
HoodieWriteStat stat = new HoodieWriteStat();
|
||||||
stat.setFileId(i + "");
|
stat.setFileId(i + "");
|
||||||
|
|||||||
Reference in New Issue
Block a user