1
0

[HUDI-2044] Integrate consumers with rocksDB and compression within External Spillable Map (#3318)

This commit is contained in:
rmahindra123
2021-07-27 22:31:03 -07:00
committed by GitHub
parent 00cd35f90a
commit 8fef50e237
27 changed files with 405 additions and 109 deletions

View File

@@ -22,6 +22,7 @@ import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.HoodiePrintHelper; import org.apache.hudi.cli.HoodiePrintHelper;
import org.apache.hudi.cli.HoodieTableHeaderFields; import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
@@ -211,6 +212,8 @@ public class HoodieLogFileCommand implements CommandMarker {
.withMaxMemorySizeInBytes( .withMaxMemorySizeInBytes(
HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES) HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue()) .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build(); .build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) { for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema); Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);

View File

@@ -25,6 +25,7 @@ import org.apache.hudi.cli.HoodieTableHeaderFields;
import org.apache.hudi.cli.TableHeader; import org.apache.hudi.cli.TableHeader;
import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest; import org.apache.hudi.cli.testutils.AbstractShellIntegrationTest;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -213,6 +214,8 @@ public class TestHoodieLogFileCommand extends AbstractShellIntegrationTest {
HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP.defaultValue())) HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLED_PROP.defaultValue()))
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue()) .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue())
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue()) .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build(); .build();
Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator(); Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();

View File

@@ -68,7 +68,8 @@ public class EmbeddedTimelineServerHelper {
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST); Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
EmbeddedTimelineService timelineService = new EmbeddedTimelineService( EmbeddedTimelineService timelineService = new EmbeddedTimelineService(
context, hostAddr.orElse(null),config.getEmbeddedTimelineServerPort(), context, hostAddr.orElse(null),config.getEmbeddedTimelineServerPort(),
config.getMetadataConfig(), config.getClientSpecifiedViewStorageConfig(), config.getBasePath(), config.getMetadataConfig(), config.getCommonConfig(),
config.getClientSpecifiedViewStorageConfig(), config.getBasePath(),
config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(), config.getEmbeddedTimelineServerThreads(), config.getEmbeddedTimelineServerCompressOutput(),
config.getEmbeddedTimelineServerUseAsync()); config.getEmbeddedTimelineServerUseAsync());
timelineService.startServer(); timelineService.startServer();

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.client.embedded; package org.apache.hudi.client.embedded;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.SerializableConfiguration;
@@ -46,6 +47,7 @@ public class EmbeddedTimelineService {
private final SerializableConfiguration hadoopConf; private final SerializableConfiguration hadoopConf;
private final FileSystemViewStorageConfig config; private final FileSystemViewStorageConfig config;
private final HoodieMetadataConfig metadataConfig; private final HoodieMetadataConfig metadataConfig;
private final HoodieCommonConfig commonConfig;
private final String basePath; private final String basePath;
private final int numThreads; private final int numThreads;
@@ -55,13 +57,14 @@ public class EmbeddedTimelineService {
private transient TimelineService server; private transient TimelineService server;
public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort, public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
HoodieMetadataConfig metadataConfig, FileSystemViewStorageConfig config, String basePath, HoodieMetadataConfig metadataConfig, HoodieCommonConfig commonConfig, FileSystemViewStorageConfig config, String basePath,
int numThreads, boolean compressOutput, boolean useAsync) { int numThreads, boolean compressOutput, boolean useAsync) {
setHostAddr(embeddedTimelineServiceHostAddr); setHostAddr(embeddedTimelineServiceHostAddr);
this.context = context; this.context = context;
this.config = config; this.config = config;
this.basePath = basePath; this.basePath = basePath;
this.metadataConfig = metadataConfig; this.metadataConfig = metadataConfig;
this.commonConfig = commonConfig;
this.hadoopConf = context.getHadoopConf(); this.hadoopConf = context.getHadoopConf();
this.viewManager = createViewManager(); this.viewManager = createViewManager();
this.preferredPort = embeddedTimelineServerPort; this.preferredPort = embeddedTimelineServerPort;
@@ -80,7 +83,7 @@ public class EmbeddedTimelineService {
// Reset to default if set to Remote // Reset to default if set to Remote
builder.withStorageType(FileSystemViewStorageType.MEMORY); builder.withStorageType(FileSystemViewStorageType.MEMORY);
} }
return FileSystemViewManager.createViewManager(context, metadataConfig, builder.build(), basePath); return FileSystemViewManager.createViewManager(context, metadataConfig, builder.build(), commonConfig, basePath);
} }
public void startServer() throws IOException { public void startServer() throws IOException {

View File

@@ -24,6 +24,7 @@ import org.apache.hudi.client.transaction.ConflictResolutionStrategy;
import org.apache.hudi.common.config.ConfigClassProperty; import org.apache.hudi.common.config.ConfigClassProperty;
import org.apache.hudi.common.config.ConfigGroups; import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.engine.EngineType;
@@ -38,7 +39,6 @@ import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorType; import org.apache.hudi.keygen.constant.KeyGeneratorType;
@@ -59,7 +59,6 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Properties; import java.util.Properties;
@@ -312,18 +311,6 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)." .withDocumentation("When enabled, we allow duplicate keys even if inserts are routed to merge with an existing file (for ensuring file sizing)."
+ " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained."); + " This is only relevant for insert operation, since upsert, delete operations will ensure unique key constraints are maintained.");
public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
.key("hoodie.spillable.diskmap.type")
.defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
.withDocumentation("When handling input data that cannot be held in memory, to merge with a file on storage, a spillable diskmap is employed. "
+ "By default, we use a persistent hashmap based loosely on bitcask, that offers O(1) inserts, lookups. "
+ "Change this to `ROCKS_DB` to prefer using rocksDB, for handling the spill.");
public static final ConfigProperty<Boolean> DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty
.key("hoodie.diskmap.bitcask.compression.enabled")
.defaultValue(true)
.withDocumentation("Turn on compression for BITCASK disk map used by the External Spillable Map");
public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty public static final ConfigProperty<Integer> CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP = ConfigProperty
.key("hoodie.client.heartbeat.interval_in_ms") .key("hoodie.client.heartbeat.interval_in_ms")
.defaultValue(60 * 1000) .defaultValue(60 * 1000)
@@ -388,6 +375,7 @@ public class HoodieWriteConfig extends HoodieConfig {
private FileSystemViewStorageConfig viewStorageConfig; private FileSystemViewStorageConfig viewStorageConfig;
private HoodiePayloadConfig hoodiePayloadConfig; private HoodiePayloadConfig hoodiePayloadConfig;
private HoodieMetadataConfig metadataConfig; private HoodieMetadataConfig metadataConfig;
private HoodieCommonConfig commonConfig;
private EngineType engineType; private EngineType engineType;
/** /**
@@ -409,6 +397,7 @@ public class HoodieWriteConfig extends HoodieConfig {
this.viewStorageConfig = clientSpecifiedViewStorageConfig; this.viewStorageConfig = clientSpecifiedViewStorageConfig;
this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build(); this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build();
this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build(); this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build();
this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build();
} }
public static HoodieWriteConfig.Builder newBuilder() { public static HoodieWriteConfig.Builder newBuilder() {
@@ -597,14 +586,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS); return getBoolean(MERGE_ALLOW_DUPLICATE_ON_INSERTS);
} }
public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() {
return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT));
}
public boolean isBitCaskDiskMapCompressionEnabled() {
return getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED);
}
public EngineType getEngineType() { public EngineType getEngineType() {
return engineType; return engineType;
} }
@@ -1159,6 +1140,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return metadataConfig; return metadataConfig;
} }
public HoodieCommonConfig getCommonConfig() {
return commonConfig;
}
/** /**
* Commit call back configs. * Commit call back configs.
*/ */
@@ -1564,16 +1549,6 @@ public class HoodieWriteConfig extends HoodieConfig {
return this; return this;
} }
public Builder withSpillableDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
writeConfig.setValue(SPILLABLE_DISK_MAP_TYPE, diskMapType.name());
return this;
}
public Builder withBitcaskDiskMapCompressionEnabled(boolean bitcaskDiskMapCompressionEnabled) {
writeConfig.setValue(DISK_MAP_BITCASK_COMPRESSION_ENABLED, String.valueOf(bitcaskDiskMapCompressionEnabled));
return this;
}
public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) { public Builder withHeartbeatIntervalInMs(Integer heartbeatIntervalInMs) {
writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(heartbeatIntervalInMs)); writeConfig.setValue(CLIENT_HEARTBEAT_INTERVAL_IN_MS_PROP, String.valueOf(heartbeatIntervalInMs));
return this; return this;

View File

@@ -211,7 +211,8 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H
LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge); LOG.info("MaxMemoryPerPartitionMerge => " + memoryForMerge);
this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(), this.keyToNewRecords = new ExternalSpillableMap<>(memoryForMerge, config.getSpillableMapBasePath(),
new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema), new DefaultSizeEstimator(), new HoodieRecordSizeEstimator(tableSchema),
config.getSpillableDiskMapType(), config.isBitCaskDiskMapCompressionEnabled()); config.getCommonConfig().getSpillableDiskMapType(),
config.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
} catch (IOException io) { } catch (IOException io) {
throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io); throw new HoodieIOException("Cannot instantiate an ExternalSpillableMap", io);
} }

