From 72fa19bcc9586e693f4afc99232c2467e93d2148 Mon Sep 17 00:00:00 2001 From: cxzl25 Date: Mon, 27 Jun 2022 11:09:30 +0800 Subject: [PATCH] [HUDI-4316] Support for spillable diskmap configuration when constructing HoodieMergedLogRecordScanner (#5959) --- .../clustering/run/strategy/JavaExecutionStrategy.java | 2 ++ .../run/strategy/MultipleSparkJobExecutionStrategy.java | 2 ++ .../apache/hudi/sink/clustering/ClusteringOperator.java | 2 ++ .../java/org/apache/hudi/table/format/FormatUtils.java | 1 + .../scala/org/apache/hudi/HoodieMergeOnReadRDD.scala | 9 +++++++-- 5 files changed, 14 insertions(+), 2 deletions(-) diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java index 233c70ecf..adcbb874e 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java @@ -191,6 +191,8 @@ public abstract class JavaExecutionStrategy> .withBufferSize(config.getMaxDFSStreamBufferSize()) .withSpillableMapBasePath(config.getSpillableMapBasePath()) .withPartition(clusteringOp.getPartitionPath()) + .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()) + .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) .build(); Option baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index e09457f0e..df0ad6e2b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -221,6 +221,8 @@ public abstract class MultipleSparkJobExecutionStrategy baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath()) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 676532402..ca48dcf6c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -269,6 +269,8 @@ public class ClusteringOperator extends TableStreamOperator