From 3b36cb805d066a3112e3a355ef502dbe4b2c1824 Mon Sep 17 00:00:00 2001 From: Prashant Wason Date: Mon, 15 Mar 2021 13:42:57 -0700 Subject: [PATCH] [HUDI-1552] Improve performance of key lookups from base file in Metadata Table. (#2494) * [HUDI-1552] Improve performance of key lookups from base file in Metadata Table. 1. Cache the KeyScanner across lookups so that the HFile index does not have to be read for each lookup. 2. Enable block caching in KeyScanner. 3. Move the lock to a limited scope of the code to reduce lock contention. 4. Removed reuse configuration * Properly close the readers, when metadata table is accessed from executors - Passing a reuse boolean into HoodieBackedTableMetadata - Preserve the fast return behavior when reusing and opening from multiple threads (no contention) - Handle concurrent close() and open readers, for reuse=false, by always synchronizing Co-authored-by: Vinoth Chandar --- .../hudi/cli/commands/MetadataCommand.java | 6 +- .../org/apache/hudi/table/HoodieTable.java | 3 +- .../metadata/TestHoodieBackedMetadata.java | 1 - .../common/config/HoodieMetadataConfig.java | 15 -- .../table/view/FileSystemViewManager.java | 2 +- .../hudi/io/storage/HoodieHFileReader.java | 35 +++-- .../metadata/HoodieBackedTableMetadata.java | 144 +++++++++++------- .../hudi/metadata/HoodieTableMetadata.java | 7 +- 8 files changed, 126 insertions(+), 87 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index 5b005540a..ac898a8e6 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -24,6 +24,7 @@ import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -147,7 +148,8 @@ public class MetadataCommand implements CommandMarker { public String stats() throws IOException { HoodieCLI.getTableMetaClient(); HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build(); - HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(HoodieCLI.conf, config, HoodieCLI.basePath, "/tmp"); + HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf), + config, HoodieCLI.basePath, "/tmp"); Map stats = metadata.stats(); StringBuffer out = new StringBuffer("\n"); @@ -197,7 +199,7 @@ public class MetadataCommand implements CommandMarker { final String partition) throws IOException { HoodieCLI.getTableMetaClient(); HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build(); - HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(HoodieCLI.conf, config, HoodieCLI.basePath, "/tmp"); + HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp"); StringBuffer out = new StringBuffer("\n"); if (!metaReader.enabled()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 74ffdfc65..4e72f1871 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -107,9 +107,8 @@ public abstract class HoodieTable implem this.hadoopConfiguration = context.getHadoopConf(); this.context = context; - // disable reuse of resources, given there is no close() called on the executors ultimately HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(config.getMetadataConfig().getProps()) - .enableReuse(false).build(); + .build(); this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index 5809ab210..52308593f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -973,7 +973,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(useFileListingMetadata) - .enableReuse(false) .enableMetrics(enableMetrics) .enableFallback(false).build()) .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 6346a65fb..1eff82f0c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -67,10 +67,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig { public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained"; public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3; - // Controls whether or no the base file open/log merges are reused per API call - public static final String ENABLE_REUSE_PROP = METADATA_PREFIX + ".reuse.enable"; - public static final String DEFAULT_ENABLE_REUSE = "true"; - // Controls whether or not, upon failure to fetch from metadata table, should fallback to listing. public static final String ENABLE_FALLBACK_PROP = METADATA_PREFIX + ".fallback.enable"; public static final String DEFAULT_ENABLE_FALLBACK = "true"; @@ -105,10 +101,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(METADATA_ENABLE_PROP)); } - public boolean enableReuse() { - return Boolean.parseBoolean(props.getProperty(ENABLE_REUSE_PROP)); - } - public boolean enableFallback() { return Boolean.parseBoolean(props.getProperty(ENABLE_FALLBACK_PROP)); } @@ -151,11 +143,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig { return this; } - public Builder enableReuse(boolean reuse) { - props.setProperty(ENABLE_REUSE_PROP, String.valueOf(reuse)); - return this; - } - public Builder enableFallback(boolean fallback) { props.setProperty(ENABLE_FALLBACK_PROP, String.valueOf(fallback)); return this; @@ -233,8 +220,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig { HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING); setDefaultOnCondition(props, !props.containsKey(ENABLE_FALLBACK_PROP), ENABLE_FALLBACK_PROP, DEFAULT_ENABLE_FALLBACK); - setDefaultOnCondition(props, !props.containsKey(ENABLE_REUSE_PROP), ENABLE_REUSE_PROP, - DEFAULT_ENABLE_REUSE); setDefaultOnCondition(props, !props.containsKey(DIRECTORY_FILTER_REGEX), DIRECTORY_FILTER_REGEX, DEFAULT_DIRECTORY_FILTER_REGEX); return config; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 0c218ee44..f89c2a670 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -214,7 +214,7 @@ public class FileSystemViewManager { final FileSystemViewStorageConfig config, final String basePath) { return createViewManager(context, metadataConfig, config, - () -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir())); + () -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir(), true)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index 1d7692953..b954e57e7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -56,6 +56,9 @@ public class HoodieHFileReader implements HoodieFileRea private Configuration conf; private HFile.Reader reader; private Schema schema; + // Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each + // key retrieval. + private HFileScanner keyScanner; public static final String KEY_SCHEMA = "schema"; public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter"; @@ -140,7 +143,7 @@ public class HoodieHFileReader implements HoodieFileRea public List> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException { List> recordList = new LinkedList<>(); try { - HFileScanner scanner = reader.getScanner(false, false); + final HFileScanner scanner = reader.getScanner(false, false); if (scanner.seekTo()) { do { Cell c = scanner.getKeyValue(); @@ -174,7 +177,7 @@ public class HoodieHFileReader implements HoodieFileRea // To handle when hasNext() is called multiple times for idempotency and/or the first time if (this.next == null && !this.eof) { if (!scanner.isSeeked() && scanner.seekTo()) { - this.next = (R)getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); + this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); } } return this.next != null; @@ -194,7 +197,7 @@ public class HoodieHFileReader implements HoodieFileRea } R retVal = this.next; if (scanner.next()) { - this.next = (R)getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); + this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); } else { this.next = null; this.eof = true; @@ -209,12 +212,23 @@ public class HoodieHFileReader implements HoodieFileRea @Override public Option getRecordByKey(String key, Schema readerSchema) throws IOException { - HFileScanner scanner = reader.getScanner(false, true); + byte[] value = null; KeyValue kv = new KeyValue(key.getBytes(), null, null, null); - if (scanner.seekTo(kv) == 0) { - Cell c = scanner.getKeyValue(); - byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength()); - R record = getRecordFromCell(c, getSchema(), readerSchema); + + synchronized (this) { + if (keyScanner == null) { + keyScanner = reader.getScanner(true, true); + } + + if (keyScanner.seekTo(kv) == 0) { + Cell c = keyScanner.getKeyValue(); + // Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards + value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength()); + } + } + + if (value != null) { + R record = (R)HoodieAvroUtils.bytesToAvro(value, getSchema(), readerSchema); return Option.of(record); } @@ -232,12 +246,13 @@ public class HoodieHFileReader implements HoodieFileRea } @Override - public void close() { + public synchronized void close() { try { reader.close(); reader = null; + keyScanner = null; } catch (IOException e) { - e.printStackTrace(); + throw new HoodieIOException("Error closing the hfile reader", e); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 32856065a..eeede6f41 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -23,7 +23,6 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; @@ -46,7 +45,6 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -73,19 +71,23 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { // Metadata table's timeline and metaclient private HoodieTableMetaClient metaClient; private List latestFileSystemMetadataSlices; + // should we reuse the open file handles, across calls + private final boolean reuse; + // Readers for the base and log file which store the metadata private transient HoodieFileReader baseFileReader; private transient HoodieMetadataMergedLogRecordScanner logRecordScanner; - public HoodieBackedTableMetadata(Configuration conf, HoodieMetadataConfig metadataConfig, + public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapDirectory) { - this(new HoodieLocalEngineContext(conf), metadataConfig, datasetBasePath, spillableMapDirectory); + this(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory, false); } public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, - String datasetBasePath, String spillableMapDirectory) { + String datasetBasePath, String spillableMapDirectory, boolean reuse) { super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory); + this.reuse = reuse; initIfNeeded(); } @@ -112,13 +114,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { @Override protected Option> getRecordByKeyFromMetadata(String key) { + + openReadersIfNeededOrThrow(); try { List timings = new ArrayList<>(); HoodieTimer timer = new HoodieTimer().startTimer(); - openFileSliceIfNeeded(); - timings.add(timer.endTimer()); - timer.startTimer(); // Retrieve record from base file HoodieRecord hoodieRecord = null; if (baseFileReader != null) { @@ -147,82 +148,115 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { } } timings.add(timer.endTimer()); - LOG.info(String.format("Metadata read for key %s took [open, baseFileRead, logMerge] %s ms", key, timings)); + LOG.info(String.format("Metadata read for key %s took [baseFileRead, logMerge] %s ms", key, timings)); return Option.ofNullable(hoodieRecord); } catch (IOException ioe) { throw new HoodieIOException("Error merging records from metadata table for key :" + key, ioe); } finally { - closeIfNeeded(); + if (!reuse) { + closeOrThrow(); + } + } + } + + private void openReadersIfNeededOrThrow() { + try { + openReadersIfNeeded(); + } catch (IOException e) { + throw new HoodieIOException("Error opening readers to the Metadata Table: ", e); } } /** - * Open readers to the base and log files. + * Returns a new pair of readers to the base and log files. */ - private synchronized void openFileSliceIfNeeded() throws IOException { - if (metadataConfig.enableReuse() && baseFileReader != null) { - // we will reuse what's open. + private void openReadersIfNeeded() throws IOException { + if (reuse && (baseFileReader != null || logRecordScanner != null)) { + // quickly exit out without synchronizing if reusing and readers are already open return; } - // Metadata is in sync till the latest completed instant on the dataset - HoodieTimer timer = new HoodieTimer().startTimer(); - String latestInstantTime = getLatestDatasetInstantTime(); - ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 1, "must be at-least one validata metadata file slice"); + // we always force synchronization, if reuse=false, to handle concurrent close() calls as well. + synchronized (this) { + if (baseFileReader != null || logRecordScanner != null) { + return; + } - // If the base file is present then create a reader - Option basefile = latestFileSystemMetadataSlices.get(0).getBaseFile(); - if (basefile.isPresent()) { - String basefilePath = basefile.get().getPath(); - baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath)); - LOG.info("Opened metadata base file from " + basefilePath + " at instant " + basefile.get().getCommitTime()); + final long baseFileOpenMs; + final long logScannerOpenMs; + + // Metadata is in sync till the latest completed instant on the dataset + HoodieTimer timer = new HoodieTimer().startTimer(); + String latestInstantTime = getLatestDatasetInstantTime(); + ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 1, "must be at-least one valid metadata file slice"); + + // If the base file is present then create a reader + Option basefile = latestFileSystemMetadataSlices.get(0).getBaseFile(); + if (basefile.isPresent()) { + String basefilePath = basefile.get().getPath(); + baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath)); + baseFileOpenMs = timer.endTimer(); + LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath, + basefile.get().getCommitTime(), baseFileOpenMs)); + } else { + baseFileOpenMs = 0; + timer.endTimer(); + } + + // Open the log record scanner using the log files from the latest file slice + timer.startTimer(); + List logFilePaths = latestFileSystemMetadataSlices.get(0).getLogFiles() + .sorted(HoodieLogFile.getLogFileComparator()) + .map(o -> o.getPath().toString()) + .collect(Collectors.toList()); + Option lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant(); + String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); + + // Load the schema + Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + logRecordScanner = new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath, logFilePaths, + schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE, spillableMapDirectory, null); + + logScannerOpenMs = timer.endTimer(); + LOG.info(String.format("Opened metadata log files from %s at instant (dataset instant=%s, metadata instant=%s) in %d ms", + logFilePaths, latestInstantTime, latestMetaInstantTimestamp, logScannerOpenMs)); + + metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + logScannerOpenMs)); } - - // Open the log record scanner using the log files from the latest file slice - List logFilePaths = latestFileSystemMetadataSlices.get(0).getLogFiles() - .sorted(HoodieLogFile.getLogFileComparator()) - .map(o -> o.getPath().toString()) - .collect(Collectors.toList()); - Option lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant(); - String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - - // Load the schema - Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); - logRecordScanner = new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath, - logFilePaths, schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE, - spillableMapDirectory, null); - - LOG.info("Opened metadata log files from " + logFilePaths + " at instant " + latestInstantTime - + "(dataset instant=" + latestInstantTime + ", metadata instant=" + latestMetaInstantTimestamp + ")"); - - metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer())); } - private void closeIfNeeded() { + private void close(HoodieFileReader localFileReader, HoodieMetadataMergedLogRecordScanner localLogScanner) { try { - if (!metadataConfig.enableReuse()) { - close(); + if (localFileReader != null) { + localFileReader.close(); + } + if (localLogScanner != null) { + localLogScanner.close(); } } catch (Exception e) { throw new HoodieException("Error closing resources during metadata table merge", e); } } + private void closeOrThrow() { + try { + close(); + } catch (Exception e) { + throw new HoodieException("Error closing metadata table readers", e); + } + } + @Override - public void close() throws Exception { - if (baseFileReader != null) { - baseFileReader.close(); - baseFileReader = null; - } - if (logRecordScanner != null) { - logRecordScanner.close(); - logRecordScanner = null; - } + public synchronized void close() throws Exception { + close(baseFileReader, logRecordScanner); + baseFileReader = null; + logRecordScanner = null; } /** * Return an ordered list of instants which have not been synced to the Metadata Table. */ + @Override protected List findInstantsToSync() { initIfNeeded(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index 56c3cd2cc..9e6222a87 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -72,8 +72,13 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable { static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapPath) { + return create(engineContext, metadataConfig, datasetBasePath, spillableMapPath, false); + } + + static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, + String spillableMapPath, boolean reuse) { if (metadataConfig.useFileListingMetadata()) { - return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath); + return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse); } else { return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()), datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());