View File

@@ -113,7 +113,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(), this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(),
FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue()); FileSystemViewStorageConfig.FILESYSTEM_VIEW_SPILLABLE_DIR.defaultValue());
this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata); this.viewManager = FileSystemViewManager.createViewManager(context, config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata);
this.metaClient = metaClient; this.metaClient = metaClient;
this.index = getIndex(config, context); this.index = getIndex(config, context);
this.taskContextSupplier = context.getTaskContextSupplier(); this.taskContextSupplier = context.getTaskContextSupplier();
@@ -123,7 +123,7 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
private synchronized FileSystemViewManager getViewManager() { private synchronized FileSystemViewManager getViewManager() {
if (null == viewManager) { if (null == viewManager) {
viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), () -> metadata); viewManager = FileSystemViewManager.createViewManager(getContext(), config.getMetadataConfig(), config.getViewStorageConfig(), config.getCommonConfig(), () -> metadata);
} }
return viewManager; return viewManager;
} }

View File

@@ -129,6 +129,8 @@ public class HoodieFlinkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
.withReverseReader(config.getCompactionReverseLogReadEnabled()) .withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize()) .withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath()) .withSpillableMapBasePath(config.getSpillableMapBasePath())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build(); .build();
if (!scanner.iterator().hasNext()) { if (!scanner.iterator().hasNext()) {
return new ArrayList<>(); return new ArrayList<>();

View File

@@ -207,6 +207,8 @@ public class SparkExecuteClusteringCommitActionExecutor<T extends HoodieRecordPa
.withReverseReader(config.getCompactionReverseLogReadEnabled()) .withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize()) .withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath()) .withSpillableMapBasePath(config.getSpillableMapBasePath())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build(); .build();
recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema, recordIterators.add(HoodieFileSliceReader.getFileSliceReader(baseFileReader, scanner, readerSchema,

View File

@@ -151,6 +151,8 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
.withReverseReader(config.getCompactionReverseLogReadEnabled()) .withReverseReader(config.getCompactionReverseLogReadEnabled())
.withBufferSize(config.getMaxDFSStreamBufferSize()) .withBufferSize(config.getMaxDFSStreamBufferSize())
.withSpillableMapBasePath(config.getSpillableMapBasePath()) .withSpillableMapBasePath(config.getSpillableMapBasePath())
.withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build(); .build();
if (!scanner.iterator().hasNext()) { if (!scanner.iterator().hasNext()) {
return new ArrayList<>(); return new ArrayList<>();

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.io;
import org.apache.hudi.client.SparkRDDWriteClient; import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus; import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.HoodieWriteStat;
@@ -27,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig; import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig; import org.apache.hudi.config.HoodieStorageConfig;
@@ -41,16 +43,21 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row; import org.apache.spark.sql.Row;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Properties;
import java.util.stream.Stream;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class TestHoodieMergeHandle extends HoodieClientTestHarness { public class TestHoodieMergeHandle extends HoodieClientTestHarness {
@@ -69,14 +76,23 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
cleanupResources(); cleanupResources();
} }
@Test @ParameterizedTest
public void testUpsertsForMultipleRecordsInSameFile() throws Exception { @MethodSource("testArguments")
public void testUpsertsForMultipleRecordsInSameFile(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws Exception {
// Create records in a single partition // Create records in a single partition
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0]; String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
dataGen = new HoodieTestDataGenerator(new String[] {partitionPath}); dataGen = new HoodieTestDataGenerator(new String[] {partitionPath});
// Build a common config with diff configs
Properties properties = new Properties();
properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name());
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(isCompressionEnabled));
// Build a write config with bulkinsertparallelism set // Build a write config with bulkinsertparallelism set
HoodieWriteConfig cfg = getConfigBuilder().build(); HoodieWriteConfig cfg = getConfigBuilder()
.withProperties(properties)
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) { try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
FileSystem fs = FSUtils.getFs(basePath, hadoopConf); FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
@@ -215,10 +231,19 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
} }
} }
@Test @ParameterizedTest
public void testHoodieMergeHandleWriteStatMetrics() throws Exception { @MethodSource("testArguments")
public void testHoodieMergeHandleWriteStatMetrics(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws Exception {
// insert 100 records // insert 100 records
HoodieWriteConfig config = getConfigBuilder().build(); // Build a common config with diff configs
Properties properties = new Properties();
properties.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType.name());
properties.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), String.valueOf(isCompressionEnabled));
HoodieWriteConfig config = getConfigBuilder()
.withProperties(properties)
.build();
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) { try (SparkRDDWriteClient writeClient = getHoodieWriteClient(config);) {
String newCommitTime = "100"; String newCommitTime = "100";
writeClient.startCommitWithTime(newCommitTime); writeClient.startCommitWithTime(newCommitTime);
@@ -317,6 +342,16 @@ public class TestHoodieMergeHandle extends HoodieClientTestHarness {
.withBulkInsertParallelism(2).withWriteStatusClass(TestWriteStatus.class); .withBulkInsertParallelism(2).withWriteStatusClass(TestWriteStatus.class);
} }
private static Stream<Arguments> testArguments() {
// Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled
return Stream.of(
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false),
arguments(ExternalSpillableMap.DiskMapType.BITCASK, true),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true)
);
}
/** /**
* Overridden so that we can capture and inspect all success records. * Overridden so that we can capture and inspect all success records.
*/ */

