1
0

[HUDI-1801] FlinkMergeHandle rolling over may miss to rename the latest file handle (#2831)

The FlinkMergeHandle may rename the (N-1)-th file handle instead of the
latest one, thus causing data duplication.
This commit is contained in:
Danny Chan
2021-04-16 11:40:53 +08:00
committed by GitHub
parent 191470d1fc
commit b6d949b48a
4 changed files with 49 additions and 30 deletions

View File

@@ -21,7 +21,6 @@ package org.apache.hudi.io;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.HoodieTimer;
@@ -37,7 +36,6 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* A {@link HoodieMergeHandle} that supports merge write incrementally(small data buffers). * A {@link HoodieMergeHandle} that supports merge write incrementally(small data buffers).
@@ -63,14 +61,16 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
* Records the current file handles number that rolls over. * Records the current file handles number that rolls over.
*/ */
private int rollNumber = 0; private int rollNumber = 0;
/**
* Whether the handle should roll over to new, E.G. the handle has written some intermediate files already.
*/
private volatile boolean shouldRollover = false;
/** /**
* Records the rolled over file paths. * Records the rolled over file paths.
*/ */
private List<Path> rolloverPaths; private final List<Path> rolloverPaths;
/**
* Whether it is the first time to generate file handle, E.G. the handle has not rolled over yet.
*/
private boolean needBootStrap = true;
public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable, public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
@@ -79,32 +79,22 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
rolloverPaths = new ArrayList<>(); rolloverPaths = new ArrayList<>();
} }
/**
* Called by compactor code path.
*/
public FlinkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
super(config, instantTime, hoodieTable, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier);
}
/** /**
* Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
*/ */
protected String generatesDataFileName() { protected String generatesDataFileNameWithRollover() {
final String fileID = this.needBootStrap ? fileId : fileId + "-" + rollNumber; final String fileID = this.fileId + "-" + rollNumber;
return FSUtils.makeDataFileName(instantTime, writeToken, fileID, hoodieTable.getBaseFileExtension()); return FSUtils.makeDataFileName(instantTime, writeToken, fileID, hoodieTable.getBaseFileExtension());
} }
public boolean isNeedBootStrap() { public boolean shouldRollover() {
return needBootStrap; return shouldRollover;
} }
@Override @Override
public List<WriteStatus> close() { public List<WriteStatus> close() {
List<WriteStatus> writeStatus = super.close(); List<WriteStatus> writeStatus = super.close();
this.needBootStrap = false; this.shouldRollover = true;
return writeStatus; return writeStatus;
} }
@@ -138,12 +128,12 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
this.writeStatus.setTotalErrorRecords(0); this.writeStatus.setTotalErrorRecords(0);
this.timer = new HoodieTimer().startTimer(); this.timer = new HoodieTimer().startTimer();
rollNumber++; rollNumber += 1;
rolloverPaths.add(newFilePath); rolloverPaths.add(newFilePath);
oldFilePath = newFilePath; oldFilePath = newFilePath;
// Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write. // Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
String newFileName = generatesDataFileName(); String newFileName = generatesDataFileNameWithRollover();
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
+ newFileName).toString(); + newFileName).toString();
newFilePath = new Path(config.getBasePath(), relativePath); newFilePath = new Path(config.getBasePath(), relativePath);
@@ -160,6 +150,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
public void finishWrite() { public void finishWrite() {
// The file visibility should be kept by the configured ConsistencyGuard instance. // The file visibility should be kept by the configured ConsistencyGuard instance.
rolloverPaths.add(newFilePath);
if (rolloverPaths.size() == 1) { if (rolloverPaths.size() == 1) {
// only one flush action, no need to roll over // only one flush action, no need to roll over
return; return;
@@ -176,7 +167,7 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
Path lastPath = rolloverPaths.size() > 0 Path lastPath = rolloverPaths.size() > 0
? rolloverPaths.get(rolloverPaths.size() - 1) ? rolloverPaths.get(rolloverPaths.size() - 1)
: newFilePath; : newFilePath;
String newFileName = FSUtils.makeDataFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); String newFileName = generatesDataFileName();
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/") String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
+ newFileName).toString(); + newFileName).toString();
final Path desiredPath = new Path(config.getBasePath(), relativePath); final Path desiredPath = new Path(config.getBasePath(), relativePath);

View File

@@ -240,7 +240,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
} else { } else {
FlinkMergeHandle writeHandle = (FlinkMergeHandle) this.writeHandle; FlinkMergeHandle writeHandle = (FlinkMergeHandle) this.writeHandle;
// add the incremental records. // add the incremental records.
if (!writeHandle.isNeedBootStrap()) { if (writeHandle.shouldRollover()) {
writeHandle.rollOver(recordItr); writeHandle.rollOver(recordItr);
} }
return writeHandle; return writeHandle;

View File

@@ -71,7 +71,7 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
final GenericDatumWriter<GenericRecord> gWriter; final GenericDatumWriter<GenericRecord> gWriter;
final GenericDatumReader<GenericRecord> gReader; final GenericDatumReader<GenericRecord> gReader;
Schema readSchema; Schema readSchema;
if (isNeedBootStrap(mergeHandle) if (isFirstTimeMerge(mergeHandle)
&& (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) { && (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent())) {
readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema(); readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
gWriter = new GenericDatumWriter<>(readSchema); gWriter = new GenericDatumWriter<>(readSchema);
@@ -86,7 +86,7 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath()); HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
try { try {
final Iterator<GenericRecord> readerIterator; final Iterator<GenericRecord> readerIterator;
if (isNeedBootStrap(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) { if (isFirstTimeMerge(mergeHandle) && baseFile.getBootstrapBaseFile().isPresent()) {
readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation); readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
} else { } else {
readerIterator = reader.getRecordIterator(readSchema); readerIterator = reader.getRecordIterator(readSchema);
@@ -115,8 +115,8 @@ public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMer
} }
} }
private static boolean isNeedBootStrap(HoodieMergeHandle<?, ?, ?, ?> mergeHandle) { private static boolean isFirstTimeMerge(HoodieMergeHandle<?, ?, ?, ?> mergeHandle) {
return mergeHandle instanceof FlinkMergeHandle && ((FlinkMergeHandle) mergeHandle).isNeedBootStrap(); return mergeHandle instanceof FlinkMergeHandle && !((FlinkMergeHandle) mergeHandle).shouldRollover();
} }
} }

View File

@@ -324,6 +324,34 @@ public class HoodieDataSourceITCase extends AbstractTestBase {
assertRowsEquals(result2, expected); assertRowsEquals(result2, expected);
} }
@ParameterizedTest
@EnumSource(value = ExecMode.class)
void testUpsertWithMiniBatches(ExecMode execMode) {
TableEnvironment tableEnv = execMode == ExecMode.BATCH ? batchTableEnv : streamTableEnv;
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.PATH.key(), tempFile.getAbsolutePath());
options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "0.001");
String hoodieTableDDL = TestConfigurations.getCreateHoodieTableDDL("t1", options);
tableEnv.executeSql(hoodieTableDDL);
final String insertInto1 = "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1')";
execInsertSql(tableEnv, insertInto1);
final String insertInto2 = "insert into t1 values\n"
+ "('id1','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),\n"
+ "('id1','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par1'),\n"
+ "('id1','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par1'),\n"
+ "('id1','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par1')";
execInsertSql(tableEnv, insertInto2);
List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
assertRowsEquals(result, "[id1,Sophia,18,1970-01-01T00:00:05,par1]");
}
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------
// Utilities // Utilities
// ------------------------------------------------------------------------- // -------------------------------------------------------------------------