[HUDI-2112] Support reading pure logs file group for flink batch reader after compaction (#3202)
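Summary of the change: before this patch, the Flink batch reader planned its splits from the latest base files and grouped log files under them, so a file group consisting purely of log files (for example, inserts landing in a partition before any compaction has produced a base file) yielded no split and its records were skipped. The patch plans one MergeOnReadInputSplit per latest merged file slice instead, passing a null base path when the slice has no base file. Below is a minimal, self-contained sketch of that rule; Slice and the printed "split" strings are hypothetical stand-ins for Hudi's FileSlice and MergeOnReadInputSplit, not the real API.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class PureLogSplitSketch {

  // Hypothetical stand-in for a Hudi file slice: a base file (possibly
  // absent) plus the sorted log files of one file group.
  static final class Slice {
    final String baseFilePath; // null for a pure-log file group
    final List<String> logPaths;

    Slice(String baseFilePath, List<String> logPaths) {
      this.baseFilePath = baseFilePath;
      this.logPaths = logPaths;
    }

    Optional<String> getBaseFile() {
      return Optional.ofNullable(baseFilePath);
    }
  }

  public static void main(String[] args) {
    // One compacted file group (base + logs) and one pure-log file group.
    List<Slice> latestMergedSlices = Arrays.asList(
        new Slice("par1/base_001.parquet", Arrays.asList("par1/.log.1")),
        new Slice(null, Arrays.asList("par5/.log.1", "par5/.log.2")));

    AtomicInteger cnt = new AtomicInteger(0);
    // The rule the patch introduces: every latest merged file slice becomes
    // a split; a missing base file maps to a null base path instead of the
    // whole file group being dropped from the plan.
    List<String> splits = latestMergedSlices.stream()
        .map(slice -> "split-" + cnt.getAndAdd(1)
            + " base=" + slice.getBaseFile().orElse(null)
            + " logs=" + slice.logPaths)
        .collect(Collectors.toList());

    splits.forEach(System.out::println);
    // split-0 base=par1/base_001.parquet logs=[par1/.log.1]
    // split-1 base=null logs=[par5/.log.1, par5/.log.2]
  }
}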
@@ -19,7 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -29,7 +29,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -272,39 +271,24 @@ public class HoodieTableSource implements
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient,
         metaClient.getActiveTimeline().getCommitsTimeline()
             .filterCompletedInstants(), fileStatuses);
-    List<HoodieBaseFile> latestFiles = fsView.getLatestBaseFiles().collect(Collectors.toList());
     String latestCommit = fsView.getLastInstant().get().getTimestamp();
     final String mergeType = this.conf.getString(FlinkOptions.MERGE_TYPE);
     final AtomicInteger cnt = new AtomicInteger(0);
-    if (latestFiles.size() > 0) {
-      Map<HoodieBaseFile, List<String>> fileGroup =
-          HoodieRealtimeInputFormatUtils.groupLogsByBaseFile(hadoopConf, latestFiles);
-      return fileGroup.entrySet().stream().map(kv -> {
-        HoodieBaseFile baseFile = kv.getKey();
-        Option<List<String>> logPaths = kv.getValue().size() == 0
-            ? Option.empty()
-            : Option.of(kv.getValue());
-        return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-            baseFile.getPath(), logPaths, latestCommit,
-            metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-      }).collect(Collectors.toList());
-    } else {
-      // all the files are logs
-      return Arrays.stream(paths).map(partitionPath -> {
-        String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
-        return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
-            .map(fileSlice -> {
-              Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
-                  .sorted(HoodieLogFile.getLogFileComparator())
-                  .map(logFile -> logFile.getPath().toString())
-                  .collect(Collectors.toList()));
-              return new MergeOnReadInputSplit(cnt.getAndAdd(1),
-                  null, logPaths, latestCommit,
-                  metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
-            }).collect(Collectors.toList()); })
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
-    }
+    // generates one input split for each file group
+    return Arrays.stream(paths).map(partitionPath -> {
+      String relPartitionPath = FSUtils.getRelativePartitionPath(path, partitionPath);
+      return fsView.getLatestMergedFileSlicesBeforeOrOn(relPartitionPath, latestCommit)
+          .map(fileSlice -> {
+            String basePath = fileSlice.getBaseFile().map(BaseFile::getPath).orElse(null);
+            Option<List<String>> logPaths = Option.ofNullable(fileSlice.getLogFiles()
+                .sorted(HoodieLogFile.getLogFileComparator())
+                .map(logFile -> logFile.getPath().toString())
+                .collect(Collectors.toList()));
+            return new MergeOnReadInputSplit(cnt.getAndAdd(1), basePath, logPaths, latestCommit,
+                metaClient.getBasePath(), maxCompactionMemoryInBytes, mergeType, null);
+          }).collect(Collectors.toList()); })
+        .flatMap(Collection::stream)
+        .collect(Collectors.toList());
   }
 
   public InputFormat<RowData, ?> getInputFormat() {
@@ -431,11 +415,12 @@ public class HoodieTableSource implements
   }
 
   /**
-   * Reload the active timeline view.
+   * Reset the state of the table source.
    */
   @VisibleForTesting
-  public void reloadActiveTimeline() {
+  public void reset() {
     this.metaClient.reloadActiveTimeline();
+    this.requiredPartitions = null;
   }
 
   /**
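Note on the rename above: reset() does strictly more than the old reloadActiveTimeline(). Besides reloading the timeline, it clears the cached requiredPartitions, so a source that is re-planned after new partitions were written (as the test below does with par5/par6) re-discovers them. A minimal sketch of that caching pattern, with hypothetical names (CachingSourceSketch is not part of Hudi):

import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the state reset() must clear: a source that
// lazily caches its partition list has to drop that cache when asked to
// re-plan, or partitions written after the first read stay invisible.
class CachingSourceSketch {
  private List<String> requiredPartitions; // lazily computed plan state

  List<String> getOrScanPartitions() {
    if (requiredPartitions == null) {
      requiredPartitions = scanPartitions();
    }
    return requiredPartitions;
  }

  void reset() {
    reloadActiveTimeline();     // pick up newly committed instants
    requiredPartitions = null;  // and forget the stale partition list
  }

  private List<String> scanPartitions() {
    return Arrays.asList("par1", "par2"); // stand-in for a file system scan
  }

  private void reloadActiveTimeline() {
    // no-op in this sketch
  }
}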
@@ -92,7 +92,7 @@ public class TestInputFormat {
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
     // refresh the input format
-    this.tableSource.reloadActiveTimeline();
+    this.tableSource.reset();
     inputFormat = this.tableSource.getInputFormat();
 
     result = readData(inputFormat);
@@ -133,8 +133,12 @@ public class TestInputFormat {
     conf.setBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED, false);
     TestData.writeData(TestData.DATA_SET_UPDATE_INSERT, conf);
 
+    // write another commit using logs with separate partition
+    // so the file group has only logs
+    TestData.writeData(TestData.DATA_SET_INSERT_SEPARATE_PARTITION, conf);
+
     // refresh the input format
-    this.tableSource.reloadActiveTimeline();
+    this.tableSource.reset();
     inputFormat = this.tableSource.getInputFormat();
 
     result = readData(inputFormat);
@@ -143,6 +147,10 @@ public class TestInputFormat {
     expected = "[id1,Danny,24,1970-01-01T00:00:00.001,par1, "
         + "id10,Ella,38,1970-01-01T00:00:00.007,par4, "
         + "id11,Phoebe,52,1970-01-01T00:00:00.008,par4, "
+        + "id12,Monica,27,1970-01-01T00:00:00.009,par5, "
+        + "id13,Phoebe,31,1970-01-01T00:00:00.010,par5, "
+        + "id14,Rachel,52,1970-01-01T00:00:00.011,par6, "
+        + "id15,Ross,29,1970-01-01T00:00:00.012,par6, "
         + "id2,Stephen,34,1970-01-01T00:00:00.002,par1, "
         + "id3,Julian,54,1970-01-01T00:00:00.003,par2, "
         + "id4,Fabian,32,1970-01-01T00:00:00.004,par2, "
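A detail worth calling out in the expected string above: the rows are ordered by the record key compared as a string, which is why id10 through id15 sort between id1 and id2. A quick illustration in plain Java, independent of the test harness:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KeyOrderSketch {
  public static void main(String[] args) {
    // Lexicographic (string) order, not numeric order of the id suffix.
    List<String> keys = new ArrayList<>(
        Arrays.asList("id1", "id2", "id3", "id10", "id12", "id15"));
    keys.sort(String::compareTo);
    System.out.println(keys); // [id1, id10, id12, id15, id2, id3]
  }
}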
@@ -114,6 +114,17 @@ public class TestData {
       TimestampData.fromEpochMillis(8), StringData.fromString("par4"))
   );
 
+  public static List<RowData> DATA_SET_INSERT_SEPARATE_PARTITION = Arrays.asList(
+      insertRow(StringData.fromString("id12"), StringData.fromString("Monica"), 27,
+          TimestampData.fromEpochMillis(9), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id13"), StringData.fromString("Phoebe"), 31,
+          TimestampData.fromEpochMillis(10), StringData.fromString("par5")),
+      insertRow(StringData.fromString("id14"), StringData.fromString("Rachel"), 52,
+          TimestampData.fromEpochMillis(11), StringData.fromString("par6")),
+      insertRow(StringData.fromString("id15"), StringData.fromString("Ross"), 29,
+          TimestampData.fromEpochMillis(12), StringData.fromString("par6"))
+  );
+
   public static List<RowData> DATA_SET_INSERT_DUPLICATES = new ArrayList<>();
   static {
     IntStream.range(0, 5).forEach(i -> DATA_SET_INSERT_DUPLICATES.add(