1
0

[HUDI-2833][Design] Merge small archive files instead of expanding indefinitely. (#4078)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
YueZhang
2022-01-19 14:42:35 +08:00
committed by GitHub
parent 4bea758738
commit 7647562dad
8 changed files with 639 additions and 46 deletions

View File

@@ -255,6 +255,22 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ "record size estimate compute dynamically based on commit metadata. "
+ " This is critical in computing the insert parallelism and bin-packing inserts into small files.");
public static final ConfigProperty<Integer> ARCHIVE_MERGE_FILES_BATCH_SIZE = ConfigProperty
.key("hoodie.archive.merge.files.batch.size")
.defaultValue(10)
.withDocumentation("The number of small archive files to be merged at once.");
public static final ConfigProperty<Long> ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES = ConfigProperty
.key("hoodie.archive.merge.small.file.limit.bytes")
.defaultValue(20L * 1024 * 1024)
.withDocumentation("This config sets the archive file size limit below which an archive file becomes a candidate to be selected as such a small file.");
public static final ConfigProperty<Boolean> ARCHIVE_MERGE_ENABLE = ConfigProperty
.key("hoodie.archive.merge.enable")
.defaultValue(false)
.withDocumentation("When enable, hoodie will auto merge several small archive files into larger one. It's"
+ " useful when storage scheme doesn't support append operation.");
/** @deprecated Use {@link #CLEANER_POLICY} and its methods instead */
@Deprecated
public static final String CLEANER_POLICY_PROP = CLEANER_POLICY.key();
@@ -547,6 +563,21 @@ public class HoodieCompactionConfig extends HoodieConfig {
return this;
}
public Builder withArchiveMergeFilesBatchSize(int number) {
compactionConfig.setValue(ARCHIVE_MERGE_FILES_BATCH_SIZE, String.valueOf(number));
return this;
}
public Builder withArchiveMergeSmallFileLimit(long size) {
compactionConfig.setValue(ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES, String.valueOf(size));
return this;
}
public Builder withArchiveMergeEnable(boolean enable) {
compactionConfig.setValue(ARCHIVE_MERGE_ENABLE, String.valueOf(enable));
return this;
}
public Builder compactionSmallFileSize(long smallFileLimitBytes) {
compactionConfig.setValue(PARQUET_SMALL_FILE_LIMIT, String.valueOf(smallFileLimitBytes));
return this;

View File

@@ -1082,6 +1082,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(HoodieCompactionConfig.MIN_COMMITS_TO_KEEP);
}
public int getArchiveMergeFilesBatchSize() {
return getInt(HoodieCompactionConfig.ARCHIVE_MERGE_FILES_BATCH_SIZE);
}
public int getParquetSmallFileLimit() {
return getInt(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT);
}
@@ -1110,6 +1114,14 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
}
public boolean getArchiveMergeEnable() {
return getBoolean(HoodieCompactionConfig.ARCHIVE_MERGE_ENABLE);
}
public long getArchiveMergeSmallFileLimitBytes() {
return getLong(HoodieCompactionConfig.ARCHIVE_MERGE_SMALL_FILE_LIMIT_BYTES);
}
public boolean isAutoArchive() {
return getBoolean(HoodieCompactionConfig.AUTO_ARCHIVE);
}

View File