View File

@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.common.config;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Locale;
import java.util.Properties;
/**
* Common Configurations used across Hudi.
*/
public class HoodieCommonConfig extends HoodieConfig {

  // Selects which on-disk map implementation backs the ExternalSpillableMap when
  // in-memory data overflows. BITCASK (default) is a persistent hashmap with O(1)
  // inserts/lookups; ROCKS_DB routes the spill to RocksDB instead.
  public static final ConfigProperty<ExternalSpillableMap.DiskMapType> SPILLABLE_DISK_MAP_TYPE = ConfigProperty
      .key("hoodie.common.spillable.diskmap.type")
      .defaultValue(ExternalSpillableMap.DiskMapType.BITCASK)
      .withDocumentation("When handling input data that cannot be held in memory, to merge with a file on storage, a spillable diskmap is employed. "
          + "By default, we use a persistent hashmap based loosely on bitcask, that offers O(1) inserts, lookups. "
          + "Change this to `ROCKS_DB` to prefer using rocksDB, for handling the spill.");

  // Whether the BITCASK disk map compresses entries it writes to disk.
  // Only meaningful when SPILLABLE_DISK_MAP_TYPE is BITCASK.
  public static final ConfigProperty<Boolean> DISK_MAP_BITCASK_COMPRESSION_ENABLED = ConfigProperty
      .key("hoodie.common.diskmap.compression.enabled")
      .defaultValue(true)
      .withDocumentation("Turn on compression for BITCASK disk map used by the External Spillable Map");

  /**
   * Resolves the configured spillable disk map type.
   *
   * <p>The stored string value is upper-cased with {@link Locale#ROOT} before the
   * enum lookup, so lowercase user input such as "rocks_db" is accepted.
   *
   * @return the {@link ExternalSpillableMap.DiskMapType} to use for spilling
   * @throws IllegalArgumentException if the configured value matches no enum constant
   */
  public ExternalSpillableMap.DiskMapType getSpillableDiskMapType() {
    return ExternalSpillableMap.DiskMapType.valueOf(getString(SPILLABLE_DISK_MAP_TYPE).toUpperCase(Locale.ROOT));
  }

