From 8fef50e237b2342ea3366be32950a2b87a9608c4 Mon Sep 17 00:00:00 2001
From: rmahindra123 <76502047+rmahindra123@users.noreply.github.com>
Date: Tue, 27 Jul 2021 22:31:03 -0700
Subject: [PATCH] [HUDI-2044] Integrate consumers with rocksDB and compression within External Spillable Map (#3318)

---
 .../cli/commands/HoodieLogFileCommand.java    |   3 +
 .../commands/TestHoodieLogFileCommand.java    |   3 +
 .../EmbeddedTimelineServerHelper.java         |   3 +-
 .../embedded/EmbeddedTimelineService.java     |   7 +-
 .../apache/hudi/config/HoodieWriteConfig.java |  39 +----
 .../org/apache/hudi/io/HoodieMergeHandle.java |   3 +-
 .../org/apache/hudi/table/HoodieTable.java    |   4 +-
 .../HoodieFlinkMergeOnReadTableCompactor.java |   2 +
 ...ExecuteClusteringCommitActionExecutor.java |   2 +
 .../HoodieSparkMergeOnReadTableCompactor.java |   2 +
 .../apache/hudi/io/TestHoodieMergeHandle.java |  49 ++++++-
 .../common/config/HoodieCommonConfig.java     |  93 ++++++++++
 .../log/HoodieMergedLogRecordScanner.java     |  21 ++-
 .../table/view/FileSystemViewManager.java     |  18 ++-
 .../view/SpillableMapBasedFileSystemView.java |  27 ++--
 .../metadata/HoodieBackedTableMetadata.java   |   4 +
 .../HoodieMetadataMergedLogRecordScanner.java |  19 ++-
 .../functional/TestHoodieLogFormat.java       | 135 ++++++++++++++----
 .../TestSpillableMapBasedFileSystemView.java  |   4 +-
 ...pillableMapBasedIncrementalFSViewSync.java |   4 +-
 .../sink/bootstrap/BootstrapFunction.java     |   2 +
 .../java/org/apache/hudi/utils/TestData.java  |   3 +
 .../RealtimeCompactedRecordReader.java        |   4 +
 .../TestHoodieRealtimeRecordReader.java       |  48 ++++++-
 .../reader/DFSHoodieDatasetInputReader.java   |   3 +
 .../timeline/service/TimelineService.java     |   8 +-
 .../TestRemoteHoodieTableFileSystemView.java  |   4 +-
 27 files changed, 405 insertions(+), 109 deletions(-)
 create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index 13963c574..b2319d698 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -22,6 +22,7 @@ import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.TableHeader; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; @@ -211,6 +212,8 @@ public class HoodieLogFileCommand implements CommandMarker { .withMaxMemorySizeInBytes( HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES) .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue()) + .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()) + .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()) .build(); for (HoodieRecord hoodieRecord : scanner) { Option record = hoodieRecord.getData().getInsertValue(readerSchema); diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java index 316091209..e0640edc1 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java +++ 
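
The CLI wiring above is the pattern this patch applies to every scanner consumer: two new builder knobs, withDiskMapType and withBitCaskDiskMapCompressionEnabled, threaded into HoodieMergedLogRecordScanner. Below is a minimal sketch, not taken from the patch, of a caller that opts into RocksDB instead of the defaults; the file system, paths, schema and instant are placeholders, and the surrounding builder calls follow the usage visible elsewhere in this patch.

    import java.util.List;
    import org.apache.avro.Schema;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
    import org.apache.hudi.common.util.collection.ExternalSpillableMap;

    // Sketch only: open a merged log scanner whose spillable map lives in RocksDB.
    static HoodieMergedLogRecordScanner openScanner(FileSystem fs, String basePath,
                                                    List<String> logFiles, Schema readerSchema,
                                                    String latestInstantTime) {
      return HoodieMergedLogRecordScanner.newBuilder()
          .withFileSystem(fs)
          .withBasePath(basePath)
          .withLogFilePaths(logFiles)
          .withReaderSchema(readerSchema)
          .withLatestInstantTime(latestInstantTime)
          .withBufferSize(16 * 1024 * 1024)              // DFS stream buffer, as in TestData below
          .withMaxMemorySizeInBytes(1024 * 1024L)        // spill to disk past 1MB of heap
          .withSpillableMapBasePath("/tmp/")             // where the disk map is created
          .withDiskMapType(ExternalSpillableMap.DiskMapType.ROCKS_DB)
          .withBitCaskDiskMapCompressionEnabled(true)    // only consulted for the BITCASK map
          .build();
    }

Callers that set neither knob keep the old behavior, since the builder defaults come from HoodieCommonConfig (BITCASK with compression on).
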
b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestHoodieLogFileCommand.java @@ -25,6 +25,7 @@ import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -213,6 +214,8 @@ public class TestHoodieLogFileCommand extends AbstractShellIntegrationTest { HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP.defaultValue())) .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue()) .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue()) + .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()) + .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()) .build(); Iterator> records = scanner.iterator(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java index 3112a9185..7dffe7b99 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java @@ -68,7 +68,8 @@ public class EmbeddedTimelineServerHelper { Option hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST); EmbeddedTimelineService timelineService = new EmbeddedTimelineService( context, hostAddr.orElse(null),config.getEmbeddedTimelineServerPort(), - config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath(), + config.getMetadataConfig(), config.getCommonConfig(), + config.getClientSpecifiedViewStorageConfig(), config.getBasePath(), config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(), config.getEmbeddedTimelineServerUseAsync()); timelineService.startServer(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index a6d27cedf..0c2732e3e 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -18,6 +18,7 @@ package org.apache.hudi.client.embedded; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.config.SerializableConfiguration; @@ -46,6 +47,7 @@ public class EmbeddedTimelineService { private final SerializableConfiguration hadoopConf; private final FileSystemViewStorageConfig config; private final HoodieMetadataConfig metadataConfig; + private final HoodieCommonConfig commonConfig; private final String basePath; private final int numThreads; @@ -55,13 +57,14 @@ public class EmbeddedTimelineService { private transient TimelineService server; public 
EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort, - HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath, + HoodieMetadataConfig metadataConfig, HoodieCommonConfig commonConfig, FileSystemViewStorageConfig config, String basePath, int numThreads, boolean compressOutput, boolean useAsync) { setHostAddr(embeddedTimelineServiceHostAddr); this.context = context; this.config = config; this.basePath = basePath; this.metadataConfig = metadataConfig; + this.commonConfig = commonConfig; this.hadoopConf = context.getHadoopConf(); this.viewManager = createViewManager(); this.preferredPort = embeddedTimelineServerPort; @@ -80,7 +83,7 @@ public class EmbeddedTimelineService { // Reset to default if set to Remote builder.withStorageType(FileSystemViewStorageType.MEMORY); } - return FileSystemViewManager.createViewManager(context, metadataConfig, builder.build(), basePath); + return FileSystemViewManager.createViewManager(context, metadataConfig, builder.build(), commonConfig, basePath); } public void startServer() throws IOException { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 836b01dff..212b0197a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -24,6 +24,7 @@ import org.apache.hudi.client.transaction.ConflictResolutionStrategy; import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigProperty; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.engine.EngineType; @@ -38,7 +39,6 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.keygen.constant.KeyGeneratorType; @@ -59,7 +59,6 @@ import java.io.IOException; import java.io.InputStream; import java.util.Arrays; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Properties; @@ -312,18 +311,6 @@ public class HoodieWriteConfig extends HoodieConfig { .withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)." + " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained."); - public static final ConfigProperty SPILLABLE_DISK_MAP_TYPE = ConfigProperty - .key("hoodie.spillable.diskmap.type") - .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) - .withDocumentation("When handling input data that cannot be held in memory, to merge with a file on storage, a spillable diskmap is employed. " - + "By default, we use a persistent hashmap based loosely on bitcask, that offers O(1) inserts, lookups. 
" - + "Change this to `ROCKS_DB` to prefer using rocksDB, for handling the spill."); - - public static final ConfigProperty DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty - .key("hoodie.diskmap.bitcask.compression.enabled") - .defaultValue(true) - .withDocumentation("Turn on compression for BITCASK disk map used by the External Spillable Map"); - public static final ConfigProperty CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty .key("hoodie.client.heartbeat.interval_in_ms") .defaultValue(60 * 1000) @@ -388,6 +375,7 @@ public class HoodieWriteConfig extends HoodieConfig { private FileSystemViewStorageConfig viewStorageConfig; private HoodiePayloadConfig hoodiePayloadConfig; private HoodieMetadataConfig metadataConfig; + private HoodieCommonConfig commonConfig; private EngineType engineType; /** @@ -409,6 +397,7 @@ public class HoodieWriteConfig extends HoodieConfig { this.viewStorageConfig = clientSpecifiedViewStorageConfig; this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build(); this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build(); + this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build(); } public static HoodieWriteConfig.Builder newBuilder() { @@ -597,14 +586,6 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS); } - public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() { - return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT)); - } - - public boolean isBitCaskDiskMapCompressionEnabled() { - return getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED); - } - public EngineType getEngineType() { return engineType; } @@ -1159,6 +1140,10 @@ public class HoodieWriteConfig extends HoodieConfig { return metadataConfig; } + public HoodieCommonConfig getCommonConfig() { + return commonConfig; + } + /** * Commit call back configs. 
*/ @@ -1564,16 +1549,6 @@ public class HoodieWriteConfig extends HoodieConfig { return this; } - public Builder withSpillableDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) { - writeConfig.setValue(SPILLABLE_DISK_MAP_TYPE, diskMapType.name()); - return this; - } - - public Builder withBitcaskDiskMapCompressionEnabled(boolean bitcaskDiskMapCompressionEnabled) { - writeConfig.setValue(DISK_MAP_BITCASK_COMPRESSION_ENABLED, String.valueOf(bitcaskDiskMapCompressionEnabled)); - return this; - } - public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) { writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(heartbeatIntervalInMs)); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 84141f33d..778e82574 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -211,7 +211,8 @@ public class HoodieMergeHandle extends H LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema), - config.getSpillableDiskMapType(), config.isBitCaskDiskMapCompressionEnabled()); + config.getCommonConfig().getSpillableDiskMapType(), + config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()); } catch (IOException io) { throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index a90a07f14..19cc010bf 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -113,7 +113,7 @@ public abstract class HoodieTable implem this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(), FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue()); - this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata); + this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata); this.metaClient = metaClient; this.index = getIndex(config, context); this.taskContextSupplier = context.getTaskContextSupplier(); @@ -123,7 +123,7 @@ public abstract class HoodieTable implem private synchronized FileSystemViewManager getViewManager() { if (null == viewManager) { - viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata); + viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata); } return viewManager; } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java index 
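
The hunks above delete hoodie.spillable.diskmap.type and hoodie.diskmap.bitcask.compression.enabled from HoodieWriteConfig, drop the withSpillableDiskMapType/withBitcaskDiskMapCompressionEnabled builder methods, and have HoodieMergeHandle and HoodieTable read both settings through the new getCommonConfig() accessor instead. A writer that previously used those builder methods would now route the relocated keys through plain properties, roughly as in this sketch (the table path is a placeholder):

    import java.util.Properties;
    import org.apache.hudi.common.config.HoodieCommonConfig;
    import org.apache.hudi.common.util.collection.ExternalSpillableMap;
    import org.apache.hudi.config.HoodieWriteConfig;

    // Sketch: pass the relocated disk-map settings as writer properties.
    static HoodieWriteConfig rocksDbWriteConfig() {
      Properties props = new Properties();
      props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
          ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
      props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");
      HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
          .withPath("/tmp/hoodie_table")   // placeholder base path
          .withProperties(props)
          .build();
      // Merge handles read the values back via the embedded common config:
      assert config.getCommonConfig().getSpillableDiskMapType()
          == ExternalSpillableMap.DiskMapType.ROCKS_DB;
      return config;
    }
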
e89ba5645..f08c8b5d1 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/compact/HoodieFlinkMergeOnReadTableCompactor.java @@ -129,6 +129,8 @@ public class HoodieFlinkMergeOnReadTableCompactor .withReverseReader(config.getCompactionReverseLogReadEnabled()) .withBufferSize(config.getMaxDFSStreamBufferSize()) .withSpillableMapBasePath(config.getSpillableMapBasePath()) + .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()) + .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) .build(); if (!scanner.iterator().hasNext()) { return new ArrayList<>(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java index 415c89f9a..4cee1d2aa 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java @@ -207,6 +207,8 @@ public class SparkExecuteClusteringCommitActionExecutor .withReverseReader(config.getCompactionReverseLogReadEnabled()) .withBufferSize(config.getMaxDFSStreamBufferSize()) .withSpillableMapBasePath(config.getSpillableMapBasePath()) + .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType()) + .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled()) .build(); if (!scanner.iterator().hasNext()) { return new ArrayList<>(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java index 95ed61aa4..17ebccb15 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieMergeHandle.java @@ -20,6 +20,7 @@ package org.apache.hudi.io; import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; @@ -27,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieStorageConfig; @@ -41,16 +43,21 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.nio.file.Paths; import java.util.ArrayList; import 
java.util.List; +import java.util.Properties; +import java.util.stream.Stream; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.params.provider.Arguments.arguments; @SuppressWarnings("unchecked") public class TestHoodieMergeHandle extends HoodieClientTestHarness { @@ -69,14 +76,23 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness { cleanupResources(); } - @Test - public void testUpsertsForMultipleRecordsInSameFile() throws Exception { + @ParameterizedTest + @MethodSource("testArguments") + public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled) throws Exception { // Create records in a single partition String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0]; dataGen = new HoodieTestDataGenerator(new String[] {partitionPath}); + // Build a common config with diff configs + Properties properties = new Properties(); + properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name()); + properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(isCompressionEnabled)); + // Build a write config with bulkinsertparallelism set - HoodieWriteConfig cfg = getConfigBuilder().build(); + HoodieWriteConfig cfg = getConfigBuilder() + .withProperties(properties) + .build(); try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { FileSystem fs = FSUtils.getFs(basePath, hadoopConf); @@ -215,10 +231,19 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness { } } - @Test - public void testHoodieMergeHandleWriteStatMetrics() throws Exception { + @ParameterizedTest + @MethodSource("testArguments") + public void testHoodieMergeHandleWriteStatMetrics(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled) throws Exception { // insert 100 records - HoodieWriteConfig config = getConfigBuilder().build(); + // Build a common config with diff configs + Properties properties = new Properties(); + properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name()); + properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(isCompressionEnabled)); + + HoodieWriteConfig config = getConfigBuilder() + .withProperties(properties) + .build(); try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { String newCommitTime = "100"; writeClient.startCommitWithTime(newCommitTime); @@ -317,6 +342,16 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness { .withBulkInsertParallelism(2).withWriteStatusClass(TestWriteStatus.class); } + private static Stream testArguments() { + // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled + return Stream.of( + arguments(ExternalSpillableMap.DiskMapType.BITCASK, false), + arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false), + arguments(ExternalSpillableMap.DiskMapType.BITCASK, true), + arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true) + ); + } + /** * Overridden so that we can capture and inspect all success records. 
*/ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java new file mode 100644 index 000000000..54a4d77ab --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieCommonConfig.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.config; + +import org.apache.hudi.common.util.collection.ExternalSpillableMap; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Locale; +import java.util.Properties; + +/** + * Common Configurations used across Hudi. + */ +public class HoodieCommonConfig extends HoodieConfig { + + public static final ConfigProperty SPILLABLE_DISK_MAP_TYPE = ConfigProperty + .key("hoodie.common.spillable.diskmap.type") + .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK) + .withDocumentation("When handling input data that cannot be held in memory, to merge with a file on storage, a spillable diskmap is employed. " + + "By default, we use a persistent hashmap based loosely on bitcask, that offers O(1) inserts, lookups. 
" + + "Change this to `ROCKS_DB` to prefer using rocksDB, for handling the spill."); + + public static final ConfigProperty DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty + .key("hoodie.common.diskmap.compression.enabled") + .defaultValue(true) + .withDocumentation("Turn on compression for BITCASK disk map used by the External Spillable Map"); + + public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() { + return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT)); + } + + public boolean isBitCaskDiskMapCompressionEnabled() { + return getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED); + } + + private HoodieCommonConfig() { + super(); + } + + public static HoodieCommonConfig.Builder newBuilder() { + return new HoodieCommonConfig.Builder(); + } + + public static class Builder { + + private final HoodieCommonConfig commonConfig = new HoodieCommonConfig(); + + public HoodieCommonConfig.Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + commonConfig.getProps().load(reader); + return this; + } + } + + public HoodieCommonConfig.Builder fromProperties(Properties props) { + commonConfig.getProps().putAll(props); + return this; + } + + public Builder withSpillableDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) { + commonConfig.setValue(SPILLABLE_DISK_MAP_TYPE, diskMapType.name()); + return this; + } + + public Builder withBitcaskDiskMapCompressionEnabled(boolean bitcaskDiskMapCompressionEnabled) { + commonConfig.setValue(DISK_MAP_BITCASK_COMPRESSION_ENABLED, String.valueOf(bitcaskDiskMapCompressionEnabled)); + return this; + } + + public HoodieCommonConfig build() { + commonConfig.setDefaults(HoodieCommonConfig.class.getName()); + return commonConfig; + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java index 12e27bad0..4d51fbb85 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.log; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; @@ -73,12 +74,13 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, boolean reverseReader, int bufferSize, String spillableMapBasePath, - Option instantRange, boolean autoScan) { + Option instantRange, boolean autoScan, + ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) { super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange); try { // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), - new HoodieRecordSizeEstimator(readerSchema)); + new HoodieRecordSizeEstimator(readerSchema), diskMapType, 
isBitCaskDiskMapCompressionEnabled); this.maxMemorySizeInBytes = maxMemorySizeInBytes; } catch (IOException e) { throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e); @@ -169,6 +171,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner // specific configurations protected Long maxMemorySizeInBytes; protected String spillableMapBasePath; + protected ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue(); + protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue(); // incremental filtering private Option instantRange = Option.empty(); // auto scan default true @@ -229,6 +233,16 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner return this; } + public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) { + this.diskMapType = diskMapType; + return this; + } + + public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) { + this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled; + return this; + } + public Builder withAutoScan(boolean autoScan) { this.autoScan = autoScan; return this; @@ -238,7 +252,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner public HoodieMergedLogRecordScanner build() { return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, - bufferSize, spillableMapBasePath, instantRange, autoScan); + bufferSize, spillableMapBasePath, instantRange, autoScan, + diskMapType, isBitCaskDiskMapCompressionEnabled); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index f89c2a670..7eaed5f9e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -18,6 +18,7 @@ package org.apache.hudi.common.table.view; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -145,10 +146,10 @@ public class FileSystemViewManager { * @return */ private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf, - FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) { + FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient, HoodieCommonConfig commonConfig) { LOG.info("Creating SpillableMap based view for basePath " + metaClient.getBasePath()); HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); - return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf); + return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf, commonConfig); } /** @@ -205,15 +206,17 @@ public class FileSystemViewManager { public static FileSystemViewManager createViewManager(final HoodieEngineContext context, final HoodieMetadataConfig metadataConfig, - final FileSystemViewStorageConfig config) { - return createViewManager(context, metadataConfig, config, 
(SerializableSupplier) null); + final FileSystemViewStorageConfig config, + final HoodieCommonConfig commonConfig) { + return createViewManager(context, metadataConfig, config, commonConfig, (SerializableSupplier) null); } public static FileSystemViewManager createViewManager(final HoodieEngineContext context, final HoodieMetadataConfig metadataConfig, final FileSystemViewStorageConfig config, + final HoodieCommonConfig commonConfig, final String basePath) { - return createViewManager(context, metadataConfig, config, + return createViewManager(context, metadataConfig, config, commonConfig, () -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir(), true)); } @@ -224,6 +227,7 @@ public class FileSystemViewManager { public static FileSystemViewManager createViewManager(final HoodieEngineContext context, final HoodieMetadataConfig metadataConfig, final FileSystemViewStorageConfig config, + final HoodieCommonConfig commonConfig, final SerializableSupplier metadataSupplier) { LOG.info("Creating View Manager with storage type :" + config.getStorageType()); final SerializableConfiguration conf = context.getHadoopConf(); @@ -235,7 +239,7 @@ public class FileSystemViewManager { case SPILLABLE_DISK: LOG.info("Creating Spillable Disk based Table View"); return new FileSystemViewManager(context, config, - (metaClient, viewConf) -> createSpillableMapBasedFileSystemView(conf, viewConf, metaClient)); + (metaClient, viewConf) -> createSpillableMapBasedFileSystemView(conf, viewConf, metaClient, commonConfig)); case MEMORY: LOG.info("Creating in-memory based Table View"); return new FileSystemViewManager(context, config, @@ -258,7 +262,7 @@ public class FileSystemViewManager { secondaryView = createRocksDBBasedFileSystemView(conf, viewConfig, metaClient); break; case SPILLABLE_DISK: - secondaryView = createSpillableMapBasedFileSystemView(conf, viewConfig, metaClient); + secondaryView = createSpillableMapBasedFileSystemView(conf, viewConfig, metaClient, commonConfig); break; default: throw new IllegalArgumentException("Secondary Storage type can only be in-memory or spillable. 
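
Every createViewManager overload above gains a HoodieCommonConfig parameter so that SPILLABLE_DISK primary or secondary views can configure their spill maps. A sketch of the call as a standalone consumer might now make it; the default-built metadata and storage configs here are assumptions for illustration, not part of the patch:

    import org.apache.hudi.common.config.HoodieCommonConfig;
    import org.apache.hudi.common.config.HoodieMetadataConfig;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.table.view.FileSystemViewManager;
    import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
    import org.apache.hudi.common.table.view.FileSystemViewStorageType;

    // Sketch: spillable-disk view manager with default common config (BITCASK + compression).
    static FileSystemViewManager spillableViewManager(HoodieEngineContext context, String basePath) {
      return FileSystemViewManager.createViewManager(
          context,
          HoodieMetadataConfig.newBuilder().build(),
          FileSystemViewStorageConfig.newBuilder()
              .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
              .build(),
          HoodieCommonConfig.newBuilder().build(),
          basePath);
    }
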
Was :" diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java index 1dafe33a8..e41444208 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/SpillableMapBasedFileSystemView.java @@ -19,6 +19,8 @@ package org.apache.hudi.common.table.view; import org.apache.hadoop.fs.FileStatus; + +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.model.BootstrapBaseFileMapping; import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.HoodieFileGroup; @@ -53,9 +55,11 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { private final long maxMemoryForReplaceFileGroups; private final long maxMemoryForClusteringFileGroups; private final String baseStoreDir; + private final ExternalSpillableMap.DiskMapType diskMapType; + private final boolean isBitCaskDiskMapCompressionEnabled; public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, - FileSystemViewStorageConfig config) { + FileSystemViewStorageConfig config, HoodieCommonConfig commonConfig) { super(config.isIncrementalTimelineSyncEnabled()); this.maxMemoryForFileGroupMap = config.getMaxMemoryForFileGroupMap(); this.maxMemoryForPendingCompaction = config.getMaxMemoryForPendingCompaction(); @@ -63,12 +67,14 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { this.maxMemoryForReplaceFileGroups = config.getMaxMemoryForReplacedFileGroups(); this.maxMemoryForClusteringFileGroups = config.getMaxMemoryForPendingClusteringFileGroups(); this.baseStoreDir = config.getSpillableDir(); + diskMapType = commonConfig.getSpillableDiskMapType(); + isBitCaskDiskMapCompressionEnabled = commonConfig.isBitCaskDiskMapCompressionEnabled(); init(metaClient, visibleActiveTimeline); } public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, - FileStatus[] fileStatuses, FileSystemViewStorageConfig config) { - this(metaClient, visibleActiveTimeline, config); + FileStatus[] fileStatuses, FileSystemViewStorageConfig config, HoodieCommonConfig commonConfig) { + this(metaClient, visibleActiveTimeline, config, commonConfig); addFilesToView(fileStatuses); } @@ -79,7 +85,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { + ", BaseDir=" + baseStoreDir); new File(baseStoreDir).mkdirs(); return (Map>) (new ExternalSpillableMap<>(maxMemoryForFileGroupMap, baseStoreDir, - new DefaultSizeEstimator(), new DefaultSizeEstimator<>())); + new DefaultSizeEstimator(), new DefaultSizeEstimator<>(), + diskMapType, isBitCaskDiskMapCompressionEnabled)); } catch (IOException e) { throw new RuntimeException(e); } @@ -93,7 +100,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { + ", BaseDir=" + baseStoreDir); new File(baseStoreDir).mkdirs(); Map> pendingMap = new ExternalSpillableMap<>( - maxMemoryForPendingCompaction, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>()); + maxMemoryForPendingCompaction, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>(), + diskMapType, isBitCaskDiskMapCompressionEnabled); pendingMap.putAll(fgIdToPendingCompaction); return pendingMap; } catch (IOException e) { @@ 
-109,7 +117,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { + ", BaseDir=" + baseStoreDir); new File(baseStoreDir).mkdirs(); Map pendingMap = new ExternalSpillableMap<>( - maxMemoryForBootstrapBaseFile, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>()); + maxMemoryForBootstrapBaseFile, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>(), + diskMapType, isBitCaskDiskMapCompressionEnabled); pendingMap.putAll(fileGroupIdBootstrapBaseFileMap); return pendingMap; } catch (IOException e) { @@ -124,7 +133,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { + ", BaseDir=" + baseStoreDir); new File(baseStoreDir).mkdirs(); Map pendingMap = new ExternalSpillableMap<>( - maxMemoryForReplaceFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>()); + maxMemoryForReplaceFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>(), + diskMapType, isBitCaskDiskMapCompressionEnabled); pendingMap.putAll(replacedFileGroups); return pendingMap; } catch (IOException e) { @@ -139,7 +149,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView { + ", BaseDir=" + baseStoreDir); new File(baseStoreDir).mkdirs(); Map pendingMap = new ExternalSpillableMap<>( - maxMemoryForClusteringFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>()); + maxMemoryForClusteringFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>(), + diskMapType, isBitCaskDiskMapCompressionEnabled); pendingMap.putAll(fileGroupsInClustering); return pendingMap; } catch (IOException e) { diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index f374d61b5..1667f188e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -20,6 +20,7 @@ package org.apache.hudi.metadata; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieMetadataRecord; +import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; @@ -213,6 +214,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { // Load the schema Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build(); logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder() .withFileSystem(metaClient.getFs()) .withBasePath(metadataBasePath) @@ -222,6 +224,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES) .withBufferSize(BUFFER_SIZE) .withSpillableMapBasePath(spillableMapDirectory) + .withDiskMapType(commonConfig.getSpillableDiskMapType()) + .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled()) .build(); logScannerOpenMs = timer.endTimer(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java index 
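
All five spillable maps inside SpillableMapBasedFileSystemView, like the merge handle and scanners earlier, now feed the two settings into the extended ExternalSpillableMap constructor. A standalone sketch of that constructor follows; the key/value types, memory budget and spill directory are illustrative, and the import paths assume the hudi-common utility packages:

    import java.io.IOException;
    import org.apache.hudi.common.util.DefaultSizeEstimator;
    import org.apache.hudi.common.util.collection.ExternalSpillableMap;

    // Sketch of the extended constructor the views above now call.
    static ExternalSpillableMap<String, String> newSpillMap() throws IOException {
      return new ExternalSpillableMap<>(
          16 * 1024 * 1024L,                          // in-memory budget before spilling
          "/tmp/hudi-spill",                          // base dir for the on-disk map
          new DefaultSizeEstimator<>(),               // key size estimator
          new DefaultSizeEstimator<>(),               // value size estimator
          ExternalSpillableMap.DiskMapType.ROCKS_DB,  // new: choose the disk map engine
          true);                                      // new: BitCask compression flag (unused by ROCKS_DB)
    }
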
23549da3c..51b031564 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java @@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; /** * A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is @@ -41,9 +42,10 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize, - String spillableMapBasePath, Set mergeKeyFilter) { + String spillableMapBasePath, Set mergeKeyFilter, + ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) { super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize, - spillableMapBasePath, Option.empty(), false); + spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled); this.mergeKeyFilter = mergeKeyFilter; performScan(); @@ -134,6 +136,16 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS return this; } + public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) { + this.diskMapType = diskMapType; + return this; + } + + public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) { + this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled; + return this; + } + public Builder withMergeKeyFilter(Set mergeKeyFilter) { this.mergeKeyFilter = mergeKeyFilter; return this; @@ -142,7 +154,8 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS @Override public HoodieMetadataMergedLogRecordScanner build() { return new HoodieMetadataMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, - latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter); + latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter, + diskMapType, isBitCaskDiskMapCompressionEnabled); } } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 69fc60f6a..3368c17c7 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -51,14 +51,18 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness; import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; +import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.exception.CorruptedLogFileException; + import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; 
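
HoodieBackedTableMetadata above also shows the intended derivation pattern for components that only hold a property bag: rebuild a HoodieCommonConfig from the properties already on hand rather than adding parallel knobs to HoodieMetadataConfig. Condensed into a helper (the method name is illustrative):

    import java.util.Properties;
    import org.apache.hudi.common.config.HoodieCommonConfig;

    // Absent keys fall back to the defaults: BITCASK with compression enabled.
    static HoodieCommonConfig deriveCommonConfig(Properties callerProps) {
      return HoodieCommonConfig.newBuilder().fromProperties(callerProps).build();
    }
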
+import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; @@ -73,6 +77,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -81,6 +86,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.params.provider.Arguments.arguments; /** * Tests hoodie log format {@link HoodieLogFormat}. @@ -447,8 +453,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testBasicAppendAndScanMultipleFiles(boolean readBlocksLazily) + @MethodSource("testArguments") + public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException { Writer writer = HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) @@ -487,6 +495,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); List scannedRecords = new ArrayList<>(); @@ -594,8 +604,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderBasic(boolean readBlocksLazily) + @MethodSource("testArguments") + public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); // Set a small threshold so that every block is a new version @@ -639,6 +651,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(200, scanner.getTotalLogRecords()); Set readKeys = new HashSet<>(200); @@ -652,8 +666,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderWithRollbackTombstone(boolean readBlocksLazily) + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); // Set a small threshold so that every block is a new version @@ -713,6 +729,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { 
.withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches"); Set readKeys = new HashSet<>(200); @@ -725,8 +743,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { assertEquals(originalKeys, readKeys, "CompositeAvroLogReader should return 200 records from 2 versions"); } - @Test - public void testAvroLogRecordReaderWithFailedPartialBlock() + @ParameterizedTest + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); // Set a small threshold so that every block is a new version @@ -796,6 +816,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records"); Set readKeys = new HashSet<>(200); @@ -809,8 +831,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderWithDeleteAndRollback(boolean readBlocksLazily) + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); // Set a small threshold so that every block is a new version @@ -870,6 +894,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records"); @@ -914,14 +940,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete"); } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderWithFailedRollbacks(boolean readBlocksLazily) + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException { // Write a Data block and Delete block with same InstantTime (written in same batch) @@ -991,6 +1021,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + 
.withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback"); @@ -1001,8 +1033,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderWithInsertDeleteAndRollback(boolean readBlocksLazily) + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException { // Write a Data block and Delete block with same InstantTime (written in same batch) @@ -1055,14 +1089,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records"); FileCreateUtils.deleteDeltaCommit(basePath, "100", fs); } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderWithInvalidRollback(boolean readBlocksLazily) + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException { Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); // Set a small threshold so that every block is a new version @@ -1102,6 +1140,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records"); final List readKeys = new ArrayList<>(100); @@ -1110,8 +1150,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(boolean readBlocksLazily) + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) throws IOException, URISyntaxException, InterruptedException { // Write a 3 Data blocs with same InstantTime (written in same batch) @@ -1168,13 +1210,17 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records"); } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(boolean readBlocksLazily) + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) throws IOException, URISyntaxException, 
InterruptedException { // Write a 3 Data blocs with same InstantTime (written in same batch) @@ -1270,6 +1316,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records"); FileCreateUtils.deleteDeltaCommit(basePath, "100", fs); @@ -1289,7 +1337,9 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { * */ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2, - boolean readBlocksLazily) { + ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) { try { // Write one Data block with same InstantTime (written in same batch) Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); @@ -1340,6 +1390,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { .withReverseReader(false) .withBufferSize(bufferSize) .withSpillableMapBasePath(BASE_OUTPUT_PATH) + .withDiskMapType(diskMapType) + .withBitCaskDiskMapCompressionEnabled(isCompressionEnabled) .build(); assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(), @@ -1351,33 +1403,42 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(boolean readBlocksLazily) { + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) { /* * FIRST_ATTEMPT_FAILED: * Original task from the stage attempt failed, but subsequent stage retry succeeded. */ - testAvroLogRecordReaderMergingMultipleLogFiles(77, 100, readBlocksLazily); + testAvroLogRecordReaderMergingMultipleLogFiles(77, 100, + diskMapType, isCompressionEnabled, readBlocksLazily); } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(boolean readBlocksLazily) { + @MethodSource("testArguments") + public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) { /* * SECOND_ATTEMPT_FAILED: * Original task from stage attempt succeeded, but subsequent retry attempt failed. */ - testAvroLogRecordReaderMergingMultipleLogFiles(100, 66, readBlocksLazily); + testAvroLogRecordReaderMergingMultipleLogFiles(100, 66, + diskMapType, isCompressionEnabled, readBlocksLazily); } @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(boolean readBlocksLazily) { + @MethodSource("testArguments") + public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType diskMapType, + boolean isCompressionEnabled, + boolean readBlocksLazily) { /* * BOTH_ATTEMPTS_SUCCEEDED: * Original task from the stage attempt and duplicate task from the stage retry succeeded. 
@@ -1618,4 +1679,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
       throw new RuntimeException("Unknown data block type " + dataBlockType);
     }
   }
+
+  private static Stream<Arguments> testArguments() {
+    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily
+    return Stream.of(
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false),
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true),
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
+    );
+  }
 }
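The testArguments provider above spells out the full 2 x 2 x 2 matrix by hand, which keeps each row greppable. If the matrix grows, an equivalent (untested) sketch that derives the same stream of Arguments, using only the imports already present in the test class (java.util.stream.Stream, org.junit.jupiter.params.provider.Arguments, and the static arguments factory):

    // Hypothetical alternative: generate the cartesian product instead of enumerating it.
    private static Stream<Arguments> testArguments() {
      return Stream.of(false, true).flatMap(readBlocksLazily ->
          Stream.of(false, true).flatMap(isCompressionEnabled ->
              Stream.of(ExternalSpillableMap.DiskMapType.BITCASK, ExternalSpillableMap.DiskMapType.ROCKS_DB)
                  .map(diskMapType -> arguments(diskMapType, isCompressionEnabled, readBlocksLazily))));
    }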
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedFileSystemView.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedFileSystemView.java
index 7f2e0dc29..8109249c1 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedFileSystemView.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedFileSystemView.java
@@ -18,6 +18,7 @@ package org.apache.hudi.common.table.view;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 
 /**
@@ -29,6 +30,7 @@ public class TestSpillableMapBasedFileSystemView extends TestHoodieTableFileSyst
   protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) {
     return new SpillableMapBasedFileSystemView(metaClient, timeline,
         FileSystemViewStorageConfig.newBuilder() // pure disk base View
-            .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).withMaxMemoryForView(0L).build());
+            .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).withMaxMemoryForView(0L).build(),
+        HoodieCommonConfig.newBuilder().build());
   }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedIncrementalFSViewSync.java b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedIncrementalFSViewSync.java
index d3478ce14..c678dd2e4 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedIncrementalFSViewSync.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/view/TestSpillableMapBasedIncrementalFSViewSync.java
@@ -18,6 +18,7 @@ package org.apache.hudi.common.table.view;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 
@@ -29,6 +30,7 @@ public class TestSpillableMapBasedIncrementalFSViewSync extends TestIncrementalF
   @Override
   protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline) {
     return new SpillableMapBasedFileSystemView(metaClient, timeline,
-        FileSystemViewStorageConfig.newBuilder().withMaxMemoryForView(0L).withIncrementalTimelineSync(true).build());
+        FileSystemViewStorageConfig.newBuilder().withMaxMemoryForView(0L).withIncrementalTimelineSync(true).build(),
+        HoodieCommonConfig.newBuilder().build());
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
index d39ab7c39..d24452b1f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapFunction.java
@@ -234,6 +234,8 @@ public class BootstrapFunction
         .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge())
         .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
+        .withDiskMapType(this.writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(this.writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();
   }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 045fad165..b85c35b92 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utils;
 
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -497,6 +498,8 @@ public class TestData {
         .withBufferSize(16 * 1024 * 1024)
         .withMaxMemorySizeInBytes(1024 * 1024L)
         .withSpillableMapBasePath("/tmp/")
+        .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
+        .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
         .build();
   }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index ddb3708ea..c552e3a78 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -19,6 +19,7 @@ package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -81,6 +82,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         .withReverseReader(false)
         .withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
         .withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
+        .withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
+        .withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+            HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
         .build();
   }
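Because RealtimeCompactedRecordReader now resolves both settings from the JobConf, falling back to the HoodieCommonConfig defaults, a Hive/MapReduce job can switch the realtime merge path onto RocksDB purely through configuration. A small sketch under that assumption (JobConf is Hadoop's org.apache.hadoop.mapred.JobConf; the keys are the ones wired in above):

    // Sketch: flip a query's realtime merge onto the RocksDB-backed spillable map.
    JobConf jobConf = new JobConf();
    jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
        ExternalSpillableMap.DiskMapType.ROCKS_DB);
    // The compression flag only affects the BITCASK map type; shown here for completeness.
    jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), false);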
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 1f955e87e..278539775 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -19,6 +19,7 @@ package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -29,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
@@ -58,7 +60,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -68,11 +71,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 public class TestHoodieRealtimeRecordReader {
 
@@ -115,8 +120,10 @@ public class TestHoodieRealtimeRecordReader {
   }
 
   @ParameterizedTest
-  @ValueSource(booleans = {true, false})
-  public void testReader(boolean partitioned) throws Exception {
+  @MethodSource("testArguments")
+  public void testReader(ExternalSpillableMap.DiskMapType diskMapType,
+                         boolean isCompressionEnabled,
+                         boolean partitioned) throws Exception {
     // initial commit
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
     HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
@@ -177,6 +184,9 @@ public class TestHoodieRealtimeRecordReader {
       List<Schema.Field> fields = schema.getFields();
       setHiveColumnNameProps(fields, jobConf, partitioned);
 
+      jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType);
+      jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);
+
       // validate record reader compaction
       HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
 
@@ -287,8 +297,10 @@ public class TestHoodieRealtimeRecordReader {
     recordReader.close();
   }
 
-  @Test
-  public void testReaderWithNestedAndComplexSchema() throws Exception {
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testReaderWithNestedAndComplexSchema(ExternalSpillableMap.DiskMapType diskMapType,
+                                                   boolean isCompressionEnabled) throws Exception {
     // initial commit
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema());
     HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
@@ -323,6 +335,9 @@ public class TestHoodieRealtimeRecordReader {
     List<Schema.Field> fields = schema.getFields();
     setHiveColumnNameProps(fields, jobConf, true);
 
+    jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType);
+    jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);
+
     // validate record reader compaction
     HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
 
@@ -416,8 +431,10 @@ public class TestHoodieRealtimeRecordReader {
     }
   }
 
-  @Test
-  public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws Exception {
+  @ParameterizedTest
+  @MethodSource("testArguments")
+  public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMap.DiskMapType diskMapType,
+                                                               boolean isCompressionEnabled) throws Exception {
     // initial commit
     List<String> logFilePaths = new ArrayList<>();
     Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
@@ -468,6 +485,9 @@ public class TestHoodieRealtimeRecordReader {
     // Try to read all the fields passed by the new schema
     setHiveColumnNameProps(fields, jobConf, true);
 
+    jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType);
+    jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);
+
     HoodieRealtimeRecordReader recordReader;
     try {
       // validate record reader compaction
@@ -488,4 +508,18 @@ public class TestHoodieRealtimeRecordReader {
       // keep reading
     }
   }
+
+  private static Stream<Arguments> testArguments() {
+    // Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: partitioned
+    return Stream.of(
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false),
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true),
+        arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true),
+        arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
+    );
+  }
 }
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
index 23bd81a85..bcdc5aaae 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/reader/DFSHoodieDatasetInputReader.java
@@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite.reader;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -282,6 +283,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
         .withReverseReader(false)
         .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue())
         .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue())
+        .withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
+        .withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
         .build();
     // readAvro log files
     Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index f6ae7f43b..caecd65f2 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -18,6 +18,7 @@ package org.apache.hudi.timeline.service;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -175,25 +176,26 @@ public class TimelineService {
     HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(conf.get());
     // Just use defaults for now
     HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
+    HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
     switch (config.viewStorageType) {
       case MEMORY:
         FileSystemViewStorageConfig.Builder inMemConfBuilder = FileSystemViewStorageConfig.newBuilder();
         inMemConfBuilder.withStorageType(FileSystemViewStorageType.MEMORY);
-        return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, inMemConfBuilder.build());
+        return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, inMemConfBuilder.build(), commonConfig);
       case SPILLABLE_DISK: {
         FileSystemViewStorageConfig.Builder spillableConfBuilder = FileSystemViewStorageConfig.newBuilder();
         spillableConfBuilder.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
             .withBaseStoreDir(config.baseStorePathForFileGroups)
            .withMaxMemoryForView(config.maxViewMemPerTableInMB * 1024 * 1024L)
            .withMemFractionForPendingCompaction(config.memFractionForCompactionPerTable);
-        return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, spillableConfBuilder.build());
+        return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, spillableConfBuilder.build(), commonConfig);
       }
       case EMBEDDED_KV_STORE: {
         FileSystemViewStorageConfig.Builder rocksDBConfBuilder = FileSystemViewStorageConfig.newBuilder();
         rocksDBConfBuilder.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
            .withRocksDBPath(config.rocksDBPath);
-        return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, rocksDBConfBuilder.build());
+        return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, rocksDBConfBuilder.build(), commonConfig);
       }
       default:
         throw new IllegalArgumentException("Invalid view manager storage type :" + config.viewStorageType);
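With all three storage types now threading the same HoodieCommonConfig through, standalone callers follow the same shape. A sketch of the new four-argument createViewManager call, reusing only constructors and builders visible in this patch (hadoopConf is a placeholder org.apache.hadoop.conf.Configuration assumed to be in scope):

    // Sketch: building a spillable view manager with the new common-config argument.
    HoodieLocalEngineContext engineContext = new HoodieLocalEngineContext(hadoopConf); // placeholder conf
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
    HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
    FileSystemViewStorageConfig viewConf = FileSystemViewStorageConfig.newBuilder()
        .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
        .build();
    FileSystemViewManager viewManager =
        FileSystemViewManager.createViewManager(engineContext, metadataConfig, viewConf, commonConfig);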
diff --git a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
index 0865585de..a8922711a 100644
--- a/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
+++ b/hudi-timeline-service/src/test/java/org/apache/hudi/timeline/service/functional/TestRemoteHoodieTableFileSystemView.java
@@ -18,6 +18,7 @@ package org.apache.hudi.timeline.service.functional;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -46,11 +47,12 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSyst
     FileSystemViewStorageConfig sConf =
         FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build();
     HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
+    HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
     HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
 
     try {
       server = new TimelineService(0,
-          FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, sConf));
+          FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, sConf, commonConfig));
       server.startService();
     } catch (Exception ex) {
       throw new RuntimeException(ex);