diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java index 5b005540a..ac898a8e6 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/MetadataCommand.java @@ -24,6 +24,7 @@ import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.utils.SparkUtil; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -147,7 +148,8 @@ public class MetadataCommand implements CommandMarker { public String stats() throws IOException { HoodieCLI.getTableMetaClient(); HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build(); - HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(HoodieCLI.conf, config, HoodieCLI.basePath, "/tmp"); + HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf), + config, HoodieCLI.basePath, "/tmp"); Map stats = metadata.stats(); StringBuffer out = new StringBuffer("\n"); @@ -197,7 +199,7 @@ public class MetadataCommand implements CommandMarker { final String partition) throws IOException { HoodieCLI.getTableMetaClient(); HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build(); - HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(HoodieCLI.conf, config, HoodieCLI.basePath, "/tmp"); + HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp"); StringBuffer out = new StringBuffer("\n"); if (!metaReader.enabled()) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index 74ffdfc65..4e72f1871 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -107,9 +107,8 @@ public abstract class HoodieTable implem this.hadoopConfiguration = context.getHadoopConf(); this.context = context; - // disable reuse of resources, given there is no close() called on the executors ultimately HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(config.getMetadataConfig().getProps()) - .enableReuse(false).build(); + .build(); this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(), FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java index 5809ab210..52308593f 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/metadata/TestHoodieBackedMetadata.java @@ -973,7 +973,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness { .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build()) .withMetadataConfig(HoodieMetadataConfig.newBuilder() .enable(useFileListingMetadata) - .enableReuse(false) .enableMetrics(enableMetrics) .enableFallback(false).build()) .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 6346a65fb..1eff82f0c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -67,10 +67,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig { public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained"; public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3; - // Controls whether or no the base file open/log merges are reused per API call - public static final String ENABLE_REUSE_PROP = METADATA_PREFIX + ".reuse.enable"; - public static final String DEFAULT_ENABLE_REUSE = "true"; - // Controls whether or not, upon failure to fetch from metadata table, should fallback to listing. public static final String ENABLE_FALLBACK_PROP = METADATA_PREFIX + ".fallback.enable"; public static final String DEFAULT_ENABLE_FALLBACK = "true"; @@ -105,10 +101,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig { return Boolean.parseBoolean(props.getProperty(METADATA_ENABLE_PROP)); } - public boolean enableReuse() { - return Boolean.parseBoolean(props.getProperty(ENABLE_REUSE_PROP)); - } - public boolean enableFallback() { return Boolean.parseBoolean(props.getProperty(ENABLE_FALLBACK_PROP)); } @@ -151,11 +143,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig { return this; } - public Builder enableReuse(boolean reuse) { - props.setProperty(ENABLE_REUSE_PROP, String.valueOf(reuse)); - return this; - } - public Builder enableFallback(boolean fallback) { props.setProperty(ENABLE_FALLBACK_PROP, String.valueOf(fallback)); return this; @@ -233,8 +220,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig { HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING); setDefaultOnCondition(props, !props.containsKey(ENABLE_FALLBACK_PROP), ENABLE_FALLBACK_PROP, DEFAULT_ENABLE_FALLBACK); - setDefaultOnCondition(props, !props.containsKey(ENABLE_REUSE_PROP), ENABLE_REUSE_PROP, - DEFAULT_ENABLE_REUSE); setDefaultOnCondition(props, !props.containsKey(DIRECTORY_FILTER_REGEX), DIRECTORY_FILTER_REGEX, DEFAULT_DIRECTORY_FILTER_REGEX); return config; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 0c218ee44..f89c2a670 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -214,7 +214,7 @@ public class FileSystemViewManager { final FileSystemViewStorageConfig config, final String basePath) { return createViewManager(context, metadataConfig, config, - () -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir())); + () -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir(), true)); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java index 1d7692953..b954e57e7 100644 --- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java @@ -56,6 +56,9 @@ public class HoodieHFileReader implements HoodieFileRea private Configuration conf; private HFile.Reader reader; private Schema schema; + // Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each + // key retrieval. + private HFileScanner keyScanner; public static final String KEY_SCHEMA = "schema"; public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter"; @@ -140,7 +143,7 @@ public class HoodieHFileReader implements HoodieFileRea public List> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException { List> recordList = new LinkedList<>(); try { - HFileScanner scanner = reader.getScanner(false, false); + final HFileScanner scanner = reader.getScanner(false, false); if (scanner.seekTo()) { do { Cell c = scanner.getKeyValue(); @@ -174,7 +177,7 @@ public class HoodieHFileReader implements HoodieFileRea // To handle when hasNext() is called multiple times for idempotency and/or the first time if (this.next == null && !this.eof) { if (!scanner.isSeeked() && scanner.seekTo()) { - this.next = (R)getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); + this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); } } return this.next != null; @@ -194,7 +197,7 @@ public class HoodieHFileReader implements HoodieFileRea } R retVal = this.next; if (scanner.next()) { - this.next = (R)getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); + this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema); } else { this.next = null; this.eof = true; @@ -209,12 +212,23 @@ public class HoodieHFileReader implements HoodieFileRea @Override public Option getRecordByKey(String key, Schema readerSchema) throws IOException { - HFileScanner scanner = reader.getScanner(false, true); + byte[] value = null; KeyValue kv = new KeyValue(key.getBytes(), null, null, null); - if (scanner.seekTo(kv) == 0) { - Cell c = scanner.getKeyValue(); - byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength()); - R record = getRecordFromCell(c, getSchema(), readerSchema); + + synchronized (this) { + if (keyScanner == null) { + keyScanner = reader.getScanner(true, true); + } + + if (keyScanner.seekTo(kv) == 0) { + Cell c = keyScanner.getKeyValue(); + // Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards + value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength()); + } + } + + if (value != null) { + R record = (R)HoodieAvroUtils.bytesToAvro(value, getSchema(), readerSchema); return Option.of(record); } @@ -232,12 +246,13 @@ public class HoodieHFileReader implements HoodieFileRea } @Override - public void close() { + public synchronized void close() { try { reader.close(); reader = null; + keyScanner = null; } catch (IOException e) { - e.printStackTrace(); + throw new HoodieIOException("Error closing the hfile reader", e); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 32856065a..eeede6f41 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -23,7 +23,6 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.engine.HoodieLocalEngineContext; import org.apache.hudi.common.model.FileSlice; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; @@ -46,7 +45,6 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -73,19 +71,23 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { // Metadata table's timeline and metaclient private HoodieTableMetaClient metaClient; private List latestFileSystemMetadataSlices; + // should we reuse the open file handles, across calls + private final boolean reuse; + // Readers for the base and log file which store the metadata private transient HoodieFileReader baseFileReader; private transient HoodieMetadataMergedLogRecordScanner logRecordScanner; - public HoodieBackedTableMetadata(Configuration conf, HoodieMetadataConfig metadataConfig, + public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapDirectory) { - this(new HoodieLocalEngineContext(conf), metadataConfig, datasetBasePath, spillableMapDirectory); + this(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory, false); } public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, - String datasetBasePath, String spillableMapDirectory) { + String datasetBasePath, String spillableMapDirectory, boolean reuse) { super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory); + this.reuse = reuse; initIfNeeded(); } @@ -112,13 +114,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { @Override protected Option> getRecordByKeyFromMetadata(String key) { + + openReadersIfNeededOrThrow(); try { List timings = new ArrayList<>(); HoodieTimer timer = new HoodieTimer().startTimer(); - openFileSliceIfNeeded(); - timings.add(timer.endTimer()); - timer.startTimer(); // Retrieve record from base file HoodieRecord hoodieRecord = null; if (baseFileReader != null) { @@ -147,82 +148,115 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { } } timings.add(timer.endTimer()); - LOG.info(String.format("Metadata read for key %s took [open, baseFileRead, logMerge] %s ms", key, timings)); + LOG.info(String.format("Metadata read for key %s took [baseFileRead, logMerge] %s ms", key, timings)); return Option.ofNullable(hoodieRecord); } catch (IOException ioe) { throw new HoodieIOException("Error merging records from metadata table for key :" + key, ioe); } finally { - closeIfNeeded(); + if (!reuse) { + closeOrThrow(); + } + } + } + + private void openReadersIfNeededOrThrow() { + try { + openReadersIfNeeded(); + } catch (IOException e) { + throw new HoodieIOException("Error opening readers to the Metadata Table: ", e); } } /** - * Open readers to the base and log files. + * Returns a new pair of readers to the base and log files. */ - private synchronized void openFileSliceIfNeeded() throws IOException { - if (metadataConfig.enableReuse() && baseFileReader != null) { - // we will reuse what's open. + private void openReadersIfNeeded() throws IOException { + if (reuse && (baseFileReader != null || logRecordScanner != null)) { + // quickly exit out without synchronizing if reusing and readers are already open return; } - // Metadata is in sync till the latest completed instant on the dataset - HoodieTimer timer = new HoodieTimer().startTimer(); - String latestInstantTime = getLatestDatasetInstantTime(); - ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 1, "must be at-least one validata metadata file slice"); + // we always force synchronization, if reuse=false, to handle concurrent close() calls as well. + synchronized (this) { + if (baseFileReader != null || logRecordScanner != null) { + return; + } - // If the base file is present then create a reader - Option basefile = latestFileSystemMetadataSlices.get(0).getBaseFile(); - if (basefile.isPresent()) { - String basefilePath = basefile.get().getPath(); - baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath)); - LOG.info("Opened metadata base file from " + basefilePath + " at instant " + basefile.get().getCommitTime()); + final long baseFileOpenMs; + final long logScannerOpenMs; + + // Metadata is in sync till the latest completed instant on the dataset + HoodieTimer timer = new HoodieTimer().startTimer(); + String latestInstantTime = getLatestDatasetInstantTime(); + ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 1, "must be at-least one valid metadata file slice"); + + // If the base file is present then create a reader + Option basefile = latestFileSystemMetadataSlices.get(0).getBaseFile(); + if (basefile.isPresent()) { + String basefilePath = basefile.get().getPath(); + baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath)); + baseFileOpenMs = timer.endTimer(); + LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath, + basefile.get().getCommitTime(), baseFileOpenMs)); + } else { + baseFileOpenMs = 0; + timer.endTimer(); + } + + // Open the log record scanner using the log files from the latest file slice + timer.startTimer(); + List logFilePaths = latestFileSystemMetadataSlices.get(0).getLogFiles() + .sorted(HoodieLogFile.getLogFileComparator()) + .map(o -> o.getPath().toString()) + .collect(Collectors.toList()); + Option lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant(); + String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); + + // Load the schema + Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); + logRecordScanner = new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath, logFilePaths, + schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE, spillableMapDirectory, null); + + logScannerOpenMs = timer.endTimer(); + LOG.info(String.format("Opened metadata log files from %s at instant (dataset instant=%s, metadata instant=%s) in %d ms", + logFilePaths, latestInstantTime, latestMetaInstantTimestamp, logScannerOpenMs)); + + metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + logScannerOpenMs)); } - - // Open the log record scanner using the log files from the latest file slice - List logFilePaths = latestFileSystemMetadataSlices.get(0).getLogFiles() - .sorted(HoodieLogFile.getLogFileComparator()) - .map(o -> o.getPath().toString()) - .collect(Collectors.toList()); - Option lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant(); - String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP); - - // Load the schema - Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema()); - logRecordScanner = new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath, - logFilePaths, schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE, - spillableMapDirectory, null); - - LOG.info("Opened metadata log files from " + logFilePaths + " at instant " + latestInstantTime - + "(dataset instant=" + latestInstantTime + ", metadata instant=" + latestMetaInstantTimestamp + ")"); - - metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer())); } - private void closeIfNeeded() { + private void close(HoodieFileReader localFileReader, HoodieMetadataMergedLogRecordScanner localLogScanner) { try { - if (!metadataConfig.enableReuse()) { - close(); + if (localFileReader != null) { + localFileReader.close(); + } + if (localLogScanner != null) { + localLogScanner.close(); } } catch (Exception e) { throw new HoodieException("Error closing resources during metadata table merge", e); } } + private void closeOrThrow() { + try { + close(); + } catch (Exception e) { + throw new HoodieException("Error closing metadata table readers", e); + } + } + @Override - public void close() throws Exception { - if (baseFileReader != null) { - baseFileReader.close(); - baseFileReader = null; - } - if (logRecordScanner != null) { - logRecordScanner.close(); - logRecordScanner = null; - } + public synchronized void close() throws Exception { + close(baseFileReader, logRecordScanner); + baseFileReader = null; + logRecordScanner = null; } /** * Return an ordered list of instants which have not been synced to the Metadata Table. */ + @Override protected List findInstantsToSync() { initIfNeeded(); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java index 56c3cd2cc..9e6222a87 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadata.java @@ -72,8 +72,13 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable { static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, String spillableMapPath) { + return create(engineContext, metadataConfig, datasetBasePath, spillableMapPath, false); + } + + static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath, + String spillableMapPath, boolean reuse) { if (metadataConfig.useFileListingMetadata()) { - return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath); + return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse); } else { return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()), datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());