[HUDI-4316] Support for spillable diskmap configuration when constructing HoodieMergedLogRecordScanner (#5959)
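This change threads the spillable disk-map settings from HoodieCommonConfig through each place that constructs a HoodieMergedLogRecordScanner: the Java and Spark clustering execution strategies, the Flink ClusteringOperator and FormatUtils read path, and the Spark merge-on-read RDD. Before this patch those call sites ignored the user-facing configuration and fell back to the scanner builder's defaults.

As a usage illustration (not part of the diff): a minimal sketch of setting the two options on a write config. The ConfigProperty keys and the getCommonConfig() accessors appear in the hunks below; the HoodieWriteConfig builder methods, the ROCKS_DB enum value, and the base path are assumptions.

import java.util.Properties;

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.config.HoodieWriteConfig;

public class SpillableDiskMapConfigExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Spill overflowing merged log records to a RocksDB-backed disk map
    // instead of the default BitCask map (enum value assumed).
    props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), "ROCKS_DB");
    // Compression toggle for the BitCask disk map; only consulted when the
    // disk map type is BITCASK.
    props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table") // placeholder base path
        .withProperties(props)
        .build();

    // The patched call sites read the values back through exactly these getters:
    System.out.println(writeConfig.getCommonConfig().getSpillableDiskMapType());
    System.out.println(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
  }
}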
@@ -191,6 +191,8 @@ public abstract class JavaExecutionStrategy<T extends HoodieRecordPayload<T>>
         .withBufferSize(config.getMaxDFSStreamBufferSize())
         .withSpillableMapBasePath(config.getSpillableMapBasePath())
         .withPartition(clusteringOp.getPartitionPath())
+        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();
 
     Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
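For context, the scanner construction these two added lines slot into looks roughly as follows. Everything is taken from the hunk above except the elided builder arguments, which are assumptions about the surrounding call site:

HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
    // ... elided: file system, base path, log file paths, reader schema,
    // latest instant time, max memory size, etc. ...
    .withBufferSize(config.getMaxDFSStreamBufferSize())
    .withSpillableMapBasePath(config.getSpillableMapBasePath())
    .withPartition(clusteringOp.getPartitionPath())
    // New in this patch: honor the user-configured disk map type and
    // BitCask compression setting instead of the builder defaults.
    .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
    .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
    .build();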
@@ -221,6 +221,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
         .withBufferSize(config.getMaxDFSStreamBufferSize())
         .withSpillableMapBasePath(config.getSpillableMapBasePath())
         .withPartition(clusteringOp.getPartitionPath())
+        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();
 
     Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
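The Spark clustering strategy above receives the identical two builder calls as the Java strategy; the Flink ClusteringOperator hunk below repeats them against its writeConfig.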
@@ -269,6 +269,8 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven
         .withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();
 
     HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
@@ -136,6 +136,7 @@ public class FormatUtils {
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
         .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withInstantRange(split.getInstantRange())
         .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
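Note the asymmetry in the Flink read path: FormatUtils already passed the disk map type through (the withDiskMapType(...) line is unchanged context here), so this hunk only needs to add the BitCask compression toggle.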
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
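The import change above only brings HoodieCommonConfig into scope for the new builder arguments in the final hunk: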
@@ -361,7 +361,12 @@ private object HoodieMergeOnReadRDD {
         .withSpillableMapBasePath(
           hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
             HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withDiskMapType(
+          hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
+            HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
+        .withBitCaskDiskMapCompressionEnabled(
+          hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+            HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
       if (logFiles.nonEmpty) {
         logRecordScannerBuilder.withPartition(
           getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
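The Spark merge-on-read path has no HoodieWriteConfig available, so the new builder calls resolve the settings from the Hadoop Configuration instead. A minimal sketch of that resolution, using only the keys and defaults visible in the hunk above; the ExternalSpillableMap import path and the ROCKS_DB override value are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;

public class DiskMapConfResolutionExample {
  public static void main(String[] args) {
    Configuration hadoopConf = new Configuration();
    // Optional override; without it the ConfigProperty default applies.
    hadoopConf.set(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), "ROCKS_DB");

    // Mirrors the lookups the patched Scala code performs:
    ExternalSpillableMap.DiskMapType diskMapType =
        hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
            HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue());
    boolean bitCaskCompression =
        hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
            HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());

    System.out.println(diskMapType + ", bitcask compression=" + bitCaskCompression);
  }
}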