@@ -18,14 +18,19 @@
package org.apache.hudi.table;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.fs.StorageSchemes;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
@@ -36,8 +41,10 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -57,6 +64,7 @@ import org.apache.log4j.Logger;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -106,6 +114,19 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
}
}
public Writer reOpenWriter() {
try {
if (this.writer != null) {
this.writer.close();
this.writer = null;
}
this.writer = openWriter();
return writer;
} catch (IOException e) {
throw new HoodieException("Unable to initialize HoodieLogFormat writer", e);
}
}
private void close() {
try {
if (this.writer != null) {
@@ -122,7 +143,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
public boolean archiveIfRequired(HoodieEngineContext context) throws IOException {
try {
List<HoodieInstant> instantsToArchive = getInstantsToArchive().collect(Collectors.toList());
verifyLastMergeArchiveFilesIfNecessary(context);
boolean success = true;
if (!instantsToArchive.isEmpty()) {
this.writer = openWriter();
@@ -134,12 +155,212 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
LOG.info("No Instants to archive");
}
if (shouldMergeSmallArchiveFies()) {
mergeArchiveFilesIfNecessary(context);
}
return success;
} finally {
close();
}
}
public boolean shouldMergeSmallArchiveFies() {
return config.getArchiveMergeEnable() && !StorageSchemes.isAppendSupported(metaClient.getFs().getScheme());
}
/**
* Here Hoodie can merge the small archive files into a new larger one.
* Only used for filesystem which does not support append operation.
* The whole merge small archive files operation has four stages:
* 1. Build merge plan with merge candidates/merged file name infos.
* 2. Do merge.
* 3. Delete all the candidates.
* 4. Delete the merge plan.
* @param context HoodieEngineContext
* @throws IOException
*/
private void mergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
// Flush remained content if existed and open a new write
reOpenWriter();
// List all archive files
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
// Sort files by version suffix in reverse (implies reverse chronological order)
Arrays.sort(fsStatuses, new HoodieArchivedTimeline.ArchiveFileVersionComparator());
int archiveMergeFilesBatchSize = config.getArchiveMergeFilesBatchSize();
long smallFileLimitBytes = config.getArchiveMergeSmallFileLimitBytes();
List<FileStatus> mergeCandidate = getMergeCandidates(smallFileLimitBytes, fsStatuses);
if (mergeCandidate.size() >= archiveMergeFilesBatchSize) {
List<String> candidateFiles = mergeCandidate.stream().map(fs -> fs.getPath().toString()).collect(Collectors.toList());
// before merge archive files build merge plan
String logFileName = computeLogFileName();
buildArchiveMergePlan(candidateFiles, planPath, logFileName);
// merge archive files
mergeArchiveFiles(mergeCandidate);
// after merge, delete the small archive files.
deleteFilesParallelize(metaClient, candidateFiles, context, true);
LOG.info("Success to delete replaced small archive files.");
// finally, delete archiveMergePlan which means merging small archive files operation is succeed.
metaClient.getFs().delete(planPath, false);
LOG.info("Success to merge small archive files.");
}
}
/**
* Find the latest 'huge archive file' index as a break point and only check/merge newer archive files.
* Because we need to keep the original order of archive files which is important when loading archived instants with time filter.
* {@link HoodieArchivedTimeline} loadInstants(TimeRangeFilter filter, boolean loadInstantDetails, Function<GenericRecord, Boolean> commitsFilter)
* @param smallFileLimitBytes small File Limit Bytes
* @param fsStatuses Sort by version suffix in reverse
* @return merge candidates
*/
private List<FileStatus> getMergeCandidates(long smallFileLimitBytes, FileStatus[] fsStatuses) {
int index = 0;
for (; index < fsStatuses.length; index++) {
if (fsStatuses[index].getLen() > smallFileLimitBytes) {
break;
}
}
return Arrays.stream(fsStatuses).limit(index).collect(Collectors.toList());
}
/**
* Get final written archive file name based on storageSchemes which does not support append.
*/
private String computeLogFileName() throws IOException {
String logWriteToken = writer.getLogFile().getLogWriteToken();
HoodieLogFile hoodieLogFile = writer.getLogFile().rollOver(metaClient.getFs(), logWriteToken);
return hoodieLogFile.getFileName();
}
/**
* Check/Solve if there is any failed and unfinished merge small archive files operation
* @param context HoodieEngineContext used for parallelize to delete small archive files if necessary.
* @throws IOException
*/
private void verifyLastMergeArchiveFilesIfNecessary(HoodieEngineContext context) throws IOException {
if (shouldMergeSmallArchiveFies()) {
Path planPath = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
HoodieWrapperFileSystem fs = metaClient.getFs();
// If plan exist, last merge small archive files was failed.
// we need to revert or complete last action.
if (fs.exists(planPath)) {
HoodieMergeArchiveFilePlan plan = null;
try {
plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fs, planPath).get(), HoodieMergeArchiveFilePlan.class);
} catch (IOException e) {
LOG.warn("Parsing merge archive plan failed.", e);
// Reading partial plan file which means last merge action is failed during writing plan file.
fs.delete(planPath);
return;
}
Path mergedArchiveFile = new Path(metaClient.getArchivePath(), plan.getMergedArchiveFileName());
List<Path> candidates = plan.getCandidate().stream().map(Path::new).collect(Collectors.toList());
if (candidateAllExists(candidates)) {
// Last merge action is failed during writing merged archive file.
// But all the small archive files are not deleted.
// Revert last action by deleting mergedArchiveFile if existed.
if (fs.exists(mergedArchiveFile)) {
fs.delete(mergedArchiveFile, false);
}
} else {
// Last merge action is failed during deleting small archive files.
// But the merged files is completed.
// Try to complete last action
if (fs.exists(mergedArchiveFile)) {
deleteFilesParallelize(metaClient, plan.getCandidate(), context, true);
}
}
fs.delete(planPath);
}
}
}
/**
* If all the candidate small archive files existed, last merge operation was failed during writing the merged archive file.
* If at least one of candidate small archive files existed, the merged archive file was created and last operation was failed during deleting the small archive files.
*/
private boolean candidateAllExists(List<Path> candidates) throws IOException {
for (Path archiveFile : candidates) {
if (!metaClient.getFs().exists(archiveFile)) {
// candidate is deleted
return false;
}
}
return true;
}
public void buildArchiveMergePlan(List<String> compactCandidate, Path planPath, String compactedArchiveFileName) throws IOException {
LOG.info("Start to build archive merge plan.");
HoodieMergeArchiveFilePlan plan = HoodieMergeArchiveFilePlan.newBuilder()
.setCandidate(compactCandidate)
.setMergedArchiveFileName(compactedArchiveFileName)
.build();
Option<byte[]> content = TimelineMetadataUtils.serializeAvroMetadata(plan, HoodieMergeArchiveFilePlan.class);
// building merge archive files plan.
FileIOUtils.createFileInPath(metaClient.getFs(), planPath, content);
LOG.info("Success to build archive merge plan");
}
public void mergeArchiveFiles(List<FileStatus> compactCandidate) throws IOException {
LOG.info("Starting to merge small archive files.");
Schema wrapperSchema = HoodieArchivedMetaEntry.getClassSchema();
try {
List<IndexedRecord> records = new ArrayList<>();
for (FileStatus fs : compactCandidate) {
// Read the archived file
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
new HoodieLogFile(fs.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
// Read the avro blocks
while (reader.hasNext()) {
HoodieAvroDataBlock blk = (HoodieAvroDataBlock) reader.next();
List<IndexedRecord> recordsPerFile = blk.getRecords();
records.addAll(recordsPerFile);
if (records.size() >= this.config.getCommitArchivalBatchSize()) {
writeToFile(wrapperSchema, records);
}
}
}
}
writeToFile(wrapperSchema, records);
} catch (Exception e) {
throw new HoodieCommitException("Failed to merge small archive files", e);
} finally {
writer.close();
}
LOG.info("Success to merge small archive files.");
}
private Map<String, Boolean> deleteFilesParallelize(HoodieTableMetaClient metaClient, List<String> paths, HoodieEngineContext context, boolean ignoreFailed) {
return FSUtils.parallelizeFilesProcess(context,
metaClient.getFs(),
config.getArchiveDeleteParallelism(),
pairOfSubPathAndConf -> {
Path file = new Path(pairOfSubPathAndConf.getKey());
try {
FileSystem fs = metaClient.getFs();
if (fs.exists(file)) {
return fs.delete(file, false);
}
return true;
} catch (IOException e) {
if (!ignoreFailed) {
throw new HoodieIOException("Failed to delete : " + file, e);
} else {
LOG.warn("Ignore failed deleting : " + file);
return true;
}
}
},
paths);
}
private Stream<HoodieInstant> getCleanInstantsToArchive() {
HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
.getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION)).filterCompletedInstants();
@@ -238,22 +459,7 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
).map(Path::toString).collect(Collectors.toList());
context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
Map<String, Boolean> resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context,
metaClient.getFs(),
config.getArchiveDeleteParallelism(),
pairOfSubPathAndConf -> {
Path commitFile = new Path(pairOfSubPathAndConf.getKey());
try {
FileSystem fs = commitFile.getFileSystem(pairOfSubPathAndConf.getValue().get());
if (fs.exists(commitFile)) {
return fs.delete(commitFile, false);
}
return true;
} catch (IOException e) {
throw new HoodieIOException("Failed to delete archived instant " + commitFile, e);
}
},
instantFiles);
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);
for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.io;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -26,6 +27,8 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -35,9 +38,12 @@ import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
import org.apache.hudi.table.HoodieSparkTable;
@@ -68,6 +74,7 @@ import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
@@ -113,12 +120,41 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE);
}
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata, int minArchivalCommits, int maxArchivalCommits, int maxDeltaCommitsMetadataTable,
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int minArchivalCommits,
int maxArchivalCommits,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits, maxDeltaCommitsMetadataTable, tableType, false, 10, 209715200);
}
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int minArchivalCommits,
int maxArchivalCommits,
int maxDeltaCommitsMetadataTable,
boolean enableArchiveMerge,
int archiveFilesBatch,
long size) throws Exception {
return initTestTableAndGetWriteConfig(enableMetadata, minArchivalCommits, maxArchivalCommits,
maxDeltaCommitsMetadataTable, HoodieTableType.COPY_ON_WRITE, enableArchiveMerge, archiveFilesBatch, size);
}
private HoodieWriteConfig initTestTableAndGetWriteConfig(boolean enableMetadata,
int minArchivalCommits,
int maxArchivalCommits,
int maxDeltaCommitsMetadataTable,
HoodieTableType tableType,
boolean enableArchiveMerge,
int archiveFilesBatch,
long size) throws Exception {
init(tableType);
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().retainCommits(1).archiveCommitsWith(minArchivalCommits, maxArchivalCommits)
.withArchiveMergeEnable(enableArchiveMerge)
.withArchiveMergeFilesBatchSize(archiveFilesBatch)
.withArchiveMergeSmallFileLimit(size)
.build())
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withRemoteServerPort(timelineServicePort).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadata)
@@ -174,6 +210,222 @@ public class TestHoodieTimelineArchiveLog extends HoodieClientTestHarness {
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMergeSmallArchiveFilesRecoverFromBuildPlanFailed(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 8; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
// build a merge small archive plan with dummy content
// this plan can not be deserialized.
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
archiveLog.reOpenWriter();
Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
archiveLog.buildArchiveMergePlan(candidateFiles, plan, ".commits_.archive.3_1-0-1");
String s = "Dummy Content";
// stain the current merge plan file.
FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes()));
// check that damaged plan file will not block archived timeline loading.
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
assertEquals(7 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
// trigger several archive after left damaged merge small archive file plan.
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
// loading archived timeline and active timeline success
HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
// check instant number
assertEquals(16 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
// if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline.
Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMergeSmallArchiveFilesRecoverFromMergeFailed(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 8; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
// do a single merge small archive files
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
archiveLog.reOpenWriter();
archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
HoodieLogFormat.Writer writer = archiveLog.reOpenWriter();
// check loading archived and active timeline success
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
assertEquals(7 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.reload().countInstants());
String s = "Dummy Content";
// stain the current merged archive file.
FileIOUtils.createFileInPath(metaClient.getFs(), writer.getLogFile().getPath(), Option.of(s.getBytes()));
// do another archive actions with merge small archive files.
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
// check result.
// we need to load archived timeline successfully and ignore the parsing damage merged archive files exception.
HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
assertEquals(16 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
// if there are a damaged merged archive files and other common damaged archive file.
// hoodie need throw ioe while loading archived timeline because of parsing the damaged archive file.
Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testMergeSmallArchiveFilesRecoverFromDeleteFailed(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 8; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
// do a single merge small archive files
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
archiveLog.reOpenWriter();
archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
archiveLog.reOpenWriter();
// delete only one of the small archive file to simulate delete action failed.
metaClient.getFs().delete(fsStatuses[0].getPath());
// loading archived timeline and active timeline success
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
assertEquals(7 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
// do another archive actions with merge small archive files.
for (int i = 1; i < 10; i++) {
testTable.doWriteOperation("1000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
// check result.
HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline().reload();
assertEquals(16 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLoadArchiveTimelineWithDamagedPlanFile(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
// do ingestion and trigger archive actions here.
for (int i = 1; i < 8; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
Path plan = new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME);
String s = "Dummy Content";
// stain the current merge plan file.
FileIOUtils.createFileInPath(metaClient.getFs(), plan, Option.of(s.getBytes()));
// check that damaged plan file will not block archived timeline loading.
HoodieActiveTimeline rawActiveTimeline = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine = metaClient.getArchivedTimeline().reload();
assertEquals(7 * 3, rawActiveTimeline.countInstants() + archivedTimeLine.countInstants());
// if there are damaged archive files and damaged plan, hoodie need throw ioe while loading archived timeline.
Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testLoadArchiveTimelineWithUncompletedMergeArchiveFile(boolean enableArchiveMerge) throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2, enableArchiveMerge, 3, 209715200);
for (int i = 1; i < 8; i++) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
archiveAndGetCommitsList(writeConfig);
}
HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
HoodieTimelineArchiveLog archiveLog = new HoodieTimelineArchiveLog(writeConfig, table);
FileStatus[] fsStatuses = metaClient.getFs().globStatus(
new Path(metaClient.getArchivePath() + "/.commits_.archive*"));
List<String> candidateFiles = Arrays.stream(fsStatuses).map(fs -> fs.getPath().toString()).collect(Collectors.toList());
archiveLog.reOpenWriter();
archiveLog.buildArchiveMergePlan(candidateFiles, new Path(metaClient.getArchivePath(), HoodieArchivedTimeline.MERGE_ARCHIVE_PLAN_NAME), ".commits_.archive.3_1-0-1");
archiveLog.mergeArchiveFiles(Arrays.stream(fsStatuses).collect(Collectors.toList()));
HoodieLogFormat.Writer writer = archiveLog.reOpenWriter();
String s = "Dummy Content";
// stain the current merged archive file.
FileIOUtils.createFileInPath(metaClient.getFs(), writer.getLogFile().getPath(), Option.of(s.getBytes()));
// if there's only a damaged merged archive file, we need to ignore the exception while reading this damaged file.
HoodieActiveTimeline rawActiveTimeline1 = new HoodieActiveTimeline(metaClient, false);
HoodieArchivedTimeline archivedTimeLine1 = metaClient.getArchivedTimeline();
assertEquals(7 * 3, archivedTimeLine1.countInstants() + rawActiveTimeline1.countInstants());
// if there are a damaged merged archive files and other common damaged archive file.
// hoodie need throw ioe while loading archived timeline because of parsing the damaged archive file.
Path damagedFile = new Path(metaClient.getArchivePath(), ".commits_.archive.300_1-0-1");
FileIOUtils.createFileInPath(metaClient.getFs(), damagedFile, Option.of(s.getBytes()));
assertThrows(HoodieException.class, () -> metaClient.getArchivedTimeline().reload());
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testNoArchivalUntilMaxArchiveConfigWithExtraInflightCommits(boolean enableMetadata) throws Exception {

View File

@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{
"namespace":"org.apache.hudi.avro.model",
"type":"record",
"name":"HoodieMergeArchiveFilePlan",
"fields":[
{
"name":"version",
"type":["int", "null"],
"default": 1
},
{
"name":"candidate",
"type":["null", {
"type":"array",
"items": "string"
}],
"default": null
},
{
"name":"mergedArchiveFileName",
"type":["null", "string"],
"default": null
}
]
}

View File

@@ -361,7 +361,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
private void createFileInAuxiliaryFolder(HoodieInstant instant, Option<byte[]> data) {
// This will be removed in future release. See HUDI-546
Path fullPath = new Path(metaClient.getMetaAuxiliaryPath(), instant.getFileName());
createFileInPath(fullPath, data);
FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, data);
}
//-----------------------------------------------------------------
@@ -505,7 +505,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
fromInstant.getFileName())));
// Use Write Once to create Target File
if (allowRedundantTransitions) {
createFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
FileIOUtils.createFileInPath(metaClient.getFs(), new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
} else {
createImmutableFileInPath(new Path(metaClient.getMetaPath(), toInstant.getFileName()), data);
}
@@ -602,33 +602,12 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
private void createFileInMetaPath(String filename, Option<byte[]> content, boolean allowOverwrite) {
Path fullPath = new Path(metaClient.getMetaPath(), filename);
if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) {
createFileInPath(fullPath, content);
FileIOUtils.createFileInPath(metaClient.getFs(), fullPath, content);
} else {
createImmutableFileInPath(fullPath, content);
}
}
private void createFileInPath(Path fullPath, Option<byte[]> content) {
try {
// If the path does not exist, create it first
if (!metaClient.getFs().exists(fullPath)) {
if (metaClient.getFs().createNewFile(fullPath)) {
LOG.info("Created a new file in meta path: " + fullPath);
} else {
throw new HoodieIOException("Failed to create file " + fullPath);
}
}
if (content.isPresent()) {
FSDataOutputStream fsout = metaClient.getFs().create(fullPath, true);
fsout.write(content.get());
fsout.close();
}
} catch (IOException e) {
throw new HoodieIOException("Failed to create file " + fullPath, e);
}
}
/**
* Creates a new file in timeline with overwrite set to false. This ensures
* files are created only once and never rewritten

View File

@@ -20,13 +20,17 @@ package org.apache.hudi.common.table.timeline;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.avro.generic.GenericRecord;
@@ -46,6 +50,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -66,6 +71,7 @@ import java.util.stream.Collectors;
* This class can be serialized and de-serialized and on de-serialization the FileSystem is re-initialized.
*/
public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
public static final String MERGE_ARCHIVE_PLAN_NAME = "mergeArchivePlan";
private static final Pattern ARCHIVE_FILE_PATTERN =
Pattern.compile("^\\.commits_\\.archive\\.([0-9]+).*");
@@ -218,7 +224,7 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
// Sort files by version suffix in reverse (implies reverse chronological order)
Arrays.sort(fsStatuses, new ArchiveFileVersionComparator());
List<HoodieInstant> instantsInRange = new ArrayList<>();
Set<HoodieInstant> instantsInRange = new HashSet<>();
for (FileStatus fs : fsStatuses) {
// Read the archived file
try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(metaClient.getFs(),
@@ -248,11 +254,32 @@ public class HoodieArchivedTimeline extends HoodieDefaultTimeline {
break;
}
}
} catch (Exception originalException) {
// merge small archive files may left uncompleted archive file which will cause exception.
// need to ignore this kind of exception here.
try {
Path planPath = new Path(metaClient.getArchivePath(), MERGE_ARCHIVE_PLAN_NAME);
HoodieWrapperFileSystem fileSystem = metaClient.getFs();
if (fileSystem.exists(planPath)) {
HoodieMergeArchiveFilePlan plan = TimelineMetadataUtils.deserializeAvroMetadata(FileIOUtils.readDataFromPath(fileSystem, planPath).get(), HoodieMergeArchiveFilePlan.class);
String mergedArchiveFileName = plan.getMergedArchiveFileName();
if (!StringUtils.isNullOrEmpty(mergedArchiveFileName) && fs.getPath().getName().equalsIgnoreCase(mergedArchiveFileName)) {
LOG.warn("Catch exception because of reading uncompleted merging archive file " + mergedArchiveFileName + ". Ignore it here.");
continue;
}
}
throw originalException;
} catch (Exception e) {
// If anything wrong during parsing merge archive plan, we need to throw the original exception.
// For example corrupted archive file and corrupted plan are both existed.
throw originalException;
}
}
}
Collections.sort(instantsInRange);
return instantsInRange;
ArrayList<HoodieInstant> result = new ArrayList<>(instantsInRange);
Collections.sort(result);
return result;
} catch (IOException e) {
throw new HoodieIOException(
"Could not load archived commit timeline from path " + metaClient.getArchivePath(), e);

View File

@@ -160,4 +160,48 @@ public class FileIOUtils {
LOG.warn("IOException during close", e);
}
}
public static void createFileInPath(FileSystem fileSystem, org.apache.hadoop.fs.Path fullPath, Option<byte[]> content, boolean ignoreIOE) {
try {
// If the path does not exist, create it first
if (!fileSystem.exists(fullPath)) {
if (fileSystem.createNewFile(fullPath)) {
LOG.info("Created a new file in meta path: " + fullPath);
} else {
throw new HoodieIOException("Failed to create file " + fullPath);
}
}
if (content.isPresent()) {
FSDataOutputStream fsout = fileSystem.create(fullPath, true);
fsout.write(content.get());
fsout.close();
}
} catch (IOException e) {
LOG.warn("Failed to create file " + fullPath, e);
if (!ignoreIOE) {
throw new HoodieIOException("Failed to create file " + fullPath, e);
}
}
}
public static void createFileInPath(FileSystem fileSystem, org.apache.hadoop.fs.Path fullPath, Option<byte[]> content) {
createFileInPath(fileSystem, fullPath, content, false);
}
public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath, boolean ignoreIOE) {
try (FSDataInputStream is = fileSystem.open(detailPath)) {
return Option.of(FileIOUtils.readAsByteArray(is));
} catch (IOException e) {
LOG.warn("Could not read commit details from " + detailPath, e);
if (!ignoreIOE) {
throw new HoodieIOException("Could not read commit details from " + detailPath, e);
}
return Option.empty();
}
}
public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) {
return readDataFromPath(fileSystem, detailPath, false);
}
}