diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java index fd30ee35c..95a2ae618 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineUtils.java @@ -21,11 +21,14 @@ package org.apache.hudi.common.table.timeline; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; import org.apache.hudi.exception.HoodieIOException; import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -98,4 +101,31 @@ public class TimelineUtils { }).distinct().filter(s -> !s.isEmpty()).collect(Collectors.toList()); } + + /** + * Get extra metadata for specified key from latest commit/deltacommit instant. + */ + public static Option getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) { + return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().findFirst().map(instant -> + getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty()); + } + + /** + * Get extra metadata for specified key from all active commit/deltacommit instants. + */ + public static Map> getAllExtraMetadataForKey(HoodieTableMetaClient metaClient, String extraMetadataKey) { + return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toMap( + HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient, extraMetadataKey, instant))); + } + + private static Option getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) { + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( + metaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class); + + return Option.ofNullable(commitMetadata.getExtraMetadata().get(extraMetadataKey)); + } catch (IOException e) { + throw new HoodieIOException("Unable to parse instant metadata " + instant, e); + } + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java index 1a1ac5461..2ea418c48 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTimelineUtils.java @@ -48,6 +48,7 @@ import java.util.List; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestTimelineUtils extends HoodieCommonTestHarness { @@ -68,7 +69,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { String ts = i + ""; HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); - activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2))); + activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap()))); HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts); activeTimeline.createNewInstant(cleanInstant); @@ -107,7 +108,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { String ts = i + ""; HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); activeTimeline.createNewInstant(instant); - activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2))); + activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, Collections.emptyMap()))); HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts); activeTimeline.createNewInstant(cleanInstant); @@ -147,6 +148,42 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"})); } + @Test + public void testGetExtraMetadata() throws Exception { + String extraMetadataKey = "test_key"; + String extraMetadataValue1 = "test_value1"; + HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline(); + HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline(); + assertTrue(activeCommitTimeline.empty()); + assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey).isPresent()); + + String ts = "0"; + HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + activeTimeline.createNewInstant(instant); + activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap()))); + + ts = "1"; + instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts); + activeTimeline.createNewInstant(instant); + Map extraMetadata = new HashMap<>(); + extraMetadata.put(extraMetadataKey, extraMetadataValue1); + activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, extraMetadata))); + + metaClient.reloadActiveTimeline(); + + // verify modified partitions included cleaned data + Option extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey); + assertTrue(extraLatestValue.isPresent()); + assertEquals(extraMetadataValue1, extraLatestValue.get()); + assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent()); + + Map> extraMetadataEntries = TimelineUtils.getAllExtraMetadataForKey(metaClient, extraMetadataKey); + assertEquals(2, extraMetadataEntries.size()); + assertFalse(extraMetadataEntries.get("0").isPresent()); + assertTrue(extraMetadataEntries.get("1").isPresent()); + assertEquals(extraMetadataValue1, extraMetadataEntries.get("1").get()); + } + private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count) throws IOException { HoodieRestoreMetadata metadata = new HoodieRestoreMetadata(); List rollbackM = new ArrayList<>(); @@ -173,7 +210,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats); } - private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count) + private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count, Map extraMetadata) throws IOException { HoodieCommitMetadata commit = new HoodieCommitMetadata(); for (int i = 1; i <= count; i++) { @@ -183,6 +220,9 @@ public class TestTimelineUtils extends HoodieCommonTestHarness { stat.setPath(commitTs + "." + i + ".parquet"); commit.addWriteStat(partition, stat); } + for (Map.Entry extraEntries : extraMetadata.entrySet()) { + commit.addMetadata(extraEntries.getKey(), extraEntries.getValue()); + } return commit.toJsonString().getBytes(StandardCharsets.UTF_8); }