[HUDI-1498] Read clustering plan from requested file for inflight instant (#2389)
This commit is contained in:
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieLogFile;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.exception.HoodieIOException;
|
||||
@@ -68,21 +69,30 @@ public class ClusteringUtils {
|
||||
.filter(Option::isPresent).map(Option::get);
|
||||
}
|
||||
|
||||
public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant requestedReplaceInstant) {
|
||||
public static Option<Pair<HoodieInstant, HoodieClusteringPlan>> getClusteringPlan(HoodieTableMetaClient metaClient, HoodieInstant pendingReplaceInstant) {
|
||||
try {
|
||||
Option<byte[]> content = metaClient.getActiveTimeline().getInstantDetails(requestedReplaceInstant);
|
||||
final HoodieInstant requestedInstant;
|
||||
if (!pendingReplaceInstant.isRequested()) {
|
||||
// Inflight replacecommit files don't have a clustering plan.
|
||||
// This is because replacecommit inflight can have workload profile for 'insert_overwrite'.
|
||||
// Get the plan from corresponding requested instant.
|
||||
requestedInstant = HoodieTimeline.getReplaceCommitRequestedInstant(pendingReplaceInstant.getTimestamp());
|
||||
} else {
|
||||
requestedInstant = pendingReplaceInstant;
|
||||
}
|
||||
Option<byte[]> content = metaClient.getActiveTimeline().getInstantDetails(requestedInstant);
|
||||
if (!content.isPresent() || content.get().length == 0) {
|
||||
// A few operations create a requested file without any content. Assume these are not clustering operations.
|
||||
LOG.warn("No content found in requested file for instant " + requestedReplaceInstant);
|
||||
LOG.warn("No content found in requested file for instant " + pendingReplaceInstant);
|
||||
return Option.empty();
|
||||
}
|
||||
HoodieRequestedReplaceMetadata requestedReplaceMetadata = TimelineMetadataUtils.deserializeRequestedReplaceMetadta(content.get());
|
||||
if (WriteOperationType.CLUSTER.name().equals(requestedReplaceMetadata.getOperationType())) {
|
||||
return Option.of(Pair.of(requestedReplaceInstant, requestedReplaceMetadata.getClusteringPlan()));
|
||||
return Option.of(Pair.of(pendingReplaceInstant, requestedReplaceMetadata.getClusteringPlan()));
|
||||
}
|
||||
return Option.empty();
|
||||
} catch (IOException e) {
|
||||
throw new HoodieIOException("Error reading clustering plan " + requestedReplaceInstant.getTimestamp(), e);
|
||||
throw new HoodieIOException("Error reading clustering plan " + pendingReplaceInstant.getTimestamp(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@ package org.apache.hudi.common.table.view;
|
||||
|
||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||
import org.apache.hudi.common.HoodieCleanStat;
|
||||
@@ -35,6 +36,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieTableType;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.model.WriteOperationType;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
|
||||
@@ -857,11 +859,20 @@ public class TestIncrementalFSViewSync extends HoodieCommonTestHarness {
|
||||
private List<String> addReplaceInstant(HoodieTableMetaClient metaClient, String instant,
|
||||
List<Pair<String, HoodieWriteStat>> writeStats,
|
||||
Map<String, List<String>> partitionToReplaceFileIds) throws IOException {
|
||||
// create the requested instant
|
||||
HoodieInstant newRequestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instant);
|
||||
HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
|
||||
.setOperationType(WriteOperationType.UNKNOWN.name()).build();
|
||||
metaClient.getActiveTimeline().saveToPendingReplaceCommit(newRequestedInstant,
|
||||
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
|
||||
|
||||
metaClient.reloadActiveTimeline();
|
||||
// transition to inflight
|
||||
HoodieInstant inflightInstant = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(newRequestedInstant, Option.empty());
|
||||
// transition to replacecommit
|
||||
HoodieReplaceCommitMetadata replaceCommitMetadata = new HoodieReplaceCommitMetadata();
|
||||
writeStats.forEach(e -> replaceCommitMetadata.addWriteStat(e.getKey(), e.getValue()));
|
||||
replaceCommitMetadata.setPartitionToReplaceFileIds(partitionToReplaceFileIds);
|
||||
HoodieInstant inflightInstant = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, instant);
|
||||
metaClient.getActiveTimeline().createNewInstant(inflightInstant);
|
||||
metaClient.getActiveTimeline().saveAsComplete(inflightInstant,
|
||||
Option.of(replaceCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
|
||||
return writeStats.stream().map(e -> e.getValue().getPath()).collect(Collectors.toList());
|
||||
|
||||
@@ -98,6 +98,22 @@ public class TestClusteringUtils extends HoodieCommonTestHarness {
|
||||
validateClusteringInstant(fileIds3, partitionPath1, clusterTime, fileGroupToInstantMap);
|
||||
}
|
||||
|
||||
// replacecommit.inflight doesn't have a clustering plan.
|
||||
// Verify that getClusteringPlan fetches content from corresponding requested file.
|
||||
@Test
|
||||
public void testClusteringPlanInflight() throws Exception {
|
||||
String partitionPath1 = "partition1";
|
||||
List<String> fileIds1 = new ArrayList<>();
|
||||
fileIds1.add(UUID.randomUUID().toString());
|
||||
fileIds1.add(UUID.randomUUID().toString());
|
||||
String clusterTime1 = "1";
|
||||
HoodieInstant requestedInstant = createRequestedReplaceInstant(partitionPath1, clusterTime1, fileIds1);
|
||||
HoodieInstant inflightInstant = metaClient.getActiveTimeline().transitionReplaceRequestedToInflight(requestedInstant, Option.empty());
|
||||
HoodieClusteringPlan requestedClusteringPlan = ClusteringUtils.getClusteringPlan(metaClient, requestedInstant).get().getRight();
|
||||
HoodieClusteringPlan inflightClusteringPlan = ClusteringUtils.getClusteringPlan(metaClient, inflightInstant).get().getRight();
|
||||
assertEquals(requestedClusteringPlan, inflightClusteringPlan);
|
||||
}
|
||||
|
||||
private void validateClusteringInstant(List<String> fileIds, String partitionPath,
|
||||
String expectedInstantTime, Map<HoodieFileGroupId, HoodieInstant> fileGroupToInstantMap) {
|
||||
for (String fileId : fileIds) {
|
||||
|
||||
Reference in New Issue
Block a user