1
0

[HUDI-1507] Change timeline utils to support reading replacecommit metadata (#2407)

This commit is contained in:
satishkotha
2021-01-06 04:55:14 -08:00
committed by GitHub
parent da2919a75f
commit 2c4868e770
2 changed files with 78 additions and 0 deletions

View File

@@ -21,14 +21,17 @@ package org.apache.hudi.common.table.timeline;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieIOException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -63,6 +66,17 @@ public class TimelineUtils {
return commitMetadata.getPartitionToWriteStats().keySet().stream();
} catch (IOException e) {
throw new HoodieIOException("Failed to get partitions written at " + s, e);
}
case HoodieTimeline.REPLACE_COMMIT_ACTION:
try {
HoodieReplaceCommitMetadata commitMetadata = HoodieReplaceCommitMetadata.fromBytes(
timeline.getInstantDetails(s).get(), HoodieReplaceCommitMetadata.class);
Set<String> partitions = new HashSet<>();
partitions.addAll(commitMetadata.getPartitionToReplaceFileIds().keySet());
partitions.addAll(commitMetadata.getPartitionToWriteStats().keySet());
return partitions.stream();
} catch (IOException e) {
throw new HoodieIOException("Failed to get partitions modified at " + s, e);
}
case HoodieTimeline.CLEAN_ACTION:
try {

View File

@@ -23,8 +23,10 @@ import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.HoodieRollbackStat;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -58,6 +60,43 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
initMetaClient();
}
  @Test
  public void testGetPartitionsWithReplaceCommits() throws IOException {
    HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
    HoodieTimeline activeCommitTimeline = activeTimeline.getCommitTimeline();
    assertTrue(activeCommitTimeline.empty());

    String ts1 = "1";
    String replacePartition = "2021/01/01";
    String newFilePartition = "2021/01/02";
    HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts1);
    activeTimeline.createNewInstant(instant1);
    // create replace metadata only with replaced file Ids (no new files created)
    activeTimeline.saveAsComplete(instant1,
        Option.of(getReplaceCommitMetadata(basePath, ts1, replacePartition, 2, newFilePartition, 0, Collections.emptyMap())));
    metaClient.reloadActiveTimeline();

    // instant1 replaced files only in replacePartition, so it is the sole affected partition
    List<String> partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10));
    assertEquals(1, partitions.size());
    assertEquals(replacePartition, partitions.get(0));

    String ts2 = "2";
    HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, ts2);
    activeTimeline.createNewInstant(instant2);
    // create replace metadata only with new files written (no file Ids replaced)
    activeTimeline.saveAsComplete(instant2,
        Option.of(getReplaceCommitMetadata(basePath, ts2, replacePartition, 0, newFilePartition, 3, Collections.emptyMap())));
    metaClient.reloadActiveTimeline();

    // instant2 wrote new files only in newFilePartition
    partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("1", 10));
    assertEquals(1, partitions.size());
    assertEquals(newFilePartition, partitions.get(0));

    // across both instants, both the replaced and the newly written partitions are reported
    partitions = TimelineUtils.getAffectedPartitions(metaClient.getActiveTimeline().findInstantsAfter("0", 10));
    assertEquals(2, partitions.size());
    assertTrue(partitions.contains(replacePartition));
    assertTrue(partitions.contains(newFilePartition));
  }
@Test
public void testGetPartitions() throws IOException {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
@@ -224,6 +263,31 @@ public class TestTimelineUtils extends HoodieCommonTestHarness {
return commit.toJsonString().getBytes(StandardCharsets.UTF_8);
}
private byte[] getReplaceCommitMetadata(String basePath, String commitTs, String replacePartition, int replaceCount,
String newFilePartition, int newFileCount, Map<String, String> extraMetadata)
throws IOException {
HoodieReplaceCommitMetadata commit = new HoodieReplaceCommitMetadata();
for (int i = 1; i <= newFileCount; i++) {
HoodieWriteStat stat = new HoodieWriteStat();
stat.setFileId(i + "");
stat.setPartitionPath(Paths.get(basePath, newFilePartition).toString());
stat.setPath(commitTs + "." + i + ".parquet");
commit.addWriteStat(newFilePartition, stat);
}
Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
if (replaceCount > 0) {
partitionToReplaceFileIds.put(replacePartition, new ArrayList<>());
}
for (int i = 1; i <= replaceCount; i++) {
partitionToReplaceFileIds.get(replacePartition).add(FSUtils.createNewFileIdPfx());
}
commit.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
for (Map.Entry<String, String> extraEntries : extraMetadata.entrySet()) {
commit.addMetadata(extraEntries.getKey(), extraEntries.getValue());
}
return commit.toJsonString().getBytes(StandardCharsets.UTF_8);
}
private Option<byte[]> getCleanMetadata(String partition, String time) throws IOException {
Map<String, HoodieCleanPartitionMetadata> partitionToFilesCleaned = new HashMap<>();
List<String> filesDeleted = new ArrayList<>();