1
0

[HUDI-4044] When reading data from flink-hudi to external storage, the … (#5516)

Co-authored-by: aliceyyan <aliceyyan@tencent.com>
This commit is contained in:
aliceyyan
2022-05-10 10:25:13 +08:00
committed by GitHub
parent 6285a239a3
commit 6fd21d0f10
3 changed files with 15 additions and 3 deletions

View File

@@ -226,7 +226,7 @@ public class IncrementalInputSplits implements Serializable {
String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
return new MergeOnReadInputSplit(cnt.getAndAdd(1),
basePath, logPaths, endInstant,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange);
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, instantRange, fileSlice.getFileId());
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());

View File

@@ -181,6 +181,7 @@ public class HoodieTableSource implements
OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
.setParallelism(1)
.keyBy(inputSplit -> inputSplit.getFileId())
.transform("split_reader", typeInfo, factory)
.setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
return new DataStreamSource<>(source);
@@ -316,7 +317,7 @@ public class HoodieTableSource implements
.map(logFile -> logFile.getPath().toString())
.collect(Collectors.toList()));
return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null, fileSlice.getFileId());
}).collect(Collectors.toList()))
.flatMap(Collection::stream)
.collect(Collectors.toList());

View File

@@ -43,6 +43,7 @@ public class MergeOnReadInputSplit implements InputSplit {
private final long maxCompactionMemoryInBytes;
private final String mergeType;
private final Option<InstantRange> instantRange;
private String fileId;
// for streaming reader to record the consumed offset,
// which is the start of next round reading.
@@ -56,7 +57,8 @@ public class MergeOnReadInputSplit implements InputSplit {
String tablePath,
long maxCompactionMemoryInBytes,
String mergeType,
@Nullable InstantRange instantRange) {
@Nullable InstantRange instantRange,
String fileId) {
this.splitNum = splitNum;
this.basePath = Option.ofNullable(basePath);
this.logPaths = logPaths;
@@ -65,6 +67,15 @@ public class MergeOnReadInputSplit implements InputSplit {
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.mergeType = mergeType;
this.instantRange = Option.ofNullable(instantRange);
this.fileId = fileId;
}
/**
 * Returns the file ID recorded for this input split.
 *
 * @return the value passed to the constructor, or to the most recent
 *         {@link #setFileId(String)} call if one was made
 */
public String getFileId() {
    return this.fileId;
}
/**
 * Replaces the file ID recorded for this input split.
 *
 * @param fileId the new file ID; stored as-is, with no validation
 */
public void setFileId(String fileId) {
this.fileId = fileId;
}
public Option<String> getBasePath() {