[HUDI-1651] Fix archival of requested replacecommit (#2622)
This commit is contained in:
@@ -400,9 +400,14 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case HoodieTimeline.REPLACE_COMMIT_ACTION: {
|
case HoodieTimeline.REPLACE_COMMIT_ACTION: {
|
||||||
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
|
if (hoodieInstant.isRequested()) {
|
||||||
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
|
archivedMetaWrapper.setHoodieRequestedReplaceMetadata(
|
||||||
archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
|
TimelineMetadataUtils.deserializeRequestedReplaceMetadata(commitTimeline.getInstantDetails(hoodieInstant).get()));
|
||||||
|
} else if (hoodieInstant.isCompleted()) {
|
||||||
|
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata
|
||||||
|
.fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), HoodieReplaceCommitMetadata.class);
|
||||||
|
archivedMetaWrapper.setHoodieReplaceCommitMetadata(ReplaceArchivalHelper.convertReplaceCommitMetadata(replaceCommitMetadata));
|
||||||
|
}
|
||||||
archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
|
archivedMetaWrapper.setActionType(ActionType.replacecommit.name());
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ package org.apache.hudi.io;
|
|||||||
import org.apache.hudi.avro.model.HoodieActionInstant;
|
import org.apache.hudi.avro.model.HoodieActionInstant;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||||
|
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||||
import org.apache.hudi.common.HoodieCleanStat;
|
import org.apache.hudi.common.HoodieCleanStat;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
|
||||||
@@ -495,11 +496,16 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
|
|||||||
String fileId2 = "file-" + instantTime + "-2";
|
String fileId2 = "file-" + instantTime + "-2";
|
||||||
|
|
||||||
// create replace instant to mark fileId1 as deleted
|
// create replace instant to mark fileId1 as deleted
|
||||||
|
HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
|
||||||
|
.setOperationType(WriteOperationType.INSERT_OVERWRITE.toString())
|
||||||
|
.setVersion(1)
|
||||||
|
.setExtraMetadata(Collections.emptyMap())
|
||||||
|
.build();
|
||||||
HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
|
HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
|
||||||
replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
|
replaceMetadata.addReplaceFileId(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1);
|
||||||
replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
|
replaceMetadata.setOperationType(WriteOperationType.INSERT_OVERWRITE);
|
||||||
HoodieTestTable.of(metaClient)
|
HoodieTestTable.of(metaClient)
|
||||||
.addReplaceCommit(instantTime, replaceMetadata)
|
.addReplaceCommit(instantTime, requestedReplaceMetadata, replaceMetadata)
|
||||||
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
|
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, fileId1, fileId2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -24,9 +24,14 @@ import org.apache.hudi.avro.model.HoodieActionInstant;
|
|||||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||||
|
import org.apache.hudi.avro.model.HoodieClusteringGroup;
|
||||||
|
import org.apache.hudi.avro.model.HoodieClusteringPlan;
|
||||||
|
import org.apache.hudi.avro.model.HoodieClusteringStrategy;
|
||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
import org.apache.hudi.avro.model.HoodieFileStatus;
|
import org.apache.hudi.avro.model.HoodieFileStatus;
|
||||||
|
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
||||||
|
import org.apache.hudi.avro.model.HoodieSliceInfo;
|
||||||
import org.apache.hudi.client.SparkRDDWriteClient;
|
import org.apache.hudi.client.SparkRDDWriteClient;
|
||||||
import org.apache.hudi.client.WriteStatus;
|
import org.apache.hudi.client.WriteStatus;
|
||||||
import org.apache.hudi.common.HoodieCleanStat;
|
import org.apache.hudi.common.HoodieCleanStat;
|
||||||
@@ -845,7 +850,8 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
// make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1
|
// make next replacecommit, with 1 clustering operation. logically delete p0. No change to p1
|
||||||
Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
|
Map<String, String> partitionAndFileId002 = testTable.forReplaceCommit("00000000000002").getFileIdsWithBaseFilesInPartitions(p0);
|
||||||
String file2P0C1 = partitionAndFileId002.get(p0);
|
String file2P0C1 = partitionAndFileId002.get(p0);
|
||||||
testTable.addReplaceCommit("00000000000002", generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1));
|
Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> replaceMetadata = generateReplaceCommitMetadata(p0, file1P0C0, file2P0C1);
|
||||||
|
testTable.addReplaceCommit("00000000000002", replaceMetadata.getKey(), replaceMetadata.getValue());
|
||||||
|
|
||||||
// run cleaner
|
// run cleaner
|
||||||
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
|
List<HoodieCleanStat> hoodieCleanStatsTwo = runCleaner(config);
|
||||||
@@ -857,7 +863,8 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
// make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0
|
// make next replacecommit, with 1 clustering operation. Replace data in p1. No change to p0
|
||||||
Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
|
Map<String, String> partitionAndFileId003 = testTable.forReplaceCommit("00000000000003").getFileIdsWithBaseFilesInPartitions(p1);
|
||||||
String file3P1C2 = partitionAndFileId003.get(p1);
|
String file3P1C2 = partitionAndFileId003.get(p1);
|
||||||
testTable.addReplaceCommit("00000000000003", generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2));
|
replaceMetadata = generateReplaceCommitMetadata(p1, file1P1C0, file3P1C2);
|
||||||
|
testTable.addReplaceCommit("00000000000003", replaceMetadata.getKey(), replaceMetadata.getValue());
|
||||||
|
|
||||||
// run cleaner
|
// run cleaner
|
||||||
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
|
List<HoodieCleanStat> hoodieCleanStatsThree = runCleaner(config);
|
||||||
@@ -870,7 +877,8 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
// make next replacecommit, with 1 clustering operation. Replace data in p0 again
|
// make next replacecommit, with 1 clustering operation. Replace data in p0 again
|
||||||
Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
|
Map<String, String> partitionAndFileId004 = testTable.forReplaceCommit("00000000000004").getFileIdsWithBaseFilesInPartitions(p0);
|
||||||
String file4P0C3 = partitionAndFileId004.get(p0);
|
String file4P0C3 = partitionAndFileId004.get(p0);
|
||||||
testTable.addReplaceCommit("00000000000004", generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3));
|
replaceMetadata = generateReplaceCommitMetadata(p0, file2P0C1, file4P0C3);
|
||||||
|
testTable.addReplaceCommit("00000000000004", replaceMetadata.getKey(), replaceMetadata.getValue());
|
||||||
|
|
||||||
// run cleaner
|
// run cleaner
|
||||||
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
|
List<HoodieCleanStat> hoodieCleanStatsFour = runCleaner(config);
|
||||||
@@ -884,7 +892,8 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
// make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created
|
// make next replacecommit, with 1 clustering operation. Replace all data in p1. no new files created
|
||||||
Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
|
Map<String, String> partitionAndFileId005 = testTable.forReplaceCommit("00000000000005").getFileIdsWithBaseFilesInPartitions(p1);
|
||||||
String file4P1C4 = partitionAndFileId005.get(p1);
|
String file4P1C4 = partitionAndFileId005.get(p1);
|
||||||
testTable.addReplaceCommit("00000000000005", generateReplaceCommitMetadata(p1, file3P1C2, file4P1C4));
|
replaceMetadata = generateReplaceCommitMetadata(p0, file3P1C2, file4P1C4);
|
||||||
|
testTable.addReplaceCommit("00000000000005", replaceMetadata.getKey(), replaceMetadata.getValue());
|
||||||
|
|
||||||
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
|
List<HoodieCleanStat> hoodieCleanStatsFive = runCleaner(config, 2);
|
||||||
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
|
assertTrue(testTable.baseFileExists(p0, "00000000000004", file4P0C3));
|
||||||
@@ -894,7 +903,23 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
assertFalse(testTable.baseFileExists(p1, "00000000000001", file1P1C0));
|
||||||
}
|
}
|
||||||
|
|
||||||
private HoodieReplaceCommitMetadata generateReplaceCommitMetadata(String partition, String replacedFileId, String newFileId) {
|
private Pair<HoodieRequestedReplaceMetadata, HoodieReplaceCommitMetadata> generateReplaceCommitMetadata(String partition,
|
||||||
|
String replacedFileId,
|
||||||
|
String newFileId) {
|
||||||
|
HoodieRequestedReplaceMetadata requestedReplaceMetadata = new HoodieRequestedReplaceMetadata();
|
||||||
|
requestedReplaceMetadata.setOperationType(WriteOperationType.CLUSTER.toString());
|
||||||
|
requestedReplaceMetadata.setVersion(1);
|
||||||
|
HoodieSliceInfo sliceInfo = HoodieSliceInfo.newBuilder().setFileId(replacedFileId).build();
|
||||||
|
List<HoodieClusteringGroup> clusteringGroups = new ArrayList<>();
|
||||||
|
clusteringGroups.add(HoodieClusteringGroup.newBuilder()
|
||||||
|
.setVersion(1).setNumOutputFileGroups(1).setMetrics(Collections.emptyMap())
|
||||||
|
.setSlices(Collections.singletonList(sliceInfo)).build());
|
||||||
|
requestedReplaceMetadata.setExtraMetadata(Collections.emptyMap());
|
||||||
|
requestedReplaceMetadata.setClusteringPlan(HoodieClusteringPlan.newBuilder()
|
||||||
|
.setVersion(1).setExtraMetadata(Collections.emptyMap())
|
||||||
|
.setStrategy(HoodieClusteringStrategy.newBuilder().setStrategyClassName("").setVersion(1).build())
|
||||||
|
.setInputGroups(clusteringGroups).build());
|
||||||
|
|
||||||
HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
|
HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata();
|
||||||
replaceMetadata.addReplaceFileId(partition, replacedFileId);
|
replaceMetadata.addReplaceFileId(partition, replacedFileId);
|
||||||
replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
|
replaceMetadata.setOperationType(WriteOperationType.CLUSTER);
|
||||||
@@ -905,7 +930,7 @@ public class TestCleaner extends HoodieClientTestBase {
|
|||||||
writeStat.setFileId(newFileId);
|
writeStat.setFileId(newFileId);
|
||||||
replaceMetadata.addWriteStat(partition, writeStat);
|
replaceMetadata.addWriteStat(partition, writeStat);
|
||||||
}
|
}
|
||||||
return replaceMetadata;
|
return Pair.of(requestedReplaceMetadata, replaceMetadata);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
@@ -77,7 +77,6 @@
|
|||||||
<import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>
|
<import>${basedir}/src/main/avro/HoodieRollbackMetadata.avsc</import>
|
||||||
<import>${basedir}/src/main/avro/HoodieRestoreMetadata.avsc</import>
|
<import>${basedir}/src/main/avro/HoodieRestoreMetadata.avsc</import>
|
||||||
<import>${basedir}/src/main/avro/HoodieReplaceCommitMetadata.avsc</import>
|
<import>${basedir}/src/main/avro/HoodieReplaceCommitMetadata.avsc</import>
|
||||||
<import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
|
|
||||||
<import>${basedir}/src/main/avro/HoodiePath.avsc</import>
|
<import>${basedir}/src/main/avro/HoodiePath.avsc</import>
|
||||||
<import>${basedir}/src/main/avro/HoodieFSPermission.avsc</import>
|
<import>${basedir}/src/main/avro/HoodieFSPermission.avsc</import>
|
||||||
<import>${basedir}/src/main/avro/HoodieFileStatus.avsc</import>
|
<import>${basedir}/src/main/avro/HoodieFileStatus.avsc</import>
|
||||||
@@ -90,6 +89,7 @@
|
|||||||
<import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
|
<import>${basedir}/src/main/avro/HoodieClusteringPlan.avsc</import>
|
||||||
<import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
|
<import>${basedir}/src/main/avro/HoodieRequestedReplaceMetadata.avsc</import>
|
||||||
<import>${basedir}/src/main/avro/HoodieMetadata.avsc</import>
|
<import>${basedir}/src/main/avro/HoodieMetadata.avsc</import>
|
||||||
|
<import>${basedir}/src/main/avro/HoodieArchivedMetaEntry.avsc</import>
|
||||||
</imports>
|
</imports>
|
||||||
</configuration>
|
</configuration>
|
||||||
</plugin>
|
</plugin>
|
||||||
|
|||||||
@@ -104,6 +104,14 @@
|
|||||||
"HoodieReplaceCommitMetadata"
|
"HoodieReplaceCommitMetadata"
|
||||||
],
|
],
|
||||||
"default": null
|
"default": null
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name":"hoodieRequestedReplaceMetadata",
|
||||||
|
"type":[
|
||||||
|
"null",
|
||||||
|
"HoodieRequestedReplaceMetadata"
|
||||||
|
],
|
||||||
|
"default": null
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ package org.apache.hudi.common.testutils;
|
|||||||
|
|
||||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||||
|
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||||
import org.apache.hudi.common.fs.FSUtils;
|
import org.apache.hudi.common.fs.FSUtils;
|
||||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||||
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
import org.apache.hudi.common.model.HoodiePartitionMetadata;
|
||||||
@@ -28,6 +29,7 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
|||||||
import org.apache.hudi.common.model.IOType;
|
import org.apache.hudi.common.model.IOType;
|
||||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
|
||||||
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
|
||||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
@@ -147,8 +149,9 @@ public class FileCreateUtils {
|
|||||||
createMetaFile(basePath, instantTime, HoodieTimeline.REPLACE_COMMIT_EXTENSION, metadata.toJsonString().getBytes(StandardCharsets.UTF_8));
|
createMetaFile(basePath, instantTime, HoodieTimeline.REPLACE_COMMIT_EXTENSION, metadata.toJsonString().getBytes(StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void createRequestedReplaceCommit(String basePath, String instantTime) throws IOException {
|
public static void createRequestedReplaceCommit(String basePath, String instantTime, HoodieRequestedReplaceMetadata requestedMetadata) throws IOException {
|
||||||
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION);
|
createMetaFile(basePath, instantTime, HoodieTimeline.REQUESTED_REPLACE_COMMIT_EXTENSION,
|
||||||
|
TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedMetadata).get());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void createInflightReplaceCommit(String basePath, String instantTime) throws IOException {
|
public static void createInflightReplaceCommit(String basePath, String instantTime) throws IOException {
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ package org.apache.hudi.common.testutils;
|
|||||||
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
||||||
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
import org.apache.hudi.avro.model.HoodieCleanerPlan;
|
||||||
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
import org.apache.hudi.avro.model.HoodieCompactionPlan;
|
||||||
|
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
|
||||||
import org.apache.hudi.common.model.FileSlice;
|
import org.apache.hudi.common.model.FileSlice;
|
||||||
import org.apache.hudi.common.model.HoodieFileFormat;
|
import org.apache.hudi.common.model.HoodieFileFormat;
|
||||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||||
@@ -163,8 +164,8 @@ public class HoodieTestTable {
|
|||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public HoodieTestTable addReplaceCommit(String instantTime, HoodieReplaceCommitMetadata metadata) throws Exception {
|
public HoodieTestTable addReplaceCommit(String instantTime, HoodieRequestedReplaceMetadata requestedReplaceMetadata, HoodieReplaceCommitMetadata metadata) throws Exception {
|
||||||
createRequestedReplaceCommit(basePath, instantTime);
|
createRequestedReplaceCommit(basePath, instantTime, requestedReplaceMetadata);
|
||||||
createInflightReplaceCommit(basePath, instantTime);
|
createInflightReplaceCommit(basePath, instantTime);
|
||||||
createReplaceCommit(basePath, instantTime, metadata);
|
createReplaceCommit(basePath, instantTime, metadata);
|
||||||
currentInstantTime = instantTime;
|
currentInstantTime = instantTime;
|
||||||
|
|||||||
Reference in New Issue
Block a user