1
0

[HUDI-2087] Support Append only in Flink stream (#3252)

Co-authored-by: 喻兆靖 <yuzhaojing@bilibili.com>
This commit is contained in:
yuzhaojing
2021-07-10 14:49:35 +08:00
committed by GitHub
parent 7c6eebf98c
commit 783c9cb369
14 changed files with 243 additions and 29 deletions

View File

@@ -56,6 +56,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.HoodieTimelineArchiveLog;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.compact.FlinkCompactHelpers;
import org.apache.hudi.table.upgrade.FlinkUpgradeDowngrade;
import org.apache.hudi.util.FlinkClientUtil;
@@ -408,6 +409,12 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath();
// append only mode always uses FlinkCreateHandle
if (loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
return new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
}
if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
if (lastHandle.shouldReplace()) {
@@ -424,7 +431,8 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
if (isDelta) {
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
} else if (loc.getInstantTime().equals("I")) {
} else if (loc.getInstantTime().equals(BucketType.INSERT.name()) || loc.getInstantTime().equals(BucketType.APPEND_ONLY.name())) {
// use the same handle for insert bucket
writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
} else {

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.MarkerFiles;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -80,7 +81,7 @@ public class FlinkAppendHandle<T extends HoodieRecordPayload, I, K, O>
@Override
protected boolean isUpdateRecord(HoodieRecord<T> hoodieRecord) {
return hoodieRecord.getCurrentLocation() != null
&& hoodieRecord.getCurrentLocation().getInstantTime().equals("U");
&& hoodieRecord.getCurrentLocation().getInstantTime().equals(BucketType.UPDATE.name());
}
@Override

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.table.MarkerFiles;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -70,6 +71,17 @@ public class FlinkCreateHandle<T extends HoodieRecordPayload, I, K, O>
}
}
@Override
protected void createMarkerFile(String partitionPath, String dataFileName) {
// In some rare cases, the task was pulled up again with same write file name,
// for e.g, reuse the small log files from last commit instant.
// Just skip the marker file creation if it already exists, the new data would append to
// the file directly.
MarkerFiles markerFiles = new MarkerFiles(hoodieTable, instantTime);
markerFiles.createIfNotExists(partitionPath, dataFileName, getIOType());
}
/**
* The flink checkpoints start in sequence and asynchronously, when one write task finishes the checkpoint(A)
* (thus the fs view got the written data files some of which may be invalid),

View File

@@ -102,9 +102,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
final HoodieRecord<?> record = inputRecords.get(0);
final String partitionPath = record.getPartitionPath();
final String fileId = record.getCurrentLocation().getFileId();
final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
? BucketType.INSERT
: BucketType.UPDATE;
final BucketType bucketType = BucketType.valueOf(record.getCurrentLocation().getInstantTime());
handleUpsertPartition(
instantTime,
partitionPath,
@@ -185,6 +183,7 @@ public abstract class BaseFlinkCommitActionExecutor<T extends HoodieRecordPayloa
} else {
switch (bucketType) {
case INSERT:
case APPEND_ONLY:
return handleInsert(fileIdHint, recordItr);
case UPDATE:
return handleUpdate(partitionPath, fileIdHint, recordItr);