  /**
   * @return true if compression is enabled for the BITCASK disk map
   */
  public boolean isBitCaskDiskMapCompressionEnabled() {
    return getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED);
  }

  // Private: instances are obtained via newBuilder() only.
  private HoodieCommonConfig() {
    super();
  }

  public static HoodieCommonConfig.Builder newBuilder() {
    return new HoodieCommonConfig.Builder();
  }

  /**
   * Fluent builder for {@link HoodieCommonConfig}. Values set explicitly win;
   * {@link #build()} fills in defaults for any keys left unset.
   */
  public static class Builder {

    private final HoodieCommonConfig commonConfig = new HoodieCommonConfig();

    // Loads key=value pairs from a properties file into this config.
    // NOTE(review): FileReader uses the platform default charset (pre-JDK 18) —
    // assumes ASCII/UTF-8-compatible config files; confirm if non-ASCII keys are possible.
    public HoodieCommonConfig.Builder fromFile(File propertiesFile) throws IOException {
      try (FileReader reader = new FileReader(propertiesFile)) {
        commonConfig.getProps().load(reader);
        return this;
      }
    }

    // Copies all entries from the given Properties into this config.
    public HoodieCommonConfig.Builder fromProperties(Properties props) {
      commonConfig.getProps().putAll(props);
      return this;
    }

    // Stores the enum by name(), matching the parsing done in getSpillableDiskMapType().
    public Builder withSpillableDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
      commonConfig.setValue(SPILLABLE_DISK_MAP_TYPE, diskMapType.name());
      return this;
    }

    public Builder withBitcaskDiskMapCompressionEnabled(boolean bitcaskDiskMapCompressionEnabled) {
      commonConfig.setValue(DISK_MAP_BITCASK_COMPRESSION_ENABLED, String.valueOf(bitcaskDiskMapCompressionEnabled));
      return this;
    }

    // Finalizes the config; setDefaults presumably back-fills defaults for any
    // ConfigProperty of this class not explicitly set (behavior lives in HoodieConfig).
    public HoodieCommonConfig build() {
      commonConfig.setDefaults(HoodieCommonConfig.class.getName());
      return commonConfig;
    }
  }
}

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.log; package org.apache.hudi.common.table.log;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -73,12 +74,13 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema, protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily, String latestInstantTime, Long maxMemorySizeInBytes, boolean readBlocksLazily,
boolean reverseReader, int bufferSize, String spillableMapBasePath, boolean reverseReader, int bufferSize, String spillableMapBasePath,
Option<InstantRange> instantRange, boolean autoScan) { Option<InstantRange> instantRange, boolean autoScan,
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange); super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange);
try { try {
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(), this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema)); new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
this.maxMemorySizeInBytes = maxMemorySizeInBytes; this.maxMemorySizeInBytes = maxMemorySizeInBytes;
} catch (IOException e) { } catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e); throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
@@ -169,6 +171,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
// specific configurations // specific configurations
protected Long maxMemorySizeInBytes; protected Long maxMemorySizeInBytes;
protected String spillableMapBasePath; protected String spillableMapBasePath;
protected ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
// incremental filtering // incremental filtering
private Option<InstantRange> instantRange = Option.empty(); private Option<InstantRange> instantRange = Option.empty();
// auto scan default true // auto scan default true
@@ -229,6 +233,16 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
return this; return this;
} }
public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
this.diskMapType = diskMapType;
return this;
}
public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled;
return this;
}
public Builder withAutoScan(boolean autoScan) { public Builder withAutoScan(boolean autoScan) {
this.autoScan = autoScan; this.autoScan = autoScan;
return this; return this;
@@ -238,7 +252,8 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
public HoodieMergedLogRecordScanner build() { public HoodieMergedLogRecordScanner build() {
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader, latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange, autoScan); bufferSize, spillableMapBasePath, instantRange, autoScan,
diskMapType, isBitCaskDiskMapCompressionEnabled);
} }
} }
} }

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.view; package org.apache.hudi.common.table.view;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -145,10 +146,10 @@ public class FileSystemViewManager {
* @return * @return
*/ */
private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf, private static SpillableMapBasedFileSystemView createSpillableMapBasedFileSystemView(SerializableConfiguration conf,
FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient) { FileSystemViewStorageConfig viewConf, HoodieTableMetaClient metaClient, HoodieCommonConfig commonConfig) {
LOG.info("Creating SpillableMap based view for basePath " + metaClient.getBasePath()); LOG.info("Creating SpillableMap based view for basePath " + metaClient.getBasePath());
HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); HoodieTimeline timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf); return new SpillableMapBasedFileSystemView(metaClient, timeline, viewConf, commonConfig);
} }
/** /**
@@ -205,15 +206,17 @@ public class FileSystemViewManager {
public static FileSystemViewManager createViewManager(final HoodieEngineContext context, public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
final HoodieMetadataConfig metadataConfig, final HoodieMetadataConfig metadataConfig,
final FileSystemViewStorageConfig config) { final FileSystemViewStorageConfig config,
return createViewManager(context, metadataConfig, config, (SerializableSupplier<HoodieTableMetadata>) null); final HoodieCommonConfig commonConfig) {
return createViewManager(context, metadataConfig, config, commonConfig, (SerializableSupplier<HoodieTableMetadata>) null);
} }
public static FileSystemViewManager createViewManager(final HoodieEngineContext context, public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
final HoodieMetadataConfig metadataConfig, final HoodieMetadataConfig metadataConfig,
final FileSystemViewStorageConfig config, final FileSystemViewStorageConfig config,
final HoodieCommonConfig commonConfig,
final String basePath) { final String basePath) {
return createViewManager(context, metadataConfig, config, return createViewManager(context, metadataConfig, config, commonConfig,
() -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir(), true)); () -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir(), true));
} }
@@ -224,6 +227,7 @@ public class FileSystemViewManager {
public static FileSystemViewManager createViewManager(final HoodieEngineContext context, public static FileSystemViewManager createViewManager(final HoodieEngineContext context,
final HoodieMetadataConfig metadataConfig, final HoodieMetadataConfig metadataConfig,
final FileSystemViewStorageConfig config, final FileSystemViewStorageConfig config,
final HoodieCommonConfig commonConfig,
final SerializableSupplier<HoodieTableMetadata> metadataSupplier) { final SerializableSupplier<HoodieTableMetadata> metadataSupplier) {
LOG.info("Creating View Manager with storage type :" + config.getStorageType()); LOG.info("Creating View Manager with storage type :" + config.getStorageType());
final SerializableConfiguration conf = context.getHadoopConf(); final SerializableConfiguration conf = context.getHadoopConf();
@@ -235,7 +239,7 @@ public class FileSystemViewManager {
case SPILLABLE_DISK: case SPILLABLE_DISK:
LOG.info("Creating Spillable Disk based Table View"); LOG.info("Creating Spillable Disk based Table View");
return new FileSystemViewManager(context, config, return new FileSystemViewManager(context, config,
(metaClient, viewConf) -> createSpillableMapBasedFileSystemView(conf, viewConf, metaClient)); (metaClient, viewConf) -> createSpillableMapBasedFileSystemView(conf, viewConf, metaClient, commonConfig));
case MEMORY: case MEMORY:
LOG.info("Creating in-memory based Table View"); LOG.info("Creating in-memory based Table View");
return new FileSystemViewManager(context, config, return new FileSystemViewManager(context, config,
@@ -258,7 +262,7 @@ public class FileSystemViewManager {
secondaryView = createRocksDBBasedFileSystemView(conf, viewConfig, metaClient); secondaryView = createRocksDBBasedFileSystemView(conf, viewConfig, metaClient);
break; break;
case SPILLABLE_DISK: case SPILLABLE_DISK:
secondaryView = createSpillableMapBasedFileSystemView(conf, viewConfig, metaClient); secondaryView = createSpillableMapBasedFileSystemView(conf, viewConfig, metaClient, commonConfig);
break; break;
default: default:
throw new IllegalArgumentException("Secondary Storage type can only be in-memory or spillable. Was :" throw new IllegalArgumentException("Secondary Storage type can only be in-memory or spillable. Was :"

View File

@@ -19,6 +19,8 @@
package org.apache.hudi.common.table.view; package org.apache.hudi.common.table.view;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.BootstrapBaseFileMapping; import org.apache.hudi.common.model.BootstrapBaseFileMapping;
import org.apache.hudi.common.model.CompactionOperation; import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieFileGroup; import org.apache.hudi.common.model.HoodieFileGroup;
@@ -53,9 +55,11 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
private final long maxMemoryForReplaceFileGroups; private final long maxMemoryForReplaceFileGroups;
private final long maxMemoryForClusteringFileGroups; private final long maxMemoryForClusteringFileGroups;
private final String baseStoreDir; private final String baseStoreDir;
private final ExternalSpillableMap.DiskMapType diskMapType;
private final boolean isBitCaskDiskMapCompressionEnabled;
public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
FileSystemViewStorageConfig config) { FileSystemViewStorageConfig config, HoodieCommonConfig commonConfig) {
super(config.isIncrementalTimelineSyncEnabled()); super(config.isIncrementalTimelineSyncEnabled());
this.maxMemoryForFileGroupMap = config.getMaxMemoryForFileGroupMap(); this.maxMemoryForFileGroupMap = config.getMaxMemoryForFileGroupMap();
this.maxMemoryForPendingCompaction = config.getMaxMemoryForPendingCompaction(); this.maxMemoryForPendingCompaction = config.getMaxMemoryForPendingCompaction();
@@ -63,12 +67,14 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
this.maxMemoryForReplaceFileGroups = config.getMaxMemoryForReplacedFileGroups(); this.maxMemoryForReplaceFileGroups = config.getMaxMemoryForReplacedFileGroups();
this.maxMemoryForClusteringFileGroups = config.getMaxMemoryForPendingClusteringFileGroups(); this.maxMemoryForClusteringFileGroups = config.getMaxMemoryForPendingClusteringFileGroups();
this.baseStoreDir = config.getSpillableDir(); this.baseStoreDir = config.getSpillableDir();
diskMapType = commonConfig.getSpillableDiskMapType();
isBitCaskDiskMapCompressionEnabled = commonConfig.isBitCaskDiskMapCompressionEnabled();
init(metaClient, visibleActiveTimeline); init(metaClient, visibleActiveTimeline);
} }
public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, public SpillableMapBasedFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline,
FileStatus[] fileStatuses, FileSystemViewStorageConfig config) { FileStatus[] fileStatuses, FileSystemViewStorageConfig config, HoodieCommonConfig commonConfig) {
this(metaClient, visibleActiveTimeline, config); this(metaClient, visibleActiveTimeline, config, commonConfig);
addFilesToView(fileStatuses); addFilesToView(fileStatuses);
} }
@@ -79,7 +85,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
+ ", BaseDir=" + baseStoreDir); + ", BaseDir=" + baseStoreDir);
new File(baseStoreDir).mkdirs(); new File(baseStoreDir).mkdirs();
return (Map<String, List<HoodieFileGroup>>) (new ExternalSpillableMap<>(maxMemoryForFileGroupMap, baseStoreDir, return (Map<String, List<HoodieFileGroup>>) (new ExternalSpillableMap<>(maxMemoryForFileGroupMap, baseStoreDir,
new DefaultSizeEstimator(), new DefaultSizeEstimator<>())); new DefaultSizeEstimator(), new DefaultSizeEstimator<>(),
diskMapType, isBitCaskDiskMapCompressionEnabled));
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@@ -93,7 +100,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
+ ", BaseDir=" + baseStoreDir); + ", BaseDir=" + baseStoreDir);
new File(baseStoreDir).mkdirs(); new File(baseStoreDir).mkdirs();
Map<HoodieFileGroupId, Pair<String, CompactionOperation>> pendingMap = new ExternalSpillableMap<>( Map<HoodieFileGroupId, Pair<String, CompactionOperation>> pendingMap = new ExternalSpillableMap<>(
maxMemoryForPendingCompaction, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>()); maxMemoryForPendingCompaction, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>(),
diskMapType, isBitCaskDiskMapCompressionEnabled);
pendingMap.putAll(fgIdToPendingCompaction); pendingMap.putAll(fgIdToPendingCompaction);
return pendingMap; return pendingMap;
} catch (IOException e) { } catch (IOException e) {
@@ -109,7 +117,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
+ ", BaseDir=" + baseStoreDir); + ", BaseDir=" + baseStoreDir);
new File(baseStoreDir).mkdirs(); new File(baseStoreDir).mkdirs();
Map<HoodieFileGroupId, BootstrapBaseFileMapping> pendingMap = new ExternalSpillableMap<>( Map<HoodieFileGroupId, BootstrapBaseFileMapping> pendingMap = new ExternalSpillableMap<>(
maxMemoryForBootstrapBaseFile, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>()); maxMemoryForBootstrapBaseFile, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>(),
diskMapType, isBitCaskDiskMapCompressionEnabled);
pendingMap.putAll(fileGroupIdBootstrapBaseFileMap); pendingMap.putAll(fileGroupIdBootstrapBaseFileMap);
return pendingMap; return pendingMap;
} catch (IOException e) { } catch (IOException e) {
@@ -124,7 +133,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
+ ", BaseDir=" + baseStoreDir); + ", BaseDir=" + baseStoreDir);
new File(baseStoreDir).mkdirs(); new File(baseStoreDir).mkdirs();
Map<HoodieFileGroupId, HoodieInstant> pendingMap = new ExternalSpillableMap<>( Map<HoodieFileGroupId, HoodieInstant> pendingMap = new ExternalSpillableMap<>(
maxMemoryForReplaceFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>()); maxMemoryForReplaceFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>(),
diskMapType, isBitCaskDiskMapCompressionEnabled);
pendingMap.putAll(replacedFileGroups); pendingMap.putAll(replacedFileGroups);
return pendingMap; return pendingMap;
} catch (IOException e) { } catch (IOException e) {
@@ -139,7 +149,8 @@ public class SpillableMapBasedFileSystemView extends HoodieTableFileSystemView {
+ ", BaseDir=" + baseStoreDir); + ", BaseDir=" + baseStoreDir);
new File(baseStoreDir).mkdirs(); new File(baseStoreDir).mkdirs();
Map<HoodieFileGroupId, HoodieInstant> pendingMap = new ExternalSpillableMap<>( Map<HoodieFileGroupId, HoodieInstant> pendingMap = new ExternalSpillableMap<>(
maxMemoryForClusteringFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>()); maxMemoryForClusteringFileGroups, baseStoreDir, new DefaultSizeEstimator(), new DefaultSizeEstimator<>(),
diskMapType, isBitCaskDiskMapCompressionEnabled);
pendingMap.putAll(fileGroupsInClustering); pendingMap.putAll(fileGroupsInClustering);
return pendingMap; return pendingMap;
} catch (IOException e) { } catch (IOException e) {

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.metadata;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -213,6 +214,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
// Load the schema // Load the schema
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder() logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
.withFileSystem(metaClient.getFs()) .withFileSystem(metaClient.getFs())
.withBasePath(metadataBasePath) .withBasePath(metadataBasePath)
@@ -222,6 +224,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
.withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES) .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
.withBufferSize(BUFFER_SIZE) .withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableMapDirectory) .withSpillableMapBasePath(spillableMapDirectory)
.withDiskMapType(commonConfig.getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
.build(); .build();
logScannerOpenMs = timer.endTimer(); logScannerOpenMs = timer.endTimer();

View File

@@ -30,6 +30,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
/** /**
* A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is * A {@code HoodieMergedLogRecordScanner} implementation which only merged records matching providing keys. This is
@@ -41,9 +42,10 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize,
String spillableMapBasePath, Set<String> mergeKeyFilter) { String spillableMapBasePath, Set<String> mergeKeyFilter,
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize, super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
spillableMapBasePath, Option.empty(), false); spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled);
this.mergeKeyFilter = mergeKeyFilter; this.mergeKeyFilter = mergeKeyFilter;
performScan(); performScan();
@@ -134,6 +136,16 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
return this; return this;
} }
public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
this.diskMapType = diskMapType;
return this;
}
public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled;
return this;
}
public Builder withMergeKeyFilter(Set<String> mergeKeyFilter) { public Builder withMergeKeyFilter(Set<String> mergeKeyFilter) {
this.mergeKeyFilter = mergeKeyFilter; this.mergeKeyFilter = mergeKeyFilter;
return this; return this;
@@ -142,7 +154,8 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
@Override @Override
public HoodieMetadataMergedLogRecordScanner build() { public HoodieMetadataMergedLogRecordScanner build() {
return new HoodieMetadataMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema, return new HoodieMetadataMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter); latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter,
diskMapType, isBitCaskDiskMapCompressionEnabled);
} }
} }
} }

View File

@@ -51,14 +51,18 @@ import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil; import org.apache.hudi.common.testutils.minicluster.MiniClusterUtil;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.CorruptedLogFileException; import org.apache.hudi.exception.CorruptedLogFileException;
import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException; import java.io.IOException;
@@ -73,6 +77,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -81,6 +86,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
/** /**
* Tests hoodie log format {@link HoodieLogFormat}. * Tests hoodie log format {@link HoodieLogFormat}.
@@ -447,8 +453,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testBasicAppendAndScanMultipleFiles(boolean readBlocksLazily) public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
Writer writer = Writer writer =
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
@@ -487,6 +495,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
List<IndexedRecord> scannedRecords = new ArrayList<>(); List<IndexedRecord> scannedRecords = new ArrayList<>();
@@ -594,8 +604,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderBasic(boolean readBlocksLazily) public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version // Set a small threshold so that every block is a new version
@@ -639,6 +651,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(200, scanner.getTotalLogRecords()); assertEquals(200, scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200); Set<String> readKeys = new HashSet<>(200);
@@ -652,8 +666,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderWithRollbackTombstone(boolean readBlocksLazily) public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version // Set a small threshold so that every block is a new version
@@ -713,6 +729,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches"); assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches");
Set<String> readKeys = new HashSet<>(200); Set<String> readKeys = new HashSet<>(200);
@@ -725,8 +743,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
assertEquals(originalKeys, readKeys, "CompositeAvroLogReader should return 200 records from 2 versions"); assertEquals(originalKeys, readKeys, "CompositeAvroLogReader should return 200 records from 2 versions");
} }
@Test @ParameterizedTest
public void testAvroLogRecordReaderWithFailedPartialBlock() @MethodSource("testArguments")
public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version // Set a small threshold so that every block is a new version
@@ -796,6 +816,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records"); assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records");
Set<String> readKeys = new HashSet<>(200); Set<String> readKeys = new HashSet<>(200);
@@ -809,8 +831,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderWithDeleteAndRollback(boolean readBlocksLazily) public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version // Set a small threshold so that every block is a new version
@@ -870,6 +894,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records"); assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
@@ -914,14 +940,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey())); scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete"); assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderWithFailedRollbacks(boolean readBlocksLazily) public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
// Write a Data block and Delete block with same InstantTime (written in same batch) // Write a Data block and Delete block with same InstantTime (written in same batch)
@@ -991,6 +1021,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback"); assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback");
@@ -1001,8 +1033,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderWithInsertDeleteAndRollback(boolean readBlocksLazily) public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
// Write a Data block and Delete block with same InstantTime (written in same batch) // Write a Data block and Delete block with same InstantTime (written in same batch)
@@ -1055,14 +1089,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records"); assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
FileCreateUtils.deleteDeltaCommit(basePath, "100", fs); FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderWithInvalidRollback(boolean readBlocksLazily) public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
// Set a small threshold so that every block is a new version // Set a small threshold so that every block is a new version
@@ -1102,6 +1140,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records"); assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records");
final List<String> readKeys = new ArrayList<>(100); final List<String> readKeys = new ArrayList<>(100);
@@ -1110,8 +1150,10 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(boolean readBlocksLazily) public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
// Write a 3 Data blocs with same InstantTime (written in same batch) // Write a 3 Data blocs with same InstantTime (written in same batch)
@@ -1168,13 +1210,17 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records"); assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(boolean readBlocksLazily) public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {
// Write a 3 Data blocs with same InstantTime (written in same batch) // Write a 3 Data blocs with same InstantTime (written in same batch)
@@ -1270,6 +1316,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records"); assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
FileCreateUtils.deleteDeltaCommit(basePath, "100", fs); FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
@@ -1289,7 +1337,9 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
* *
*/ */
private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2, private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1, int numRecordsInLog2,
boolean readBlocksLazily) { ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily) {
try { try {
// Write one Data block with same InstantTime (written in same batch) // Write one Data block with same InstantTime (written in same batch)
Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(getSimpleSchema());
@@ -1340,6 +1390,8 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(bufferSize) .withBufferSize(bufferSize)
.withSpillableMapBasePath(BASE_OUTPUT_PATH) .withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
.build(); .build();
assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(), assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(),
@@ -1351,33 +1403,42 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(boolean readBlocksLazily) { public void testAvroLogRecordReaderWithFailedTaskInFirstStageAttempt(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily) {
/* /*
* FIRST_ATTEMPT_FAILED: * FIRST_ATTEMPT_FAILED:
* Original task from the stage attempt failed, but subsequent stage retry succeeded. * Original task from the stage attempt failed, but subsequent stage retry succeeded.
*/ */
testAvroLogRecordReaderMergingMultipleLogFiles(77, 100, readBlocksLazily); testAvroLogRecordReaderMergingMultipleLogFiles(77, 100,
diskMapType, isCompressionEnabled, readBlocksLazily);
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(boolean readBlocksLazily) { public void testAvroLogRecordReaderWithFailedTaskInSecondStageAttempt(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily) {
/* /*
* SECOND_ATTEMPT_FAILED: * SECOND_ATTEMPT_FAILED:
* Original task from stage attempt succeeded, but subsequent retry attempt failed. * Original task from stage attempt succeeded, but subsequent retry attempt failed.
*/ */
testAvroLogRecordReaderMergingMultipleLogFiles(100, 66, readBlocksLazily); testAvroLogRecordReaderMergingMultipleLogFiles(100, 66,
diskMapType, isCompressionEnabled, readBlocksLazily);
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(boolean readBlocksLazily) { public void testAvroLogRecordReaderTasksSucceededInBothStageAttempts(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean readBlocksLazily) {
/* /*
* BOTH_ATTEMPTS_SUCCEEDED: * BOTH_ATTEMPTS_SUCCEEDED:
* Original task from the stage attempt and duplicate task from the stage retry succeeded. * Original task from the stage attempt and duplicate task from the stage retry succeeded.
*/ */
testAvroLogRecordReaderMergingMultipleLogFiles(100, 100, readBlocksLazily); testAvroLogRecordReaderMergingMultipleLogFiles(100, 100,
diskMapType, isCompressionEnabled, readBlocksLazily);
} }
@ParameterizedTest @ParameterizedTest
@@ -1618,4 +1679,18 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
throw new RuntimeException("Unknown data block type " + dataBlockType); throw new RuntimeException("Unknown data block type " + dataBlockType);
} }
} }
private static Stream<Arguments> testArguments() {
// Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: readBlocksLazily
return Stream.of(
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false),
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true),
arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
);
}
} }

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.view; package org.apache.hudi.common.table.view;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
/** /**
@@ -29,6 +30,7 @@ public class TestSpillableMapBasedFileSystemView extends TestHoodieTableFileSyst
protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) { protected SyncableFileSystemView getFileSystemView(HoodieTimeline timeline) {
return new SpillableMapBasedFileSystemView(metaClient, timeline, FileSystemViewStorageConfig.newBuilder() return new SpillableMapBasedFileSystemView(metaClient, timeline, FileSystemViewStorageConfig.newBuilder()
// pure disk base View // pure disk base View
.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).withMaxMemoryForView(0L).build()); .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).withMaxMemoryForView(0L).build(),
HoodieCommonConfig.newBuilder().build());
} }
} }

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.view; package org.apache.hudi.common.table.view;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -29,6 +30,7 @@ public class TestSpillableMapBasedIncrementalFSViewSync extends TestIncrementalF
@Override @Override
protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline) { protected SyncableFileSystemView getFileSystemView(HoodieTableMetaClient metaClient, HoodieTimeline timeline) {
return new SpillableMapBasedFileSystemView(metaClient, timeline, return new SpillableMapBasedFileSystemView(metaClient, timeline,
FileSystemViewStorageConfig.newBuilder().withMaxMemoryForView(0L).withIncrementalTimelineSync(true).build()); FileSystemViewStorageConfig.newBuilder().withMaxMemoryForView(0L).withIncrementalTimelineSync(true).build(),
HoodieCommonConfig.newBuilder().build());
} }
} }

View File

@@ -234,6 +234,8 @@ public class BootstrapFunction<I, O extends HoodieRecord>
.withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize()) .withBufferSize(this.writeConfig.getMaxDFSStreamBufferSize())
.withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge()) .withMaxMemorySizeInBytes(this.writeConfig.getMaxMemoryPerPartitionMerge())
.withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath()) .withSpillableMapBasePath(this.writeConfig.getSpillableMapBasePath())
.withDiskMapType(this.writeConfig.getCommonConfig().getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(this.writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.build(); .build();
} }

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.utils;
import org.apache.hudi.client.FlinkTaskContextSupplier; import org.apache.hudi.client.FlinkTaskContextSupplier;
import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -497,6 +498,8 @@ public class TestData {
.withBufferSize(16 * 1024 * 1024) .withBufferSize(16 * 1024 * 1024)
.withMaxMemorySizeInBytes(1024 * 1024L) .withMaxMemorySizeInBytes(1024 * 1024L)
.withSpillableMapBasePath("/tmp/") .withSpillableMapBasePath("/tmp/")
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build(); .build();
} }

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.hadoop.realtime; package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -81,6 +82,9 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE)) .withBufferSize(jobConf.getInt(HoodieRealtimeConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP, HoodieRealtimeConfig.DEFAULT_MAX_DFS_STREAM_BUFFER_SIZE))
.withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH)) .withSpillableMapBasePath(jobConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP, HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.withDiskMapType(jobConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue()))
.withBitCaskDiskMapCompressionEnabled(jobConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
.build(); .build();
} }

View File

@@ -19,6 +19,7 @@
package org.apache.hudi.hadoop.realtime; package org.apache.hudi.hadoop.realtime;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieLogFile;
@@ -29,6 +30,7 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.FileCreateUtils; import org.apache.hudi.common.testutils.FileCreateUtils;
import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.testutils.SchemaTestUtil;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.testutils.InputFormatTestUtil; import org.apache.hudi.hadoop.testutils.InputFormatTestUtil;
@@ -58,7 +60,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource; import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@@ -68,11 +71,13 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP; import static org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.params.provider.Arguments.arguments;
public class TestHoodieRealtimeRecordReader { public class TestHoodieRealtimeRecordReader {
@@ -115,8 +120,10 @@ public class TestHoodieRealtimeRecordReader {
} }
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = {true, false}) @MethodSource("testArguments")
public void testReader(boolean partitioned) throws Exception { public void testReader(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled,
boolean partitioned) throws Exception {
// initial commit // initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
@@ -177,6 +184,9 @@ public class TestHoodieRealtimeRecordReader {
List<Schema.Field> fields = schema.getFields(); List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, jobConf, partitioned); setHiveColumnNameProps(fields, jobConf, partitioned);
jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType);
jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);
// validate record reader compaction // validate record reader compaction
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
@@ -287,8 +297,10 @@ public class TestHoodieRealtimeRecordReader {
recordReader.close(); recordReader.close();
} }
@Test @ParameterizedTest
public void testReaderWithNestedAndComplexSchema() throws Exception { @MethodSource("testArguments")
public void testReaderWithNestedAndComplexSchema(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws Exception {
// initial commit // initial commit
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getComplexEvolvedSchema());
HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ); HoodieTestUtils.init(hadoopConf, basePath.toString(), HoodieTableType.MERGE_ON_READ);
@@ -323,6 +335,9 @@ public class TestHoodieRealtimeRecordReader {
List<Schema.Field> fields = schema.getFields(); List<Schema.Field> fields = schema.getFields();
setHiveColumnNameProps(fields, jobConf, true); setHiveColumnNameProps(fields, jobConf, true);
jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType);
jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);
// validate record reader compaction // validate record reader compaction
HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader); HoodieRealtimeRecordReader recordReader = new HoodieRealtimeRecordReader(split, jobConf, reader);
@@ -416,8 +431,10 @@ public class TestHoodieRealtimeRecordReader {
} }
} }
@Test @ParameterizedTest
public void testSchemaEvolutionAndRollbackBlockInLastLogFile() throws Exception { @MethodSource("testArguments")
public void testSchemaEvolutionAndRollbackBlockInLastLogFile(ExternalSpillableMap.DiskMapType diskMapType,
boolean isCompressionEnabled) throws Exception {
// initial commit // initial commit
List<String> logFilePaths = new ArrayList<>(); List<String> logFilePaths = new ArrayList<>();
Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema()); Schema schema = HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getSimpleSchema());
@@ -468,6 +485,9 @@ public class TestHoodieRealtimeRecordReader {
// Try to read all the fields passed by the new schema // Try to read all the fields passed by the new schema
setHiveColumnNameProps(fields, jobConf, true); setHiveColumnNameProps(fields, jobConf, true);
jobConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), diskMapType);
jobConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), isCompressionEnabled);
HoodieRealtimeRecordReader recordReader; HoodieRealtimeRecordReader recordReader;
try { try {
// validate record reader compaction // validate record reader compaction
@@ -488,4 +508,18 @@ public class TestHoodieRealtimeRecordReader {
// keep reading // keep reading
} }
} }
private static Stream<Arguments> testArguments() {
// Arg1: ExternalSpillableMap Type, Arg2: isDiskMapCompressionEnabled, Arg3: partitioned
return Stream.of(
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, false),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, false),
arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, false),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, false),
arguments(ExternalSpillableMap.DiskMapType.BITCASK, false, true),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, false, true),
arguments(ExternalSpillableMap.DiskMapType.BITCASK, true, true),
arguments(ExternalSpillableMap.DiskMapType.ROCKS_DB, true, true)
);
}
} }

View File

@@ -20,6 +20,7 @@ package org.apache.hudi.integ.testsuite.reader;
import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.FileSlice;
@@ -282,6 +283,8 @@ public class DFSHoodieDatasetInputReader extends DFSDeltaInputReader {
.withReverseReader(false) .withReverseReader(false)
.withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue()) .withBufferSize(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE_PROP.defaultValue())
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue()) .withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH_PROP.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
.build(); .build();
// readAvro log files // readAvro log files
Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator(); Iterable<HoodieRecord<? extends HoodieRecordPayload>> iterable = () -> scanner.iterator();

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.timeline.service; package org.apache.hudi.timeline.service;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext;
@@ -175,25 +176,26 @@ public class TimelineService {
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(conf.get()); HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(conf.get());
// Just use defaults for now // Just use defaults for now
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build(); HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
switch (config.viewStorageType) { switch (config.viewStorageType) {
case MEMORY: case MEMORY:
FileSystemViewStorageConfig.Builder inMemConfBuilder = FileSystemViewStorageConfig.newBuilder(); FileSystemViewStorageConfig.Builder inMemConfBuilder = FileSystemViewStorageConfig.newBuilder();
inMemConfBuilder.withStorageType(FileSystemViewStorageType.MEMORY); inMemConfBuilder.withStorageType(FileSystemViewStorageType.MEMORY);
return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, inMemConfBuilder.build()); return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, inMemConfBuilder.build(), commonConfig);
case SPILLABLE_DISK: { case SPILLABLE_DISK: {
FileSystemViewStorageConfig.Builder spillableConfBuilder = FileSystemViewStorageConfig.newBuilder(); FileSystemViewStorageConfig.Builder spillableConfBuilder = FileSystemViewStorageConfig.newBuilder();
spillableConfBuilder.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK) spillableConfBuilder.withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
.withBaseStoreDir(config.baseStorePathForFileGroups) .withBaseStoreDir(config.baseStorePathForFileGroups)
.withMaxMemoryForView(config.maxViewMemPerTableInMB * 1024 * 1024L) .withMaxMemoryForView(config.maxViewMemPerTableInMB * 1024 * 1024L)
.withMemFractionForPendingCompaction(config.memFractionForCompactionPerTable); .withMemFractionForPendingCompaction(config.memFractionForCompactionPerTable);
return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, spillableConfBuilder.build()); return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, spillableConfBuilder.build(), commonConfig);
} }
case EMBEDDED_KV_STORE: { case EMBEDDED_KV_STORE: {
FileSystemViewStorageConfig.Builder rocksDBConfBuilder = FileSystemViewStorageConfig.newBuilder(); FileSystemViewStorageConfig.Builder rocksDBConfBuilder = FileSystemViewStorageConfig.newBuilder();
rocksDBConfBuilder.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE) rocksDBConfBuilder.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
.withRocksDBPath(config.rocksDBPath); .withRocksDBPath(config.rocksDBPath);
return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, rocksDBConfBuilder.build()); return FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, rocksDBConfBuilder.build(), commonConfig);
} }
default: default:
throw new IllegalArgumentException("Invalid view manager storage type :" + config.viewStorageType); throw new IllegalArgumentException("Invalid view manager storage type :" + config.viewStorageType);

View File

@@ -18,6 +18,7 @@
package org.apache.hudi.timeline.service.functional; package org.apache.hudi.timeline.service.functional;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -46,11 +47,12 @@ public class TestRemoteHoodieTableFileSystemView extends TestHoodieTableFileSyst
FileSystemViewStorageConfig sConf = FileSystemViewStorageConfig sConf =
FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build(); FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.SPILLABLE_DISK).build();
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build(); HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().build();
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().build();
HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf()); HoodieLocalEngineContext localEngineContext = new HoodieLocalEngineContext(metaClient.getHadoopConf());
try { try {
server = new TimelineService(0, server = new TimelineService(0,
FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, sConf)); FileSystemViewManager.createViewManager(localEngineContext, metadataConfig, sConf, commonConfig));
server.startService(); server.startService();
} catch (Exception ex) { } catch (Exception ex) {
throw new RuntimeException(ex); throw new RuntimeException(ex);