[HUDI-1228] Add utility method to query extra metadata
This commit is contained in:
@@ -21,11 +21,14 @@ package org.apache.hudi.common.table.timeline;
|
|||||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.exception.HoodieIOException;
|
import org.apache.hudi.exception.HoodieIOException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
@@ -98,4 +101,31 @@ public class TimelineUtils {
|
|||||||
|
|
||||||
}).distinct().filter(s -> !s.isEmpty()).collect(Collectors.toList());
|
}).distinct().filter(s -> !s.isEmpty()).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get extra metadata for specified key from latest commit/deltacommit instant.
|
||||||
|
*/
|
||||||
|
public static Option<String> getExtraMetadataFromLatest(HoodieTableMetaClient metaClient, String extraMetadataKey) {
|
||||||
|
return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().findFirst().map(instant ->
|
||||||
|
getMetadataValue(metaClient, extraMetadataKey, instant)).orElse(Option.empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get extra metadata for specified key from all active commit/deltacommit instants.
|
||||||
|
*/
|
||||||
|
public static Map<String, Option<String>> getAllExtraMetadataForKey(HoodieTableMetaClient metaClient, String extraMetadataKey) {
|
||||||
|
return metaClient.getCommitsTimeline().filterCompletedInstants().getReverseOrderedInstants().collect(Collectors.toMap(
|
||||||
|
HoodieInstant::getTimestamp, instant -> getMetadataValue(metaClient, extraMetadataKey, instant)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static Option<String> getMetadataValue(HoodieTableMetaClient metaClient, String extraMetadataKey, HoodieInstant instant) {
|
||||||
|
try {
|
||||||
|
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
|
||||||
|
metaClient.getCommitsTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
||||||
|
|
||||||
|
return Option.ofNullable(commitMetadata.getExtraMetadata().get(extraMetadataKey));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new HoodieIOException("Unable to parse instant metadata " + instant, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,6 +48,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
public class TestTimelineUtils extends HoodieCommonTestHarness {
|
public class TestTimelineUtils extends HoodieCommonTestHarness {
|
||||||
@@ -68,7 +69,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
|||||||
String ts = i + "";
|
String ts = i + "";
|
||||||
HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
|
HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
|
||||||
activeTimeline.createNewInstant(instant);
|
activeTimeline.createNewInstant(instant);
|
||||||
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2)));
|
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
|
||||||
|
|
||||||
HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
|
HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
|
||||||
activeTimeline.createNewInstant(cleanInstant);
|
activeTimeline.createNewInstant(cleanInstant);
|
||||||
@@ -107,7 +108,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
|||||||
String ts = i + "";
|
String ts = i + "";
|
||||||
HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
|
HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
|
||||||
activeTimeline.createNewInstant(instant);
|
activeTimeline.createNewInstant(instant);
|
||||||
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2)));
|
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, partitionPath, ts, 2, Collections.emptyMap())));
|
||||||
|
|
||||||
HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
|
HoodieInstant cleanInstant = new HoodieInstant(true, HoodieTimeline.CLEAN_ACTION, ts);
|
||||||
activeTimeline.createNewInstant(cleanInstant);
|
activeTimeline.createNewInstant(cleanInstant);
|
||||||
@@ -147,6 +148,42 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
|||||||
assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
|
assertEquals(partitions, Arrays.asList(new String[]{"2", "3", "4"}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetExtraMetadata() throws Exception {
|
||||||
|
String extraMetadataKey = "test_key";
|
||||||
|
String extraMetadataValue1 = "test_value1";
|
||||||
|
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
|
||||||
|
HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
|
||||||
|
assertTrue(activeCommitTimeline.empty());
|
||||||
|
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey).isPresent());
|
||||||
|
|
||||||
|
String ts = "0";
|
||||||
|
HoodieInstant instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
|
||||||
|
activeTimeline.createNewInstant(instant);
|
||||||
|
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, Collections.emptyMap())));
|
||||||
|
|
||||||
|
ts = "1";
|
||||||
|
instant = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, ts);
|
||||||
|
activeTimeline.createNewInstant(instant);
|
||||||
|
Map<String, String> extraMetadata = new HashMap<>();
|
||||||
|
extraMetadata.put(extraMetadataKey, extraMetadataValue1);
|
||||||
|
activeTimeline.saveAsComplete(instant, Option.of(getCommitMetadata(basePath, ts, ts, 2, extraMetadata)));
|
||||||
|
|
||||||
|
metaClient.reloadActiveTimeline();
|
||||||
|
|
||||||
|
// verify modified partitions included cleaned data
|
||||||
|
Option<String> extraLatestValue = TimelineUtils.getExtraMetadataFromLatest(metaClient, extraMetadataKey);
|
||||||
|
assertTrue(extraLatestValue.isPresent());
|
||||||
|
assertEquals(extraMetadataValue1, extraLatestValue.get());
|
||||||
|
assertFalse(TimelineUtils.getExtraMetadataFromLatest(metaClient, "unknownKey").isPresent());
|
||||||
|
|
||||||
|
Map<String, Option<String>> extraMetadataEntries = TimelineUtils.getAllExtraMetadataForKey(metaClient, extraMetadataKey);
|
||||||
|
assertEquals(2, extraMetadataEntries.size());
|
||||||
|
assertFalse(extraMetadataEntries.get("0").isPresent());
|
||||||
|
assertTrue(extraMetadataEntries.get("1").isPresent());
|
||||||
|
assertEquals(extraMetadataValue1, extraMetadataEntries.get("1").get());
|
||||||
|
}
|
||||||
|
|
||||||
private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count) throws IOException {
|
private byte[] getRestoreMetadata(String basePath, String partition, String commitTs, int count) throws IOException {
|
||||||
HoodieRestoreMetadata metadata = new HoodieRestoreMetadata();
|
HoodieRestoreMetadata metadata = new HoodieRestoreMetadata();
|
||||||
List<HoodieRollbackMetadata> rollbackM = new ArrayList<>();
|
List<HoodieRollbackMetadata> rollbackM = new ArrayList<>();
|
||||||
@@ -173,7 +210,7 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
|||||||
return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats);
|
return TimelineMetadataUtils.convertRollbackMetadata(commitTs, Option.empty(), rollbacks, rollbackStats);
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count)
|
private byte[] getCommitMetadata(String basePath, String partition, String commitTs, int count, Map<String, String> extraMetadata)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HoodieCommitMetadata commit = new HoodieCommitMetadata();
|
HoodieCommitMetadata commit = new HoodieCommitMetadata();
|
||||||
for (int i = 1; i <= count; i++) {
|
for (int i = 1; i <= count; i++) {
|
||||||
@@ -183,6 +220,9 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
|
|||||||
stat.setPath(commitTs + "." + i + ".parquet");
|
stat.setPath(commitTs + "." + i + ".parquet");
|
||||||
commit.addWriteStat(partition, stat);
|
commit.addWriteStat(partition, stat);
|
||||||
}
|
}
|
||||||
|
for (Map.Entry<String, String> extraEntries : extraMetadata.entrySet()) {
|
||||||
|
commit.addMetadata(extraEntries.getKey(), extraEntries.getValue());
|
||||||
|
}
|
||||||
return commit.toJsonString().getBytes(StandardCharsets.UTF_8);
|
return commit.toJsonString().getBytes(StandardCharsets.UTF_8);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user