
[HUDI-1552] Improve performance of key lookups from base file in Metadata Table. (#2494)

* [HUDI-1552] Improve performance of key lookups from base file in Metadata Table.

1. Cache the KeyScanner across lookups so that the HFile index does not have to be read for each lookup (see the sketch after this list).
2. Enable block caching in KeyScanner.
3. Move the lock to a limited scope of the code to reduce lock contention.
4. Remove the reuse configuration.
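
Points 1-3 amount to the pattern sketched below. This is a minimal, hypothetical illustration (the class CachedKeyLookup and method lookupByKey are invented names) built on the same HBase HFile.Reader#getScanner(cacheBlocks, pread) API that the HoodieHFileReader change further down actually uses:

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;

// Hypothetical reader illustrating points 1-3: one cached, block-caching
// scanner per reader, with the lock held only around scanner access.
class CachedKeyLookup {
  private final HFile.Reader reader;
  private HFileScanner keyScanner; // point 1: reused across lookups

  CachedKeyLookup(HFile.Reader reader) {
    this.reader = reader;
  }

  byte[] lookupByKey(String key) throws IOException {
    KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
    byte[] value = null;
    synchronized (this) { // point 3: lock scoped to scanner use only
      if (keyScanner == null) {
        keyScanner = reader.getScanner(true, true); // point 2: cacheBlocks=true
      }
      if (keyScanner.seekTo(kv) == 0) { // 0 => exact key match
        Cell c = keyScanner.getKeyValue();
        // Copy the value out before releasing the lock; the Cell's backing
        // buffer is not safe to hold once the scanner moves on.
        value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(),
            c.getValueOffset() + c.getValueLength());
      }
    }
    return value; // null when the key is absent
  }
}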

* Properly close the readers when the metadata table is accessed from executors

 - Pass a reuse boolean into HoodieBackedTableMetadata
 - Preserve the fast-return behavior when reusing and opening from multiple threads (no contention)
 - Handle concurrent close() and reader opens for reuse=false by always synchronizing, as sketched below
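
The scheme behind these three bullets can be sketched as follows; the names here (ReusableReaders, openIfNeeded) are invented, and the real logic lives in HoodieBackedTableMetadata.openReadersIfNeeded() and close() in the diff below. The unsynchronized fast return fires only when reuse=true, while open and close always take the lock so a reuse=false lookup can safely race a concurrent close():

// Hypothetical holder demonstrating the reuse/close protocol described above.
class ReusableReaders implements AutoCloseable {
  private final boolean reuse;
  // volatile so the unsynchronized fast path sees a fully published reader
  private volatile AutoCloseable baseFileReader;

  ReusableReaders(boolean reuse) {
    this.reuse = reuse;
  }

  void openIfNeeded() throws Exception {
    // Fast return: when reusing and already open, skip the lock entirely.
    if (reuse && baseFileReader != null) {
      return;
    }
    // Always synchronize the open itself, so that a concurrent close()
    // under reuse=false cannot interleave with reader construction.
    synchronized (this) {
      if (baseFileReader != null) {
        return; // another thread opened the readers while we waited
      }
      baseFileReader = () -> { }; // stand-in for opening the real reader
    }
  }

  void lookup(String key) throws Exception {
    openIfNeeded();
    try {
      // ... read the record for `key` from baseFileReader ...
    } finally {
      if (!reuse) {
        close(); // per-call readers are torn down immediately
      }
    }
  }

  @Override
  public synchronized void close() throws Exception {
    if (baseFileReader != null) {
      baseFileReader.close();
      baseFileReader = null;
    }
  }
}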

Co-authored-by: Vinoth Chandar <vinoth@apache.org>
Prashant Wason authored 2021-03-15 13:42:57 -07:00, committed by GitHub
parent 76bf2cc790
commit 3b36cb805d
8 changed files with 126 additions and 87 deletions

MetadataCommand.java

@@ -24,6 +24,7 @@ import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.utils.SparkUtil;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -147,7 +148,8 @@ public class MetadataCommand implements CommandMarker {
public String stats() throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(HoodieCLI.conf, config, HoodieCLI.basePath, "/tmp");
HoodieBackedTableMetadata metadata = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf),
config, HoodieCLI.basePath, "/tmp");
Map<String, String> stats = metadata.stats();
StringBuffer out = new StringBuffer("\n");
@@ -197,7 +199,7 @@ public class MetadataCommand implements CommandMarker {
final String partition) throws IOException {
HoodieCLI.getTableMetaClient();
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(HoodieCLI.conf, config, HoodieCLI.basePath, "/tmp");
HoodieBackedTableMetadata metaReader = new HoodieBackedTableMetadata(new HoodieLocalEngineContext(HoodieCLI.conf), config, HoodieCLI.basePath, "/tmp");
StringBuffer out = new StringBuffer("\n");
if (!metaReader.enabled()) {

HoodieTable.java

@@ -107,9 +107,8 @@ public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implem
this.hadoopConfiguration = context.getHadoopConf();
this.context = context;
// disable reuse of resources, given there is no close() called on the executors ultimately
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(config.getMetadataConfig().getProps())
.enableReuse(false).build();
.build();
this.metadata = HoodieTableMetadata.create(context, metadataConfig, config.getBasePath(),
FileSystemViewStorageConfig.DEFAULT_VIEW_SPILLABLE_DIR);

TestHoodieBackedMetadata.java

@@ -973,7 +973,6 @@ public class TestHoodieBackedMetadata extends HoodieClientTestHarness {
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder()
.enable(useFileListingMetadata)
.enableReuse(false)
.enableMetrics(enableMetrics)
.enableFallback(false).build())
.withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)

HoodieMetadataConfig.java

@@ -67,10 +67,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
public static final String CLEANER_COMMITS_RETAINED_PROP = METADATA_PREFIX + ".cleaner.commits.retained";
public static final int DEFAULT_CLEANER_COMMITS_RETAINED = 3;
// Controls whether or not the base file open/log merges are reused per API call
public static final String ENABLE_REUSE_PROP = METADATA_PREFIX + ".reuse.enable";
public static final String DEFAULT_ENABLE_REUSE = "true";
// Controls whether or not to fall back to listing upon failure to fetch from the metadata table.
public static final String ENABLE_FALLBACK_PROP = METADATA_PREFIX + ".fallback.enable";
public static final String DEFAULT_ENABLE_FALLBACK = "true";
@@ -105,10 +101,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
return Boolean.parseBoolean(props.getProperty(METADATA_ENABLE_PROP));
}
public boolean enableReuse() {
return Boolean.parseBoolean(props.getProperty(ENABLE_REUSE_PROP));
}
public boolean enableFallback() {
return Boolean.parseBoolean(props.getProperty(ENABLE_FALLBACK_PROP));
}
@@ -151,11 +143,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
return this;
}
public Builder enableReuse(boolean reuse) {
props.setProperty(ENABLE_REUSE_PROP, String.valueOf(reuse));
return this;
}
public Builder enableFallback(boolean fallback) {
props.setProperty(ENABLE_FALLBACK_PROP, String.valueOf(fallback));
return this;
@@ -233,8 +220,6 @@ public final class HoodieMetadataConfig extends DefaultHoodieConfig {
HOODIE_ASSUME_DATE_PARTITIONING_PROP, DEFAULT_ASSUME_DATE_PARTITIONING);
setDefaultOnCondition(props, !props.containsKey(ENABLE_FALLBACK_PROP), ENABLE_FALLBACK_PROP,
DEFAULT_ENABLE_FALLBACK);
setDefaultOnCondition(props, !props.containsKey(ENABLE_REUSE_PROP), ENABLE_REUSE_PROP,
DEFAULT_ENABLE_REUSE);
setDefaultOnCondition(props, !props.containsKey(DIRECTORY_FILTER_REGEX), DIRECTORY_FILTER_REGEX,
DEFAULT_DIRECTORY_FILTER_REGEX);
return config;
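
With the reuse.enable metadata property removed above, reuse is requested per call site rather than via table config. A hedged sketch of the migration, assuming hypothetical paths and an engine context already in hand, against the create() overload added in the HoodieTableMetadata diff further down:

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.metadata.HoodieTableMetadata;

class MetadataReuseExample {
  // Before (removed above): HoodieMetadataConfig.newBuilder().enableReuse(true)...
  // After: the caller opts in when creating the metadata reader.
  static HoodieTableMetadata open(HoodieEngineContext engineContext) {
    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().enable(true).build();
    return HoodieTableMetadata.create(
        engineContext,
        config,
        "/path/to/dataset",   // hypothetical dataset base path
        "/tmp/spillable",     // hypothetical spillable map directory
        true);                // reuse=true: keep readers open across lookups
  }
}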

FileSystemViewManager.java

@@ -214,7 +214,7 @@ public class FileSystemViewManager {
final FileSystemViewStorageConfig config,
final String basePath) {
return createViewManager(context, metadataConfig, config,
() -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir()));
() -> HoodieTableMetadata.create(context, metadataConfig, basePath, config.getSpillableDir(), true));
}
/**

HoodieHFileReader.java

@@ -56,6 +56,9 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
private Configuration conf;
private HFile.Reader reader;
private Schema schema;
// Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each
// key retrieval.
private HFileScanner keyScanner;
public static final String KEY_SCHEMA = "schema";
public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
@@ -140,7 +143,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
public List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema readerSchema) throws IOException {
List<Pair<String, R>> recordList = new LinkedList<>();
try {
HFileScanner scanner = reader.getScanner(false, false);
final HFileScanner scanner = reader.getScanner(false, false);
if (scanner.seekTo()) {
do {
Cell c = scanner.getKeyValue();
@@ -174,7 +177,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
// To handle when hasNext() is called multiple times for idempotency and/or the first time
if (this.next == null && !this.eof) {
if (!scanner.isSeeked() && scanner.seekTo()) {
this.next = (R)getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema);
this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema);
}
}
return this.next != null;
@@ -194,7 +197,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
}
R retVal = this.next;
if (scanner.next()) {
this.next = (R)getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema);
this.next = getRecordFromCell(scanner.getKeyValue(), getSchema(), readerSchema);
} else {
this.next = null;
this.eof = true;
@@ -209,12 +212,23 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
@Override
public Option getRecordByKey(String key, Schema readerSchema) throws IOException {
HFileScanner scanner = reader.getScanner(false, true);
byte[] value = null;
KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
if (scanner.seekTo(kv) == 0) {
Cell c = scanner.getKeyValue();
byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength());
R record = getRecordFromCell(c, getSchema(), readerSchema);
synchronized (this) {
if (keyScanner == null) {
keyScanner = reader.getScanner(true, true);
}
if (keyScanner.seekTo(kv) == 0) {
Cell c = keyScanner.getKeyValue();
// Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
value = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
}
}
if (value != null) {
R record = (R)HoodieAvroUtils.bytesToAvro(value, getSchema(), readerSchema);
return Option.of(record);
}
@@ -232,12 +246,13 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
}
@Override
public void close() {
public synchronized void close() {
try {
reader.close();
reader = null;
keyScanner = null;
} catch (IOException e) {
e.printStackTrace();
throw new HoodieIOException("Error closing the hfile reader", e);
}
}

HoodieBackedTableMetadata.java

@@ -23,7 +23,6 @@ import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
@@ -46,7 +45,6 @@ import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -73,19 +71,23 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
// Metadata table's timeline and metaclient
private HoodieTableMetaClient metaClient;
private List<FileSlice> latestFileSystemMetadataSlices;
// should we reuse the open file handles, across calls
private final boolean reuse;
// Readers for the base and log file which store the metadata
private transient HoodieFileReader<GenericRecord> baseFileReader;
private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
public HoodieBackedTableMetadata(Configuration conf, HoodieMetadataConfig metadataConfig,
public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
String datasetBasePath, String spillableMapDirectory) {
this(new HoodieLocalEngineContext(conf), metadataConfig, datasetBasePath, spillableMapDirectory);
this(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory, false);
}
public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
String datasetBasePath, String spillableMapDirectory) {
String datasetBasePath, String spillableMapDirectory, boolean reuse) {
super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory);
this.reuse = reuse;
initIfNeeded();
}
@@ -112,13 +114,12 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
@Override
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) {
openReadersIfNeededOrThrow();
try {
List<Long> timings = new ArrayList<>();
HoodieTimer timer = new HoodieTimer().startTimer();
openFileSliceIfNeeded();
timings.add(timer.endTimer());
timer.startTimer();
// Retrieve record from base file
HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
if (baseFileReader != null) {
@@ -147,82 +148,115 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
}
timings.add(timer.endTimer());
LOG.info(String.format("Metadata read for key %s took [open, baseFileRead, logMerge] %s ms", key, timings));
LOG.info(String.format("Metadata read for key %s took [baseFileRead, logMerge] %s ms", key, timings));
return Option.ofNullable(hoodieRecord);
} catch (IOException ioe) {
throw new HoodieIOException("Error merging records from metadata table for key :" + key, ioe);
} finally {
closeIfNeeded();
if (!reuse) {
closeOrThrow();
}
}
}
private void openReadersIfNeededOrThrow() {
try {
openReadersIfNeeded();
} catch (IOException e) {
throw new HoodieIOException("Error opening readers to the Metadata Table: ", e);
}
}
/**
* Open readers to the base and log files.
* Returns a new pair of readers to the base and log files.
*/
private synchronized void openFileSliceIfNeeded() throws IOException {
if (metadataConfig.enableReuse() && baseFileReader != null) {
// we will reuse what's open.
private void openReadersIfNeeded() throws IOException {
if (reuse && (baseFileReader != null || logRecordScanner != null)) {
// quickly exit out without synchronizing if reusing and readers are already open
return;
}
// Metadata is in sync till the latest completed instant on the dataset
HoodieTimer timer = new HoodieTimer().startTimer();
String latestInstantTime = getLatestDatasetInstantTime();
ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 1, "must be at-least one validata metadata file slice");
// we always force synchronization, if reuse=false, to handle concurrent close() calls as well.
synchronized (this) {
if (baseFileReader != null || logRecordScanner != null) {
return;
}
// If the base file is present then create a reader
Option<HoodieBaseFile> basefile = latestFileSystemMetadataSlices.get(0).getBaseFile();
if (basefile.isPresent()) {
String basefilePath = basefile.get().getPath();
baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
LOG.info("Opened metadata base file from " + basefilePath + " at instant " + basefile.get().getCommitTime());
final long baseFileOpenMs;
final long logScannerOpenMs;
// Metadata is in sync till the latest completed instant on the dataset
HoodieTimer timer = new HoodieTimer().startTimer();
String latestInstantTime = getLatestDatasetInstantTime();
ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 1, "must be at-least one valid metadata file slice");
// If the base file is present then create a reader
Option<HoodieBaseFile> basefile = latestFileSystemMetadataSlices.get(0).getBaseFile();
if (basefile.isPresent()) {
String basefilePath = basefile.get().getPath();
baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
baseFileOpenMs = timer.endTimer();
LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath,
basefile.get().getCommitTime(), baseFileOpenMs));
} else {
baseFileOpenMs = 0;
timer.endTimer();
}
// Open the log record scanner using the log files from the latest file slice
timer.startTimer();
List<String> logFilePaths = latestFileSystemMetadataSlices.get(0).getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(o -> o.getPath().toString())
.collect(Collectors.toList());
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
// Load the schema
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
logRecordScanner = new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath, logFilePaths,
schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE, spillableMapDirectory, null);
logScannerOpenMs = timer.endTimer();
LOG.info(String.format("Opened metadata log files from %s at instant (dataset instant=%s, metadata instant=%s) in %d ms",
logFilePaths, latestInstantTime, latestMetaInstantTimestamp, logScannerOpenMs));
metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + logScannerOpenMs));
}
// Open the log record scanner using the log files from the latest file slice
List<String> logFilePaths = latestFileSystemMetadataSlices.get(0).getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(o -> o.getPath().toString())
.collect(Collectors.toList());
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
// Load the schema
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
logRecordScanner = new HoodieMetadataMergedLogRecordScanner(metaClient.getFs(), metadataBasePath,
logFilePaths, schema, latestMetaInstantTimestamp, MAX_MEMORY_SIZE_IN_BYTES, BUFFER_SIZE,
spillableMapDirectory, null);
LOG.info("Opened metadata log files from " + logFilePaths + " at instant " + latestInstantTime
+ "(dataset instant=" + latestInstantTime + ", metadata instant=" + latestMetaInstantTimestamp + ")");
metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, timer.endTimer()));
}
private void closeIfNeeded() {
private void close(HoodieFileReader localFileReader, HoodieMetadataMergedLogRecordScanner localLogScanner) {
try {
if (!metadataConfig.enableReuse()) {
close();
if (localFileReader != null) {
localFileReader.close();
}
if (localLogScanner != null) {
localLogScanner.close();
}
} catch (Exception e) {
throw new HoodieException("Error closing resources during metadata table merge", e);
}
}
private void closeOrThrow() {
try {
close();
} catch (Exception e) {
throw new HoodieException("Error closing metadata table readers", e);
}
}
@Override
public void close() throws Exception {
if (baseFileReader != null) {
baseFileReader.close();
baseFileReader = null;
}
if (logRecordScanner != null) {
logRecordScanner.close();
logRecordScanner = null;
}
public synchronized void close() throws Exception {
close(baseFileReader, logRecordScanner);
baseFileReader = null;
logRecordScanner = null;
}
/**
* Return an ordered list of instants which have not been synced to the Metadata Table.
*/
@Override
protected List<HoodieInstant> findInstantsToSync() {
initIfNeeded();

HoodieTableMetadata.java

@@ -72,8 +72,13 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath,
String spillableMapPath) {
return create(engineContext, metadataConfig, datasetBasePath, spillableMapPath, false);
}
static HoodieTableMetadata create(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig, String datasetBasePath,
String spillableMapPath, boolean reuse) {
if (metadataConfig.useFileListingMetadata()) {
return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath);
return new HoodieBackedTableMetadata(engineContext, metadataConfig, datasetBasePath, spillableMapPath, reuse);
} else {
return new FileSystemBackedTableMetadata(engineContext, new SerializableConfiguration(engineContext.getHadoopConf()),
datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());