[HUDI-1518] Remove the logic that delete replaced file when archive (#3310)
* remove delete replaced file when archive * done * remove unsed import * remove delete replaced files when archive realted UT * code reviewed Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
@@ -20,24 +20,13 @@ package org.apache.hudi.client;
|
||||
|
||||
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.FileSlice;
|
||||
import org.apache.hudi.common.model.HoodieFileGroup;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* Operates on marker files for a given write action (commit, delta commit, compaction).
|
||||
@@ -61,45 +50,4 @@ public class ReplaceArchivalHelper implements Serializable {
|
||||
avroMetaData.getExtraMetadata().put(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, "");
|
||||
return avroMetaData;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete all files represented by FileSlices in parallel. Return true if all files are deleted successfully.
|
||||
*/
|
||||
public static boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieTableMetaClient metaClient,
|
||||
TableFileSystemView fileSystemView,
|
||||
HoodieInstant instant, List<String> replacedPartitions) {
|
||||
// There is no file id to be replaced in the very first replace commit file for insert overwrite operation
|
||||
if (replacedPartitions.isEmpty()) {
|
||||
LOG.warn("Found no partition files to replace");
|
||||
return true;
|
||||
}
|
||||
context.setJobStatus(ReplaceArchivalHelper.class.getSimpleName(), "Delete replaced file groups");
|
||||
List<Boolean> f = context.map(replacedPartitions, partition -> {
|
||||
Stream<FileSlice> fileSlices = fileSystemView.getReplacedFileGroupsBeforeOrOn(instant.getTimestamp(), partition)
|
||||
.flatMap(HoodieFileGroup::getAllRawFileSlices);
|
||||
return fileSlices.allMatch(slice -> deleteFileSlice(slice, metaClient, instant));
|
||||
}, replacedPartitions.size());
|
||||
|
||||
return f.stream().reduce((x, y) -> x & y).orElse(true);
|
||||
}
|
||||
|
||||
private static boolean deleteFileSlice(FileSlice fileSlice, HoodieTableMetaClient metaClient, HoodieInstant instant) {
|
||||
boolean baseFileDeleteSuccess = fileSlice.getBaseFile().map(baseFile ->
|
||||
deletePath(new Path(baseFile.getPath()), metaClient, instant)).orElse(true);
|
||||
|
||||
boolean logFileSuccess = fileSlice.getLogFiles().map(logFile ->
|
||||
deletePath(logFile.getPath(), metaClient, instant)).allMatch(x -> x);
|
||||
return baseFileDeleteSuccess & logFileSuccess;
|
||||
}
|
||||
|
||||
private static boolean deletePath(Path path, HoodieTableMetaClient metaClient, HoodieInstant instant) {
|
||||
try {
|
||||
LOG.info("Deleting " + path + " before archiving " + instant);
|
||||
metaClient.getFs().delete(path);
|
||||
return true;
|
||||
} catch (IOException e) {
|
||||
LOG.error("unable to delete file groups that are replaced", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,13 +22,11 @@ import org.apache.avro.Schema;
|
||||
import org.apache.avro.generic.IndexedRecord;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
|
||||
import org.apache.hudi.client.ReplaceArchivalHelper;
|
||||
import org.apache.hudi.client.utils.MetadataConversionUtils;
|
||||
import org.apache.hudi.common.engine.HoodieEngineContext;
|
||||
import org.apache.hudi.common.model.HoodieArchivedLogFile;
|
||||
import org.apache.hudi.common.model.HoodieAvroPayload;
|
||||
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
|
||||
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
|
||||
import org.apache.hudi.common.table.HoodieTableMetaClient;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat;
|
||||
import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
|
||||
@@ -40,7 +38,6 @@ import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
|
||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
|
||||
import org.apache.hudi.common.table.view.TableFileSystemView;
|
||||
import org.apache.hudi.common.util.CollectionUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
@@ -303,11 +300,6 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
LOG.info("Wrapper schema " + wrapperSchema.toString());
|
||||
List<IndexedRecord> records = new ArrayList<>();
|
||||
for (HoodieInstant hoodieInstant : instants) {
|
||||
// TODO HUDI-1518 Cleaner now takes care of removing replaced file groups. This call to deleteReplacedFileGroups can be removed.
|
||||
boolean deleteSuccess = deleteReplacedFileGroups(context, hoodieInstant);
|
||||
if (!deleteSuccess) {
|
||||
LOG.warn("Unable to delete file(s) for " + hoodieInstant.getFileName() + ", replaced files possibly deleted by cleaner");
|
||||
}
|
||||
try {
|
||||
deleteAnyLeftOverMarkers(context, hoodieInstant);
|
||||
records.add(convertToAvroRecord(hoodieInstant));
|
||||
@@ -334,29 +326,6 @@ public class HoodieTimelineArchiveLog<T extends HoodieAvroPayload, I, K, O> {
|
||||
}
|
||||
}
|
||||
|
||||
private boolean deleteReplacedFileGroups(HoodieEngineContext context, HoodieInstant instant) {
|
||||
if (!instant.isCompleted() || !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
|
||||
// only delete files for completed replace instants
|
||||
return true;
|
||||
}
|
||||
|
||||
TableFileSystemView fileSystemView = this.table.getFileSystemView();
|
||||
List<String> replacedPartitions = getReplacedPartitions(instant);
|
||||
return ReplaceArchivalHelper.deleteReplacedFileGroups(context, metaClient, fileSystemView, instant, replacedPartitions);
|
||||
}
|
||||
|
||||
private List<String> getReplacedPartitions(HoodieInstant instant) {
|
||||
try {
|
||||
HoodieReplaceCommitMetadata metadata = HoodieReplaceCommitMetadata.fromBytes(
|
||||
metaClient.getActiveTimeline().getInstantDetails(instant).get(),
|
||||
HoodieReplaceCommitMetadata.class);
|
||||
|
||||
return new ArrayList<>(metadata.getPartitionToReplaceFileIds().keySet());
|
||||
} catch (IOException e) {
|
||||
throw new HoodieCommitException("Failed to archive because cannot delete replace files", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
|
||||
if (records.size() > 0) {
|
||||
Map<HeaderMetadataType, String> header = new HashMap<>();
|
||||
|
||||
Reference in New Issue
Block a user