
[HUDI-1902] Clean the corrupted files generated by FlinkMergeAndReplaceHandle (#2949)

Make the intermediate files of FlinkMergeAndReplaceHandle hidden; when
committing the instant, clean these files in case any corrupted files
were left behind (in the normal case, the intermediate files are
cleaned up by the FlinkMergeAndReplaceHandle itself).
Danny Chan authored on 2021-05-14 15:43:37 +08:00, committed by GitHub
parent 12443e4187
commit 8869b3b418
5 changed files with 32 additions and 8 deletions
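
In rough terms the fix has two halves: rollover files are now written under a dot-prefixed (hidden) name, and commit-time finalization treats any dot-prefixed parquet path reachable from the marker files as a deletion candidate. A minimal, self-contained sketch of that convention follows; the names and the exact file-name layout are illustrative assumptions, not code from this commit:

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class HiddenIntermediateSketch {

  // Simplified stand-in for FSUtils.makeDataFileName; the layout below
  // is an assumption for illustration, not copied from Hudi.
  static String dataFileName(String fileId, String writeToken, String instantTime) {
    return fileId + "_" + writeToken + "_" + instantTime + ".parquet";
  }

  public static void main(String[] args) {
    String fileId = "f1";              // hypothetical file group id
    String instant = "20210514154337"; // hypothetical instant time

    // Committed base file vs. hidden rollover intermediate (dot prefix).
    String baseFile = dataFileName(fileId, "1-0-1", instant);
    String rollover = dataFileName("." + fileId, "1-0-1-1", instant);

    // Commit-time cleanup keeps only hidden intermediates as candidates.
    Set<String> toDelete = Stream.of(baseFile, rollover)
        .filter(name -> name.startsWith(".") && name.endsWith(".parquet"))
        .collect(Collectors.toSet());

    System.out.println(toDelete); // [.f1_1-0-1-1_20210514154337.parquet]
  }
}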

HoodieTable.java

@@ -479,6 +479,13 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     }, config.getFinalizeWriteParallelism());
   }
 
+  /**
+   * Returns the possible invalid data file name with given marker files.
+   */
+  protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
+    return markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
+  }
+
   /**
    * Reconciles WriteStats and marker files to detect and safely delete duplicate data files created because of Spark
    * retries.
@@ -505,7 +512,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
     }
 
     // we are not including log appends here, since they are already fail-safe.
-    Set<String> invalidDataPaths = markers.createdAndMergedDataPaths(context, config.getFinalizeWriteParallelism());
+    Set<String> invalidDataPaths = getInvalidDataPaths(markers);
     Set<String> validDataPaths = stats.stream()
         .map(HoodieWriteStat::getPath)
         .filter(p -> p.endsWith(this.getBaseFileExtension()))
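
Pulling the marker-path lookup into a protected getInvalidDataPaths hook turns reconcileAgainstMarkers into a template method: engine-specific tables can narrow the set of deletion candidates (as HoodieFlinkCopyOnWriteTable does below) instead of overriding finalizeWrite wholesale to opt out.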

FlinkMergeAndReplaceHandle.java

@@ -128,8 +128,9 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
    */
   protected String newFileNameWithRollover(int rollNumber) {
     // make the intermediate file as hidden
+    final String fileID = "." + this.fileId;
     return FSUtils.makeDataFileName(instantTime, writeToken + "-" + rollNumber,
-        this.fileId, hoodieTable.getBaseFileExtension());
+        fileID, hoodieTable.getBaseFileExtension());
   }
 
   @Override
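
With the dot prefix in place, a rollover file for file group f1 is named along the lines of .f1_<writeToken>-<rollNumber>_<instantTime>.parquet rather than f1_... (the fileId_writeToken_instantTime layout of FSUtils.makeDataFileName is assumed here for illustration). The leading dot keeps the file out of normal listings and is exactly what the commit-time filter in HoodieFlinkCopyOnWriteTable keys on.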

HoodieFlinkCopyOnWriteTable.java

@@ -31,12 +31,10 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.io.HoodieCreateHandle;
@@ -56,6 +54,7 @@ import org.apache.hudi.table.action.commit.FlinkMergeHelper;
 import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
 import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
 import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
+import org.apache.hudi.util.FlinkClientUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +64,10 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
 
 /**
  * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
@@ -321,9 +324,13 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  public void finalizeWrite(HoodieEngineContext context, String instantTs, List<HoodieWriteStat> stats) throws HoodieIOException {
-    // do nothing because flink create and merge handles can clean the
-    // retry files by themselves.
+  protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
+    // keep only the intermediate file generated by FlinkMergeAndReplaceHandle.
+    return super.getInvalidDataPaths(markers).stream()
+        .filter(path -> {
+          final String fileName = FlinkClientUtil.parseFileName(path);
+          return fileName.startsWith(".") && fileName.endsWith(PARQUET.getFileExtension());
+        }).collect(Collectors.toSet());
   }
 
   // -------------------------------------------------------------------------
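
To see what the override keeps, here is a hedged, self-contained rendering of the same predicate applied to two made-up candidate paths; parseFileName is inlined from the helper added below, with '/' standing in for Path.SEPARATOR:

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class InvalidPathFilterSketch {

  // Stand-in for FlinkClientUtil.parseFileName (Path.SEPARATOR is "/").
  static String parseFileName(String path) {
    return path.substring(path.lastIndexOf('/') + 1);
  }

  public static void main(String[] args) {
    Set<String> kept = Stream.of(
            "/tbl/par0/.f1_1-0-1-1_20210514154337.parquet", // hidden intermediate
            "/tbl/par0/f1_1-0-1_20210514154337.parquet")    // committed base file
        .filter(p -> {
          String name = parseFileName(p);
          // PARQUET.getFileExtension() resolves to ".parquet".
          return name.startsWith(".") && name.endsWith(".parquet");
        })
        .collect(Collectors.toSet());

    // Prints only the dot-prefixed intermediate file.
    System.out.println(kept);
  }
}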

FlinkClientUtil.java

@@ -29,6 +29,14 @@ import java.io.File;
  */
 public class FlinkClientUtil {
 
+  /**
+   * Parses the file name from path.
+   */
+  public static String parseFileName(String path) {
+    int slash = path.lastIndexOf(Path.SEPARATOR);
+    return path.substring(slash + 1);
+  }
+
   /**
    * Returns the hadoop configuration with possible hadoop conf paths.
    * E.G. the configurations under path $HADOOP_CONF_DIR and $HADOOP_HOME.
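
As written, parseFileName also degrades gracefully: for an input with no separator, lastIndexOf returns -1 and substring(-1 + 1) yields the whole string, so a bare file name passes through unchanged. Path.SEPARATOR here is the Hadoop Path separator, i.e. "/".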

BucketAssigner.java

@@ -241,7 +241,8 @@ public class BucketAssigner {
         .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
     for (HoodieBaseFile file : allFiles) {
-      if (file.getFileSize() < config.getParquetSmallFileLimit()) {
+      // filter out the corrupted files.
+      if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
         String filename = file.getFileName();
         SmallFile sf = new SmallFile();
         sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
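
The size guard matters because a writer that crashes mid-flush can leave a zero-length parquet file behind; without the file.getFileSize() > 0 check, such a file would trivially satisfy the small-file limit, be picked as a merge target for later writes, and then fail when the corrupted file is read. Since the file size is already at hand, the extra check is free.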