1
0

[HUDI-4151] flink split_reader supports rocksdb (#5675)

* [HUDI-4151] flink split_reader supports rocksdb
This commit is contained in:
Bo Cui
2022-05-28 08:37:34 +08:00
committed by GitHub
parent 554caa3421
commit 93fe5a497e
2 changed files with 20 additions and 23 deletions

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.util.StreamerUtil;
@@ -120,38 +121,34 @@ public class FormatUtils {
public static HoodieMergedLogRecordScanner logScanner(
MergeOnReadInputSplit split,
Schema logSchema,
Configuration config,
boolean withOperationField) {
FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
org.apache.flink.configuration.Configuration flinkConf,
Configuration hadoopConf) {
HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf);
FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.getTablePath())
.withLogFilePaths(split.getLogPaths().get())
.withReaderSchema(logSchema)
.withLatestInstantTime(split.getLatestCommit())
.withReadBlocksLazily(
string2Boolean(
config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
.withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
.withReverseReader(false)
.withBufferSize(
config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
.withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
.withSpillableMapBasePath(
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
.withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
.withInstantRange(split.getInstantRange())
.withOperationField(withOperationField)
.withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
.build();
}
private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
MergeOnReadInputSplit split,
Schema logSchema,
Configuration config,
org.apache.flink.configuration.Configuration flinkConf,
Configuration hadoopConf,
HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
return HoodieUnMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.getTablePath())
@@ -160,11 +157,11 @@ public class FormatUtils {
.withLatestInstantTime(split.getLatestCommit())
.withReadBlocksLazily(
string2Boolean(
config.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
flinkConf.getString(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP,
HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
.withReverseReader(false)
.withBufferSize(
config.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
flinkConf.getInteger(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP,
HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withInstantRange(split.getInstantRange())
.withLogRecordScannerCallback(callback)
@@ -198,7 +195,7 @@ public class FormatUtils {
Functions.noop());
// Consumer of this record reader
this.iterator = this.executor.getQueue().iterator();
this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, hadoopConf,
this.scanner = FormatUtils.unMergedLogScanner(split, logSchema, flinkConf, hadoopConf,
record -> executor.getQueue().insertRecord(record));
// Start reading and buffering
this.executor.startProducers();

View File

@@ -192,6 +192,7 @@ public class MergeOnReadInputFormat
getLogFileIterator(split));
} else if (split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
this.iterator = new MergeIterator(
conf,
hadoopConf,
split,
this.tableState.getRowType(),
@@ -200,7 +201,6 @@ public class MergeOnReadInputFormat
new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
this.requiredPos,
this.emitDelete,
this.conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED),
this.tableState.getOperationPos(),
getFullSchemaReader(split.getBasePath().get()));
} else {
@@ -323,7 +323,7 @@ public class MergeOnReadInputFormat
final GenericRecordBuilder recordBuilder = new GenericRecordBuilder(requiredSchema);
final AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED));
final HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(split, tableSchema, conf, hadoopConf);
final Iterator<String> logRecordsKeyIterator = scanner.getRecords().keySet().iterator();
final int[] pkOffset = tableState.getPkOffsetsInRequired();
// flag saying whether the pk semantics has been dropped by user specified
@@ -639,6 +639,7 @@ public class MergeOnReadInputFormat
private RowData currentRecord;
MergeIterator(
Configuration finkConf,
org.apache.hadoop.conf.Configuration hadoopConf,
MergeOnReadInputSplit split,
RowType tableRowType,
@@ -647,12 +648,11 @@ public class MergeOnReadInputFormat
Schema requiredSchema,
int[] requiredPos,
boolean emitDelete,
boolean withOperationField,
int operationPos,
ParquetColumnarRowSplitReader reader) { // the reader should be with full schema
this.tableSchema = tableSchema;
this.reader = reader;
this.scanner = FormatUtils.logScanner(split, tableSchema, hadoopConf, withOperationField);
this.scanner = FormatUtils.logScanner(split, tableSchema, finkConf, hadoopConf);
this.logKeysIterator = scanner.getRecords().keySet().iterator();
this.requiredSchema = requiredSchema;
this.requiredPos = requiredPos;