From b6d949b48a649acac27d5d9b91677bf2e25e9342 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 16 Apr 2021 11:40:53 +0800 Subject: [PATCH] [HUDI-1801] FlinkMergeHandle rolling over may fail to rename the latest file handle (#2831) The FlinkMergeHandle may rename the N-1 th file handle instead of the latest one, thus causing data duplication. --- .../org/apache/hudi/io/FlinkMergeHandle.java | 41 ++++++++----------- .../commit/BaseFlinkCommitActionExecutor.java | 2 +- .../table/action/commit/FlinkMergeHelper.java | 8 ++-- .../hudi/table/HoodieDataSourceITCase.java | 28 +++++++++++++ 4 files changed, 49 insertions(+), 30 deletions(-) diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java index 43930ad3e..244c56bc0 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java @@ -21,7 +21,6 @@ package org.apache.hudi.io; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.util.HoodieTimer; @@ -37,7 +36,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; -import java.util.Map; /** * A {@link HoodieMergeHandle} that supports merge write incrementally(small data buffers). @@ -63,14 +61,16 @@ public class FlinkMergeHandle * Records the current file handles number that rolls over. */ private int rollNumber = 0; + + /** + * Whether the handle should roll over to new, E.G. the handle has written some intermediate files already.
+ */ + private volatile boolean shouldRollover = false; + /** * Records the rolled over file paths. */ - private List rolloverPaths; - /** - * Whether it is the first time to generate file handle, E.G. the handle has not rolled over yet. - */ - private boolean needBootStrap = true; + private final List rolloverPaths; public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, Iterator> recordItr, String partitionPath, String fileId, @@ -79,32 +79,22 @@ public class FlinkMergeHandle rolloverPaths = new ArrayList<>(); } - /** - * Called by compactor code path. - */ - public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable, - Map> keyToNewRecords, String partitionPath, String fileId, - HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) { - super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId, - dataFileToBeMerged, taskContextSupplier); - } - /** * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. */ - protected String generatesDataFileName() { - final String fileID = this.needBootStrap ? fileId : fileId + "-" + rollNumber; + protected String generatesDataFileNameWithRollover() { + final String fileID = this.fileId + "-" + rollNumber; return FSUtils.makeDataFileName(instantTime, writeToken, fileID, hoodieTable.getBaseFileExtension()); } - public boolean isNeedBootStrap() { - return needBootStrap; + public boolean shouldRollover() { + return shouldRollover; } @Override public List close() { List writeStatus = super.close(); - this.needBootStrap = false; + this.shouldRollover = true; return writeStatus; } @@ -138,12 +128,12 @@ public class FlinkMergeHandle this.writeStatus.setTotalErrorRecords(0); this.timer = new HoodieTimer().startTimer(); - rollNumber++; + rollNumber += 1; rolloverPaths.add(newFilePath); oldFilePath = newFilePath; // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. 
- String newFileName = generatesDataFileName(); + String newFileName = generatesDataFileNameWithRollover(); String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + newFileName).toString(); newFilePath = new Path(config.getBasePath(), relativePath); @@ -160,6 +150,7 @@ public class FlinkMergeHandle public void finishWrite() { // The file visibility should be kept by the configured ConsistencyGuard instance. + rolloverPaths.add(newFilePath); if (rolloverPaths.size() == 1) { // only one flush action, no need to roll over return; @@ -176,7 +167,7 @@ public class FlinkMergeHandle Path lastPath = rolloverPaths.size() > 0 ? rolloverPaths.get(rolloverPaths.size() - 1) : newFilePath; - String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); + String newFileName = generatesDataFileName(); String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") + newFileName).toString(); final Path desiredPath = new Path(config.getBasePath(), relativePath); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java index 5df46a65b..f3e10e8ca 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java @@ -240,7 +240,7 @@ public abstract class BaseFlinkCommitActionExecutor extends AbstractMer final GenericDatumWriter gWriter; final GenericDatumReader gReader; Schema readSchema; - if (isNeedBootStrap(mergeHandle) + if (isFirstTimeMerge(mergeHandle) && (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) { readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), 
mergeHandle.getOldFilePath()).getSchema(); gWriter = new GenericDatumWriter<>(readSchema); @@ -86,7 +86,7 @@ public class FlinkMergeHelper extends AbstractMer HoodieFileReader reader = HoodieFileReaderFactory.getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); try { final Iterator readerIterator; - if (isNeedBootStrap(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) { + if (isFirstTimeMerge(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) { readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); } else { readerIterator = reader.getRecordIterator(readSchema); @@ -115,8 +115,8 @@ public class FlinkMergeHelper extends AbstractMer } } - private static boolean isNeedBootStrap(HoodieMergeHandle mergeHandle) { - return mergeHandle instanceof FlinkMergeHandle && ((FlinkMergeHandle) mergeHandle).isNeedBootStrap(); + private static boolean isFirstTimeMerge(HoodieMergeHandle mergeHandle) { + return mergeHandle instanceof FlinkMergeHandle && !((FlinkMergeHandle) mergeHandle).shouldRollover(); } } diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java index d2ba3e693..29a7d7d0a 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java +++ b/hudi-flink/src/test/java/org/apache/hudi/table/HoodieDataSourceITCase.java @@ -324,6 +324,34 @@ public class HoodieDataSourceITCase extends AbstractTestBase { assertRowsEquals(result2, expected); } + @ParameterizedTest + @EnumSource(value = ExecMode.class) + void testUpsertWithMiniBatches(ExecMode execMode) { + TableEnvironment tableEnv = execMode == ExecMode.BATCH ? 
batchTableEnv : streamTableEnv; + Map options = new HashMap<>(); + options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath()); + options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.001"); + String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options); + tableEnv.executeSql(hoodieTableDDL); + + final String insertInto1 = "insert into t1 values\n" + + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')"; + + execInsertSql(tableEnv, insertInto1); + + final String insertInto2 = "insert into t1 values\n" + + "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n" + + "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1'),\n" + + "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par1'),\n" + + "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par1')"; + + execInsertSql(tableEnv, insertInto2); + + List result = CollectionUtil.iterableToList( + () -> tableEnv.sqlQuery("select * from t1").execute().collect()); + assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par1]"); + } + // ------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------