1
0

[HUDI-2204] Add marker files for flink writer (#3316)

This commit is contained in:
Danny Chan
2021-07-22 13:34:15 +08:00
committed by GitHub
parent 5a94b6bf54
commit 2370a9facb
6 changed files with 22 additions and 31 deletions

View File

@@ -79,7 +79,7 @@ public class HoodieFlinkEngineContext extends HoodieEngineContext {
@Override
public <I, K, V> Map<K, V> mapToPair(List<I> data, SerializablePairFunction<I, K, V> func, Integer parallelism) {
return data.stream().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
return data.stream().parallel().map(throwingMapToPairWrapper(func)).collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
}
@Override

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
@@ -99,6 +100,12 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
}
}
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
}
@Override
public Path makeNewPath(String partitionPath) {
Path path = super.makeNewPath(partitionPath);

View File

@@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -102,7 +103,8 @@ public class FlinkMergeAndReplaceHandle<T extends HoodieRecordPayload, I, K, O>
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
// no need to create any marker file for intermediate file.
MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
}
@Override

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
@@ -114,9 +115,8 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
// no need to create marker file for flink merge handle,
// the flink write handle does not rely on MARKER files for
// corrupt files cleaning, see HoodieFlinkCopyOnWriteTable#getInvalidDataPaths for details.
MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
}
@Override

View File

@@ -54,7 +54,6 @@ import org.apache.hudi.table.action.commit.FlinkMergeHelper;
import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;
import org.apache.hudi.util.FlinkClientUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,10 +63,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
/**
* Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
@@ -323,16 +318,6 @@ public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends
throw new HoodieNotSupportedException("Savepoint and restore is not supported yet");
}
@Override
protected Set<String> getInvalidDataPaths(MarkerFiles markers) throws IOException {
// keep only the intermediate file generated by FlinkMergeAndReplaceHandle.
return super.getInvalidDataPaths(markers).stream()
.filter(path -> {
final String fileName = FlinkClientUtil.parseFileName(path);
return fileName.startsWith(".") && fileName.endsWith(PARQUET.getFileExtension());
}).collect(Collectors.toSet());
}
// -------------------------------------------------------------------------
// Used for compaction
// -------------------------------------------------------------------------

View File

@@ -40,7 +40,6 @@ import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -62,9 +61,8 @@ public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
@Override
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
Iterator<Tuple2<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile())))).iterator();
Stream<Tuple2<String, CleanFileInfo>> filesToBeDeletedPerPartition = cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new Tuple2<>(x.getKey(), new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
Stream<Tuple2<String, PartitionCleanStat>> partitionCleanStats =
deleteFilesFunc(filesToBeDeletedPerPartition, table)
@@ -97,12 +95,11 @@ public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
}).collect(Collectors.toList());
}
private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator<Tuple2<String, CleanFileInfo>> iter, HoodieTable table) {
private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Stream<Tuple2<String, CleanFileInfo>> cleanFileInfo, HoodieTable table) {
Map<String, PartitionCleanStat> partitionCleanStatMap = new HashMap<>();
FileSystem fs = table.getMetaClient().getFs();
while (iter.hasNext()) {
Tuple2<String, CleanFileInfo> partitionDelFileTuple = iter.next();
cleanFileInfo.parallel().forEach(partitionDelFileTuple -> {
String partitionPath = partitionDelFileTuple._1();
Path deletePath = new Path(partitionDelFileTuple._2().getFilePath());
String deletePathStr = deletePath.toString();
@@ -112,11 +109,11 @@ public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
} catch (IOException e) {
LOG.error("Delete file failed");
}
if (!partitionCleanStatMap.containsKey(partitionPath)) {
partitionCleanStatMap.put(partitionPath, new PartitionCleanStat(partitionPath));
final PartitionCleanStat partitionCleanStat;
synchronized (partitionCleanStatMap) {
partitionCleanStat = partitionCleanStatMap.computeIfAbsent(partitionPath, k -> new PartitionCleanStat(partitionPath));
}
boolean isBootstrapBasePathFile = partitionDelFileTuple._2().isBootstrapBaseFile();
PartitionCleanStat partitionCleanStat = partitionCleanStatMap.get(partitionPath);
if (isBootstrapBasePathFile) {
// For Bootstrap Base file deletions, store the full file path.
partitionCleanStat.addDeleteFilePatterns(deletePath.toString(), true);
@@ -125,7 +122,7 @@ public class FlinkCleanActionExecutor<T extends HoodieRecordPayload> extends
partitionCleanStat.addDeleteFilePatterns(deletePath.getName(), false);
partitionCleanStat.addDeletedFileResult(deletePath.getName(), deletedFileResult, false);
}
}
});
return partitionCleanStatMap.entrySet().stream().map(e -> Pair.of(e.getKey(), e.getValue()));
}
}