1
0

[HUDI-2285][HUDI-2476] Metadata table synchronous design. Rebased and Squashed from pull/3426 (#3590)

* [HUDI-2285] Add synchronous updates to the metadata table before completion of commits in the data timeline.

- This patch adds synchronous updates to metadata table. In other words, every write is first committed to metadata table followed by data table. While reading metadata table, we ignore any delta commits that are present only in metadata table and not in data table timeline.
- Compaction of the metadata table is fenced by the condition that we trigger compaction only when there are no inflight requests in the data table. This ensures that all base files in the metadata table are always in sync with the data table (without any holes); at most, there could be some extra invalid commits among the delta log files in the metadata table.
- Due to this, archival of data table also fences itself up until compacted instant in metadata table.
All writes to the metadata table happen within the data table lock, so the metadata table operates in single-writer mode only. This might be difficult to loosen, since all writers write to the same FILES partition and would therefore conflict anyway.
- As part of this, we have added lock acquisition in the data table for those operations that previously did not take a lock while committing (rollback, clean, compaction, cluster). To note, we are not doing any conflict resolution; all we do here is commit while holding a lock, so that all writes to the metadata table always come from a single writer.
- Also added building block to add buckets for partitions, which will be leveraged by other indexes like record level index, etc. For now, FILES partition has only one bucket. In general, any number of buckets per partition is allowed and each partition has a fixed fileId prefix with incremental suffix for each bucket within each partition.
Have fixed [HUDI-2476]. This fix is about retrying a failed compaction if it succeeded in the metadata table the first time but failed in the data table.
- Enabling metadata table by default.
- Adding more tests for metadata table

Co-authored-by: Prashant Wason <pwason@uber.com>
This commit is contained in:
Sivabalan Narayanan
2021-10-06 00:17:52 -04:00
committed by GitHub
parent 46808dcb1f
commit 5f32162a2f
101 changed files with 3329 additions and 2069 deletions

View File

@@ -41,24 +41,10 @@ public final class HoodieMetadataConfig extends HoodieConfig {
// Enable the internal Metadata Table which saves file listings
public static final ConfigProperty<Boolean> ENABLE = ConfigProperty
.key(METADATA_PREFIX + ".enable")
.defaultValue(false)
.defaultValue(true)
.sinceVersion("0.7.0")
.withDocumentation("Enable the internal metadata table which serves table metadata like level file listings");
// Enable syncing the Metadata Table
public static final ConfigProperty<Boolean> SYNC_ENABLE = ConfigProperty
.key(METADATA_PREFIX + ".sync.enable")
.defaultValue(true)
.sinceVersion("0.9.0")
.withDocumentation("Enable syncing of metadata table from actions on the dataset");
// Validate contents of Metadata Table on each access against the actual filesystem
public static final ConfigProperty<Boolean> VALIDATE_ENABLE = ConfigProperty
.key(METADATA_PREFIX + ".validate")
.defaultValue(false)
.sinceVersion("0.7.0")
.withDocumentation("Validate contents of metadata table on each access; e.g against the actual listings from lake storage");
public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = false;
// Enable metrics for internal Metadata Table
@@ -149,14 +135,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return getBoolean(ENABLE);
}
public boolean enableSync() {
return enabled() && getBoolean(HoodieMetadataConfig.SYNC_ENABLE);
}
public boolean validateFileListingMetadata() {
return getBoolean(VALIDATE_ENABLE);
}
public boolean enableMetrics() {
return getBoolean(METRICS_ENABLE);
}
@@ -186,21 +164,11 @@ public final class HoodieMetadataConfig extends HoodieConfig {
return this;
}
public Builder enableSync(boolean enable) {
metadataConfig.setValue(SYNC_ENABLE, String.valueOf(enable));
return this;
}
public Builder enableMetrics(boolean enableMetrics) {
metadataConfig.setValue(METRICS_ENABLE, String.valueOf(enableMetrics));
return this;
}
public Builder validate(boolean validate) {
metadataConfig.setValue(VALIDATE_ENABLE, String.valueOf(validate));
return this;
}
public Builder withInsertParallelism(int parallelism) {
metadataConfig.setValue(INSERT_PARALLELISM_VALUE, String.valueOf(parallelism));
return this;
@@ -258,16 +226,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
*/
@Deprecated
public static final boolean DEFAULT_METADATA_ENABLE = ENABLE.defaultValue();
/**
* @deprecated Use {@link #VALIDATE_ENABLE} and its methods.
*/
@Deprecated
public static final String METADATA_VALIDATE_PROP = VALIDATE_ENABLE.key();
/**
* @deprecated Use {@link #VALIDATE_ENABLE} and its methods.
*/
@Deprecated
public static final boolean DEFAULT_METADATA_VALIDATE = VALIDATE_ENABLE.defaultValue();
/**
* @deprecated Use {@link #METRICS_ENABLE} and its methods.

View File

@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
@@ -271,11 +270,10 @@ public class FSUtils {
}
public static List<String> getAllPartitionPaths(HoodieEngineContext engineContext, String basePathStr,
boolean useFileListingFromMetadata, boolean verifyListings,
boolean useFileListingFromMetadata,
boolean assumeDatePartitioning) {
HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
.enable(useFileListingFromMetadata)
.validate(verifyListings)
.withAssumeDatePartitioning(assumeDatePartitioning)
.build();
try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr,
@@ -537,15 +535,6 @@ public class FSUtils {
return recovered;
}
public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) {
try {
LOG.warn("try to delete instant file: " + instant);
fs.delete(new Path(metaPath, instant.getFileName()), false);
} catch (IOException e) {
throw new HoodieIOException("Could not delete instant file" + instant.getFileName(), e);
}
}
public static void createPathIfNotExists(FileSystem fs, Path partitionPath) throws IOException {
if (!fs.exists(partitionPath)) {
fs.mkdirs(partitionPath);

View File

@@ -32,7 +32,9 @@ public enum HoodieTableVersion {
// 0.6.0 onwards
ONE(1),
// 0.9.0 onwards
TWO(2);
TWO(2),
// 0.10.0 onwards
THREE(3);
private final int versionCode;
@@ -45,7 +47,7 @@ public enum HoodieTableVersion {
}
public static HoodieTableVersion current() {
return TWO;
return THREE;
}
public static HoodieTableVersion versionFromCode(int versionCode) {

View File

@@ -56,6 +56,8 @@ public interface HoodieLogFormat {
String UNKNOWN_WRITE_TOKEN = "1-0-1";
String DEFAULT_WRITE_TOKEN = "0-0-0";
/**
* Writer interface to allow appending block to this file format.
*/

View File

@@ -181,52 +181,61 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
protected ExternalSpillableMap.DiskMapType diskMapType = HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue();
protected boolean isBitCaskDiskMapCompressionEnabled = HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
// incremental filtering
private Option<InstantRange> instantRange = Option.empty();
protected Option<InstantRange> instantRange = Option.empty();
// auto scan default true
private boolean autoScan = true;
// operation field default false
private boolean withOperationField = false;
@Override
public Builder withFileSystem(FileSystem fs) {
this.fs = fs;
return this;
}
@Override
public Builder withBasePath(String basePath) {
this.basePath = basePath;
return this;
}
@Override
public Builder withLogFilePaths(List<String> logFilePaths) {
this.logFilePaths = logFilePaths;
return this;
}
@Override
public Builder withReaderSchema(Schema schema) {
this.readerSchema = schema;
return this;
}
@Override
public Builder withLatestInstantTime(String latestInstantTime) {
this.latestInstantTime = latestInstantTime;
return this;
}
@Override
public Builder withReadBlocksLazily(boolean readBlocksLazily) {
this.readBlocksLazily = readBlocksLazily;
return this;
}
@Override
public Builder withReverseReader(boolean reverseReader) {
this.reverseReader = reverseReader;
return this;
}
@Override
public Builder withBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
@Override
public Builder withInstantRange(Option<InstantRange> instantRange) {
this.instantRange = instantRange;
return this;

View File

@@ -27,6 +27,7 @@ import org.apache.hudi.exception.HoodieIOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -172,6 +173,14 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline {
deleteInstantFile(instant);
}
public static void deleteInstantFile(FileSystem fs, String metaPath, HoodieInstant instant) {
try {
fs.delete(new Path(metaPath, instant.getFileName()), false);
} catch (IOException e) {
throw new HoodieIOException("Could not delete instant file" + instant.getFileName(), e);
}
}
public void deletePendingIfExists(HoodieInstant.State state, String action, String instantStr) {
HoodieInstant instant = new HoodieInstant(state, action, instantStr);
ValidationUtils.checkArgument(!instant.isCompleted());

View File

@@ -229,7 +229,14 @@ public class HoodieDefaultTimeline implements HoodieTimeline {
*/
public HoodieTimeline getRollbackTimeline() {
return new HoodieDefaultTimeline(filterInstantsByAction(ROLLBACK_ACTION),
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
(Function<HoodieInstant, Option<byte[]>> & Serializable) this::getInstantDetails);
}
/**
* Get only the rollback and restore action (inflight and completed) in the active timeline.
*/
public HoodieTimeline getRollbackAndRestoreTimeline() {
return getTimelineOfActions(CollectionUtils.createSet(ROLLBACK_ACTION, RESTORE_ACTION));
}
/**

View File

@@ -26,12 +26,8 @@ import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieMetadataException;
@@ -42,13 +38,10 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public abstract class BaseTableMetadata implements HoodieTableMetadata {
@@ -59,22 +52,21 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
protected final transient HoodieEngineContext engineContext;
protected final SerializableConfiguration hadoopConf;
protected final String datasetBasePath;
protected final HoodieTableMetaClient datasetMetaClient;
protected final String dataBasePath;
protected final HoodieTableMetaClient dataMetaClient;
protected final Option<HoodieMetadataMetrics> metrics;
protected final HoodieMetadataConfig metadataConfig;
// Directory used for Spillable Map when merging records
protected final String spillableMapDirectory;
protected boolean enabled;
private TimelineMergedTableMetadata timelineMergedMetadata;
protected BaseTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
String datasetBasePath, String spillableMapDirectory) {
String dataBasePath, String spillableMapDirectory) {
this.engineContext = engineContext;
this.hadoopConf = new SerializableConfiguration(engineContext.getHadoopConf());
this.datasetBasePath = datasetBasePath;
this.datasetMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(datasetBasePath).build();
this.dataBasePath = dataBasePath;
this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(dataBasePath).build();
this.spillableMapDirectory = spillableMapDirectory;
this.metadataConfig = metadataConfig;
@@ -104,7 +96,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
throw new HoodieMetadataException("Failed to retrieve list of partition from metadata", e);
}
}
return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, datasetBasePath,
return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath,
metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths();
}
@@ -129,7 +121,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
}
return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, datasetBasePath, metadataConfig.shouldAssumeDatePartitioning())
return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning())
.getAllFilesInPartition(partitionPath);
}
@@ -149,7 +141,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
}
return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, datasetBasePath, metadataConfig.shouldAssumeDatePartitioning())
return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning())
.getAllFilesInPartitions(partitionPaths);
}
@@ -158,7 +150,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
*/
protected List<String> fetchAllPartitionPaths() throws IOException {
HoodieTimer timer = new HoodieTimer().startTimer();
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(RECORDKEY_PARTITION_LIST);
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKeyFromMetadata(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
List<String> partitions = Collections.emptyList();
@@ -176,28 +168,6 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
}
}
if (metadataConfig.validateFileListingMetadata()) {
// Validate the Metadata Table data by listing the partitions from the file system
timer.startTimer();
FileSystemBackedTableMetadata fileSystemBackedTableMetadata = new FileSystemBackedTableMetadata(getEngineContext(),
hadoopConf, datasetBasePath, metadataConfig.shouldAssumeDatePartitioning());
List<String> actualPartitions = fileSystemBackedTableMetadata.getAllPartitionPaths();
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_PARTITIONS_STR, timer.endTimer()));
Collections.sort(actualPartitions);
Collections.sort(partitions);
if (!actualPartitions.equals(partitions)) {
LOG.error("Validation of metadata partition list failed. Lists do not match.");
LOG.error("Partitions from metadata: " + Arrays.toString(partitions.toArray()));
LOG.error("Partitions from file system: " + Arrays.toString(actualPartitions.toArray()));
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
}
// Return the direct listing as it should be correct
partitions = actualPartitions;
}
LOG.info("Listed partitions from metadata: #partitions=" + partitions.size());
return partitions;
}
@@ -208,13 +178,13 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
* @param partitionPath The absolute path of the partition
*/
FileStatus[] fetchAllFilesInPartition(Path partitionPath) throws IOException {
String partitionName = FSUtils.getRelativePartitionPath(new Path(datasetBasePath), partitionPath);
String partitionName = FSUtils.getRelativePartitionPath(new Path(dataBasePath), partitionPath);
if (partitionName.isEmpty()) {
partitionName = NON_PARTITIONED_NAME;
}
HoodieTimer timer = new HoodieTimer().startTimer();
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getMergedRecordByKey(partitionName);
Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKeyFromMetadata(partitionName, MetadataPartitionType.FILES.partitionPath());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
FileStatus[] statuses = {};
@@ -226,101 +196,11 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
statuses = hoodieRecord.get().getData().getFileStatuses(hadoopConf.get(), partitionPath);
}
if (metadataConfig.validateFileListingMetadata()) {
// Validate the Metadata Table data by listing the partitions from the file system
timer.startTimer();
String partitionPathStr = FSUtils.getRelativePartitionPath(new Path(datasetMetaClient.getBasePath()), partitionPath);
String latestDataInstantTime = getLatestDatasetInstantTime();
HoodieTableFileSystemView dataFsView = new HoodieTableFileSystemView(datasetMetaClient, datasetMetaClient.getActiveTimeline());
List<FileStatus> directStatuses = dataFsView.getAllFileSlices(partitionPathStr).flatMap(slice -> {
List<FileStatus> paths = new ArrayList<>();
slice.getBaseFile().ifPresent(baseFile -> {
if (HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, latestDataInstantTime)) {
paths.add(baseFile.getFileStatus());
}
});
//TODO(metadata): this will remain problematic; no way to know the commit time based on log file written
slice.getLogFiles().forEach(logFile -> paths.add(logFile.getFileStatus()));
return paths.stream();
}).collect(Collectors.toList());
List<String> directFilenames = directStatuses.stream()
.map(fileStatus -> fileStatus.getPath().getName()).sorted()
.collect(Collectors.toList());
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_FILES_STR, timer.endTimer()));
List<String> metadataFilenames = Arrays.stream(statuses)
.map(s -> s.getPath().getName()).sorted()
.collect(Collectors.toList());
if (!metadataFilenames.equals(directFilenames)) {
LOG.error("Validation of metadata file listing for partition " + partitionName + " failed.");
LOG.error("File list from metadata: " + Arrays.toString(metadataFilenames.toArray()));
LOG.error("File list from direct listing: " + Arrays.toString(directFilenames.toArray()));
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.VALIDATE_ERRORS_STR, 0));
}
// Return the direct listing as it should be correct
statuses = directStatuses.toArray(new FileStatus[0]);
}
LOG.info("Listed file in partition from metadata: partition=" + partitionName + ", #files=" + statuses.length);
return statuses;
}
/**
* Retrieve the merged {@code HoodieRecord} mapped to the given key.
*
* @param key The key of the record
*/
private Option<HoodieRecord<HoodieMetadataPayload>> getMergedRecordByKey(String key) {
Option<HoodieRecord<HoodieMetadataPayload>> mergedRecord;
Option<HoodieRecord<HoodieMetadataPayload>> metadataHoodieRecord = getRecordByKeyFromMetadata(key);
// Retrieve record from unsynced timeline instants
Option<HoodieRecord<HoodieMetadataPayload>> timelineHoodieRecord = timelineMergedMetadata.getRecordByKey(key);
if (timelineHoodieRecord.isPresent()) {
if (metadataHoodieRecord.isPresent()) {
HoodieRecordPayload mergedPayload = timelineHoodieRecord.get().getData().preCombine(metadataHoodieRecord.get().getData());
mergedRecord = Option.of(new HoodieRecord(metadataHoodieRecord.get().getKey(), mergedPayload));
} else {
mergedRecord = timelineHoodieRecord;
}
} else {
mergedRecord = metadataHoodieRecord;
}
return mergedRecord;
}
protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key);
protected void openTimelineScanner(HoodieActiveTimeline metadataTableTimeline) {
if (timelineMergedMetadata == null) {
List<HoodieInstant> unSyncedInstants = findInstantsToSyncForReader();
timelineMergedMetadata =
new TimelineMergedTableMetadata(datasetMetaClient, metadataTableTimeline, unSyncedInstants, getUpdateTime(), null);
}
}
/**
* Return the instants which are not-synced to the {@code HoodieTableMetadata}.
*
* This is the list of all completed but un-synched instants.
*/
protected abstract List<HoodieInstant> findInstantsToSyncForReader();
/**
* Return the instants which are not-synced to the {@code HoodieTableMetadataWriter}.
*
* This is the list of all completed but un-synched instants which do not have any incomplete instants in between them.
*/
protected abstract List<HoodieInstant> findInstantsToSyncForWriter();
@Override
public boolean isInSync() {
return enabled && findInstantsToSyncForWriter().isEmpty();
}
protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key, String partitionName);
protected HoodieEngineContext getEngineContext() {
return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());
@@ -330,15 +210,8 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
return metadataConfig;
}
protected String getLatestDatasetInstantTime() {
return datasetMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
protected String getLatestDataInstantTime() {
return dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
}
public Option<String> getReaderTime() {
if (timelineMergedMetadata == null) {
return Option.empty();
}
return timelineMergedMetadata.getSyncedInstantTime();
}
}

View File

@@ -126,13 +126,13 @@ public class FileSystemBackedTableMetadata implements HoodieTableMetadata {
}
@Override
public Option<String> getUpdateTime() {
public Option<String> getSyncedInstantTime() {
throw new UnsupportedOperationException();
}
@Override
public boolean isInSync() {
return true;
public Option<String> getLatestCompactionTime() {
throw new UnsupportedOperationException();
}
@Override

View File

@@ -20,6 +20,8 @@ package org.apache.hudi.metadata;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
@@ -32,9 +34,9 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.SpillableMapUtils;
@@ -42,6 +44,7 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -56,15 +59,15 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* Table metadata provided by an internal DFS backed Hudi metadata table.
*
* If the metadata table does not exist, RPC calls are used to retrieve file listings from the file system.
* No updates are applied to the table and it is not synced.
*/
public class HoodieBackedTableMetadata extends BaseTableMetadata {
@@ -72,16 +75,13 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
private String metadataBasePath;
// Metadata table's timeline and metaclient
private HoodieTableMetaClient metaClient;
private HoodieTableConfig tableConfig;
private List<FileSlice> latestFileSystemMetadataSlices;
private HoodieTableMetaClient metadataMetaClient;
private HoodieTableConfig metadataTableConfig;
// should we reuse the open file handles, across calls
private final boolean reuse;
// Readers for the base and log file which store the metadata
private transient HoodieFileReader<GenericRecord> baseFileReader;
private transient HoodieMetadataMergedLogRecordScanner logRecordScanner;
// Readers for latest file slice corresponding to file groups in the metadata partition of interest
private Map<String, Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner>> partitionReaders = new ConcurrentHashMap<>();
public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
String datasetBasePath, String spillableMapDirectory) {
@@ -96,40 +96,37 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
}
private void initIfNeeded() {
this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath);
if (!enabled) {
LOG.info("Metadata table is disabled for " + datasetBasePath);
} else if (this.metaClient == null) {
this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(datasetBasePath);
if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) {
LOG.info("Metadata table is disabled.");
}
} else if (this.metadataMetaClient == null) {
try {
this.metaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build();
this.tableConfig = metaClient.getTableConfig();
HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getActiveTimeline());
latestFileSystemMetadataSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build();
this.metadataTableConfig = metadataMetaClient.getTableConfig();
} catch (TableNotFoundException e) {
LOG.warn("Metadata table was not found at path " + metadataBasePath);
this.enabled = false;
this.metaClient = null;
this.tableConfig = null;
this.metadataMetaClient = null;
this.metadataTableConfig = null;
} catch (Exception e) {
LOG.error("Failed to initialize metadata table at path " + metadataBasePath, e);
this.enabled = false;
this.metaClient = null;
this.tableConfig = null;
}
if (enabled) {
openTimelineScanner(metaClient.getActiveTimeline());
this.metadataMetaClient = null;
this.metadataTableConfig = null;
}
}
}
@Override
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key) {
openReadersIfNeededOrThrow();
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKeyFromMetadata(String key, String partitionName) {
Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers = openReadersIfNeeded(key, partitionName);
try {
List<Long> timings = new ArrayList<>();
HoodieTimer timer = new HoodieTimer().startTimer();
HoodieFileReader baseFileReader = readers.getKey();
HoodieMetadataMergedLogRecordScanner logRecordScanner = readers.getRight();
// Retrieve record from base file
HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
@@ -137,10 +134,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
HoodieTimer readTimer = new HoodieTimer().startTimer();
Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
if (baseRecord.isPresent()) {
hoodieRecord = tableConfig.populateMetaFields()
? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), tableConfig.getPreCombineField(), false)
: SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), tableConfig.getPayloadClass(), tableConfig.getPreCombineField(),
Pair.of(tableConfig.getRecordKeyFieldProp(), tableConfig.getPartitionFieldProp()), false);
hoodieRecord = metadataTableConfig.populateMetaFields()
? SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(), false)
: SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
Pair.of(metadataTableConfig.getRecordKeyFieldProp(), metadataTableConfig.getPartitionFieldProp()), false);
metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
}
}
@@ -167,173 +164,173 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
throw new HoodieIOException("Error merging records from metadata table for key :" + key, ioe);
} finally {
if (!reuse) {
closeOrThrow();
close(partitionName);
}
}
}
private void openReadersIfNeededOrThrow() {
try {
openReadersIfNeeded();
} catch (IOException e) {
throw new HoodieIOException("Error opening readers to the Metadata Table: ", e);
}
}
/**
* Returns a new pair of readers to the base and log files.
*/
private void openReadersIfNeeded() throws IOException {
if (reuse && (baseFileReader != null || logRecordScanner != null)) {
// quickly exit out without synchronizing if reusing and readers are already open
return;
}
private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> openReadersIfNeeded(String key, String partitionName) {
return partitionReaders.computeIfAbsent(partitionName, k -> {
try {
final long baseFileOpenMs;
final long logScannerOpenMs;
HoodieFileReader baseFileReader = null;
HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
// we always force synchronization, if reuse=false, to handle concurrent close() calls as well.
synchronized (this) {
if (baseFileReader != null || logRecordScanner != null) {
return;
// Metadata is in sync till the latest completed instant on the dataset
HoodieTimer timer = new HoodieTimer().startTimer();
List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
ValidationUtils.checkArgument(latestFileSlices.size() == 1, String.format("Invalid number of file slices: found=%d, required=%d", latestFileSlices.size(), 1));
final FileSlice slice = latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, latestFileSlices.size()));
// Open base file reader
Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = getBaseFileReader(slice, timer);
baseFileReader = baseFileReaderOpenTimePair.getKey();
baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
// Open the log record scanner using the log files from the latest file slice
Pair<HoodieMetadataMergedLogRecordScanner, Long> logRecordScannerOpenTimePair = getLogRecordScanner(slice);
logRecordScanner = logRecordScannerOpenTimePair.getKey();
logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + logScannerOpenMs));
return Pair.of(baseFileReader, logRecordScanner);
} catch (IOException e) {
throw new HoodieIOException("Error opening readers for metadata table partition " + partitionName, e);
}
final long baseFileOpenMs;
final long logScannerOpenMs;
// Metadata is in sync till the latest completed instant on the dataset
HoodieTimer timer = new HoodieTimer().startTimer();
String latestInstantTime = getLatestDatasetInstantTime();
ValidationUtils.checkArgument(latestFileSystemMetadataSlices.size() == 1, "must be at-least one valid metadata file slice");
// If the base file is present then create a reader
Option<HoodieBaseFile> basefile = latestFileSystemMetadataSlices.get(0).getBaseFile();
if (basefile.isPresent()) {
String basefilePath = basefile.get().getPath();
baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
baseFileOpenMs = timer.endTimer();
LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath,
basefile.get().getCommitTime(), baseFileOpenMs));
} else {
baseFileOpenMs = 0;
timer.endTimer();
}
// Open the log record scanner using the log files from the latest file slice
timer.startTimer();
List<String> logFilePaths = latestFileSystemMetadataSlices.get(0).getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
.map(o -> o.getPath().toString())
.collect(Collectors.toList());
Option<HoodieInstant> lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
String latestMetaInstantTimestamp = lastInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
// Load the schema
Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
.withFileSystem(metaClient.getFs())
.withBasePath(metadataBasePath)
.withLogFilePaths(logFilePaths)
.withReaderSchema(schema)
.withLatestInstantTime(latestMetaInstantTimestamp)
.withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
.withBufferSize(BUFFER_SIZE)
.withSpillableMapBasePath(spillableMapDirectory)
.withDiskMapType(commonConfig.getSpillableDiskMapType())
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
.build();
logScannerOpenMs = timer.endTimer();
LOG.info(String.format("Opened metadata log files from %s at instant (dataset instant=%s, metadata instant=%s) in %d ms",
logFilePaths, latestInstantTime, latestMetaInstantTimestamp, logScannerOpenMs));
metrics.ifPresent(metrics -> metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + logScannerOpenMs));
}
});
}
/**
 * Closes the given base file reader and log record scanner, tolerating nulls.
 * Any failure while closing is rethrown as an unchecked {@link HoodieException}.
 */
private void close(HoodieFileReader localFileReader, HoodieMetadataMergedLogRecordScanner localLogScanner) {
  try {
    if (localFileReader != null) {
      localFileReader.close();
    }
    if (localLogScanner != null) {
      localLogScanner.close();
    }
  } catch (Exception ex) {
    throw new HoodieException("Error closing resources during metadata table merge", ex);
  }
}
/**
 * Invokes {@link #close()} and wraps any checked failure in an unchecked
 * {@link HoodieException} for callers that cannot propagate it.
 */
private void closeOrThrow() {
  try {
    close();
  } catch (Exception ex) {
    throw new HoodieException("Error closing metadata table readers", ex);
  }
}
/**
 * Closes the shared base file reader and log record scanner, then clears the references so the
 * next read reopens fresh ones. Synchronized so it cannot race a concurrent reader open.
 */
@Override
public synchronized void close() throws Exception {
  close(baseFileReader, logRecordScanner);
  baseFileReader = null;
  logRecordScanner = null;
}
/**
 * Return the timestamp of the latest synced instant, i.e. the last completed delta commit on
 * the metadata timeline, or empty when the metadata table is disabled or has no such commit.
 */
@Override
public Option<String> getUpdateTime() {
  if (!enabled) {
    return Option.empty();
  }
  return metaClient.reloadActiveTimeline()
      .getDeltaCommitTimeline()
      .filterCompletedInstants()
      .lastInstant()
      .map(HoodieInstant::getTimestamp);
}
/**
 * Return an ordered list of instants which have not been synced to the Metadata Table.
 * Reader variant: delegates with {@code ignoreIncompleteInstants=true}, so inflight/requested
 * instants on the data timeline do not limit the returned range.
 */
@Override
protected List<HoodieInstant> findInstantsToSyncForReader() {
  return findInstantsToSync(true);
}
/**
 * Return an ordered list of instants which have not been synced to the Metadata Table.
 * Writer variant: delegates with {@code ignoreIncompleteInstants=false}, so only completed
 * instants before the earliest inflight/requested instant are returned.
 */
@Override
protected List<HoodieInstant> findInstantsToSyncForWriter() {
  return findInstantsToSync(false);
}
/**
* Return an ordered list of instants which have not been synced to the Metadata Table.
*/
private List<HoodieInstant> findInstantsToSync(boolean ignoreIncompleteInstants) {
initIfNeeded();
// if there are no instants yet, return empty list, since there is nothing to sync here.
if (!enabled || !metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent()) {
return Collections.EMPTY_LIST;
}
// All instants on the data timeline, which are greater than the last deltacommit instant on metadata timeline
// are candidates for sync. We only consider delta-commit instants as each actions on dataset leads to a
// deltacommit on the metadata table.
String latestMetadataInstantTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
.lastInstant().get().getTimestamp();
HoodieDefaultTimeline candidateTimeline = datasetMetaClient.getActiveTimeline().findInstantsAfter(latestMetadataInstantTime, Integer.MAX_VALUE);
Option<HoodieInstant> earliestIncompleteInstant = ignoreIncompleteInstants ? Option.empty()
: candidateTimeline.filterInflightsAndRequested().firstInstant();
if (earliestIncompleteInstant.isPresent()) {
return candidateTimeline.filterCompletedInstants()
.findInstantsBefore(earliestIncompleteInstant.get().getTimestamp())
.getInstants().collect(Collectors.toList());
/**
 * Opens a reader over the base file of the given metadata file slice, if one exists.
 *
 * Note: the span as rendered contained two stray statements interleaved from another method
 * (a leftover diff artifact); they are removed here so the method is syntactically valid.
 *
 * @param slice the latest file slice of the metadata partition
 * @param timer a started timer; its current lap is consumed to report the open duration
 * @return pair of (reader or null when the slice has no base file, open duration in ms)
 * @throws IOException if the base file cannot be opened
 */
private Pair<HoodieFileReader, Long> getBaseFileReader(FileSlice slice, HoodieTimer timer) throws IOException {
  HoodieFileReader baseFileReader = null;
  Long baseFileOpenMs;
  // If the base file is present then create a reader
  Option<HoodieBaseFile> basefile = slice.getBaseFile();
  if (basefile.isPresent()) {
    String basefilePath = basefile.get().getPath();
    baseFileReader = HoodieFileReaderFactory.getFileReader(hadoopConf.get(), new Path(basefilePath));
    baseFileOpenMs = timer.endTimer();
    LOG.info(String.format("Opened metadata base file from %s at instant %s in %d ms", basefilePath,
        basefile.get().getCommitTime(), baseFileOpenMs));
  } else {
    baseFileOpenMs = 0L;
    // End the lap anyway so the shared timer stays consistent for the caller.
    timer.endTimer();
  }
  return Pair.of(baseFileReader, baseFileOpenMs);
}
/**
 * Computes the set of instant timestamps whose log blocks may be read from the metadata table.
 *
 * Only instants completed on the data timeline are valid, because the metadata table is updated
 * before the data table instant completes. Commits undone by completed rollbacks/restores newer
 * than the earliest valid timestamp are also included, as are bootstrap writes
 * ({@code SOLO_COMMIT_TIMESTAMP}).
 */
private Set<String> getValidInstantTimestamps() {
  final HoodieActiveTimeline dataTimeline = dataMetaClient.getActiveTimeline();
  final Set<String> validTimestamps = dataTimeline.filterCompletedInstants().getInstants()
      .map(HoodieInstant::getTimestamp)
      .collect(Collectors.toSet());

  // A rollback/restore can only have rolled back an instant we hold a log block for if it is
  // more recent than the start of the timeline collected above.
  final String earliestValidTimestamp =
      validTimestamps.isEmpty() ? SOLO_COMMIT_TIMESTAMP : Collections.min(validTimestamps);
  dataTimeline.getRollbackAndRestoreTimeline().filterCompletedInstants().getInstants()
      .filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(), HoodieTimeline.GREATER_THAN, earliestValidTimestamp))
      .forEach(i -> validTimestamps.addAll(getRollbackedCommits(i, dataTimeline)));

  // SOLO_COMMIT_TIMESTAMP is used during bootstrap so it is a valid timestamp
  validTimestamps.add(SOLO_COMMIT_TIMESTAMP);
  return validTimestamps;
}
/**
 * Builds a merged log record scanner over the log files of the given metadata file slice.
 *
 * Log blocks are filtered to instants that are valid per {@link #getValidInstantTimestamps()},
 * so deltacommits present only on the metadata timeline (not yet completed on the data
 * timeline) are ignored while reading.
 *
 * @param slice the latest file slice of the metadata partition
 * @return pair of (the opened scanner, open duration in ms)
 */
private Pair<HoodieMetadataMergedLogRecordScanner, Long> getLogRecordScanner(FileSlice slice) {
  HoodieTimer timer = new HoodieTimer().startTimer();
  // Log files must be supplied to the scanner in commit order.
  List<String> logFilePaths = slice.getLogFiles()
      .sorted(HoodieLogFile.getLogFileComparator())
      .map(o -> o.getPath().toString())
      .collect(Collectors.toList());
  // Only those log files which have a corresponding completed instant on the dataset should be read
  // This is because the metadata table is updated before the dataset instants are committed.
  Set<String> validInstantTimestamps = getValidInstantTimestamps();
  Option<HoodieInstant> latestMetadataInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
  // SOLO_COMMIT_TIMESTAMP is the fallback when the metadata timeline has no completed instant yet.
  String latestMetadataInstantTime = latestMetadataInstant.map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
  // Load the schema
  Schema schema = HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
  HoodieCommonConfig commonConfig = HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
  HoodieMetadataMergedLogRecordScanner logRecordScanner = HoodieMetadataMergedLogRecordScanner.newBuilder()
      .withFileSystem(metadataMetaClient.getFs())
      .withBasePath(metadataBasePath)
      .withLogFilePaths(logFilePaths)
      .withReaderSchema(schema)
      .withLatestInstantTime(latestMetadataInstantTime)
      .withMaxMemorySizeInBytes(MAX_MEMORY_SIZE_IN_BYTES)
      .withBufferSize(BUFFER_SIZE)
      .withSpillableMapBasePath(spillableMapDirectory)
      .withDiskMapType(commonConfig.getSpillableDiskMapType())
      .withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
      // Restrict reads to log blocks whose instant is confirmed valid on the data timeline.
      .withLogBlockTimestamps(validInstantTimestamps)
      .build();
  Long logScannerOpenMs = timer.endTimer();
  LOG.info(String.format("Opened %d metadata log files (dataset instant=%s, metadata instant=%s) in %d ms",
      logFilePaths.size(), getLatestDataInstantTime(), latestMetadataInstantTime, logScannerOpenMs));
  return Pair.of(logRecordScanner, logScannerOpenMs);
}
/**
 * Returns a list of commits which were rolled back as part of a Rollback or Restore operation.
 *
 * @param instant The Rollback operation to read
 * @param timeline instant of timeline from dataset.
 */
private List<String> getRollbackedCommits(HoodieInstant instant, HoodieActiveTimeline timeline) {
  try {
    final String action = instant.getAction();
    if (HoodieTimeline.ROLLBACK_ACTION.equals(action)) {
      return TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
          timeline.getInstantDetails(instant).get()).getCommitsRollback();
    }
    List<String> rolledBackCommits = new LinkedList<>();
    if (HoodieTimeline.RESTORE_ACTION.equals(action)) {
      // Restore is made up of several rollbacks
      HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
          timeline.getInstantDetails(instant).get());
      restoreMetadata.getHoodieRestoreMetadata().values()
          .forEach(rms -> rms.forEach(rm -> rolledBackCommits.addAll(rm.getCommitsRollback())));
    }
    return rolledBackCommits;
  } catch (IOException e) {
    throw new HoodieMetadataException("Error retrieving rollback commits for instant " + instant, e);
  }
}
/**
 * Closes all cached partition readers and empties the cache.
 *
 * Drains via {@link #close(String)}, which removes each entry from {@code partitionReaders}.
 * Looping on {@code isEmpty()} instead of iterating the live {@code keySet()} view avoids a
 * possible {@link java.util.ConcurrentModificationException} when entries are removed during
 * iteration (NOTE(review): only an issue if partitionReaders is not a concurrent map — confirm
 * its declared type).
 */
@Override
public void close() {
  while (!partitionReaders.isEmpty()) {
    close(partitionReaders.keySet().iterator().next());
  }
  partitionReaders.clear();
}
/**
 * Removes and closes the cached readers for the given metadata partition, if any.
 * Synchronized so a close cannot interleave with another close of the same partition.
 * Failures while closing are rethrown as an unchecked {@link HoodieException}.
 */
private synchronized void close(String partitionName) {
  Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers = partitionReaders.remove(partitionName);
  if (readers != null) {
    try {
      if (readers.getKey() != null) {
        readers.getKey().close();
      }
      if (readers.getValue() != null) {
        readers.getValue().close();
      }
    } catch (Exception e) {
      throw new HoodieException("Error closing resources during metadata table merge", e);
    }
  }
}
@@ -345,11 +342,33 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
return hadoopConf;
}
public HoodieTableMetaClient getMetaClient() {
return metaClient;
public HoodieTableMetaClient getMetadataMetaClient() {
return metadataMetaClient;
}
public Map<String, String> stats() {
return metrics.map(m -> m.getStats(true, metaClient, this)).orElse(new HashMap<>());
return metrics.map(m -> m.getStats(true, metadataMetaClient, this)).orElse(new HashMap<>());
}
/**
 * Returns the timestamp of the last completed deltacommit on the metadata timeline, i.e. the
 * instant to which the metadata table is synced, or empty when unavailable.
 */
@Override
public Option<String> getSyncedInstantTime() {
  if (metadataMetaClient == null) {
    return Option.empty();
  }
  return metadataMetaClient.getActiveTimeline()
      .getDeltaCommitTimeline()
      .filterCompletedInstants()
      .lastInstant()
      .map(HoodieInstant::getTimestamp);
}
/**
 * Returns the timestamp of the latest completed compaction (commit action) on the metadata
 * timeline, or empty when unavailable.
 */
@Override
public Option<String> getLatestCompactionTime() {
  if (metadataMetaClient == null) {
    return Option.empty();
  }
  return metadataMetaClient.getActiveTimeline()
      .getCommitTimeline()
      .filterCompletedInstants()
      .lastInstant()
      .map(HoodieInstant::getTimestamp);
}
}

View File

@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.log.InstantRange;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
@@ -37,15 +38,17 @@ import org.apache.hudi.common.util.collection.ExternalSpillableMap;
* useful in limiting memory usage when only a small subset of updates records are to be read.
*/
public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordScanner {
// Set of all record keys that are to be read in memory
private Set<String> mergeKeyFilter;
private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths,
Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, int bufferSize,
String spillableMapBasePath, Set<String> mergeKeyFilter,
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled) {
ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
Option<InstantRange> instantRange) {
super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, maxMemorySizeInBytes, false, false, bufferSize,
spillableMapBasePath, Option.empty(), false, diskMapType, isBitCaskDiskMapCompressionEnabled, false);
spillableMapBasePath, instantRange, false, diskMapType, isBitCaskDiskMapCompressionEnabled, false);
this.mergeKeyFilter = mergeKeyFilter;
performScan();
@@ -88,59 +91,71 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
public static class Builder extends HoodieMergedLogRecordScanner.Builder {
private Set<String> mergeKeyFilter = Collections.emptySet();
@Override
public Builder withFileSystem(FileSystem fs) {
this.fs = fs;
return this;
}
@Override
public Builder withBasePath(String basePath) {
this.basePath = basePath;
return this;
}
@Override
public Builder withLogFilePaths(List<String> logFilePaths) {
this.logFilePaths = logFilePaths;
return this;
}
@Override
public Builder withReaderSchema(Schema schema) {
this.readerSchema = schema;
return this;
}
@Override
public Builder withLatestInstantTime(String latestInstantTime) {
this.latestInstantTime = latestInstantTime;
return this;
}
@Override
public Builder withReadBlocksLazily(boolean readBlocksLazily) {
throw new UnsupportedOperationException();
}
@Override
public Builder withReverseReader(boolean reverseReader) {
throw new UnsupportedOperationException();
}
@Override
public Builder withBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
return this;
}
@Override
public Builder withMaxMemorySizeInBytes(Long maxMemorySizeInBytes) {
this.maxMemorySizeInBytes = maxMemorySizeInBytes;
return this;
}
@Override
public Builder withSpillableMapBasePath(String spillableMapBasePath) {
this.spillableMapBasePath = spillableMapBasePath;
return this;
}
@Override
public Builder withDiskMapType(ExternalSpillableMap.DiskMapType diskMapType) {
this.diskMapType = diskMapType;
return this;
}
@Override
public Builder withBitCaskDiskMapCompressionEnabled(boolean isBitCaskDiskMapCompressionEnabled) {
this.isBitCaskDiskMapCompressionEnabled = isBitCaskDiskMapCompressionEnabled;
return this;
@@ -151,11 +166,33 @@ public class HoodieMetadataMergedLogRecordScanner extends HoodieMergedLogRecordS
return this;
}
// Restricts the scanner to log blocks whose instant time is in exactly this set
// (implemented via ExplicitMatchRange, which overrides isInRange with a set lookup).
public Builder withLogBlockTimestamps(Set<String> validLogBlockTimestamps) {
  withInstantRange(Option.of(new ExplicitMatchRange(validLogBlockTimestamps)));
  return this;
}
@Override
public HoodieMetadataMergedLogRecordScanner build() {
return new HoodieMetadataMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, bufferSize, spillableMapBasePath, mergeKeyFilter,
diskMapType, isBitCaskDiskMapCompressionEnabled);
diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange);
}
}
/**
 * Class to assist in checking if an instant is part of a set of instants.
 * Membership is an exact set lookup rather than the usual [min, max] range comparison.
 */
private static class ExplicitMatchRange extends InstantRange {
  // Made private final: the set is never reassigned and is only read by this class.
  // Assumed non-empty — Collections.min/max below throw on an empty set
  // (NOTE(review): confirm callers never pass an empty set).
  private final Set<String> instants;

  public ExplicitMatchRange(Set<String> instants) {
    super(Collections.min(instants), Collections.max(instants));
    this.instants = instants;
  }

  @Override
  public boolean isInRange(String instant) {
    return this.instants.contains(instant);
  }
}
}

View File

@@ -41,13 +41,9 @@ public class HoodieMetadataMetrics implements Serializable {
// Metric names
public static final String LOOKUP_PARTITIONS_STR = "lookup_partitions";
public static final String LOOKUP_FILES_STR = "lookup_files";
public static final String VALIDATE_PARTITIONS_STR = "validate_partitions";
public static final String VALIDATE_FILES_STR = "validate_files";
public static final String VALIDATE_ERRORS_STR = "validate_errors";
public static final String SCAN_STR = "scan";
public static final String BASEFILE_READ_STR = "basefile_read";
public static final String INITIALIZE_STR = "initialize";
public static final String SYNC_STR = "sync";
public static final String REBOOTSTRAP_STR = "rebootstrap";
public static final String BOOTSTRAP_ERR_STR = "bootstrap_error";
@@ -57,7 +53,6 @@ public class HoodieMetadataMetrics implements Serializable {
public static final String STAT_COUNT_BASE_FILES = "baseFileCount";
public static final String STAT_COUNT_LOG_FILES = "logFileCount";
public static final String STAT_COUNT_PARTITION = "partitionCount";
public static final String STAT_IN_SYNC = "isInSync";
public static final String STAT_LAST_COMPACTION_TIMESTAMP = "lastCompactionTimestamp";
private static final Logger LOG = LogManager.getLogger(HoodieMetadataMetrics.class);
@@ -82,32 +77,35 @@ public class HoodieMetadataMetrics implements Serializable {
Map<String, String> stats = new HashMap<>();
// Total size of the metadata and count of base/log files
long totalBaseFileSizeInBytes = 0;
long totalLogFileSizeInBytes = 0;
int baseFileCount = 0;
int logFileCount = 0;
List<FileSlice> latestSlices = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath()).collect(Collectors.toList());
for (String metadataPartition : MetadataPartitionType.all()) {
List<FileSlice> latestSlices = fsView.getLatestFileSlices(metadataPartition).collect(Collectors.toList());
for (FileSlice slice : latestSlices) {
if (slice.getBaseFile().isPresent()) {
totalBaseFileSizeInBytes += slice.getBaseFile().get().getFileStatus().getLen();
++baseFileCount;
}
Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
while (it.hasNext()) {
totalLogFileSizeInBytes += it.next().getFileStatus().getLen();
++logFileCount;
// Total size of the metadata and count of base/log files
long totalBaseFileSizeInBytes = 0;
long totalLogFileSizeInBytes = 0;
int baseFileCount = 0;
int logFileCount = 0;
for (FileSlice slice : latestSlices) {
if (slice.getBaseFile().isPresent()) {
totalBaseFileSizeInBytes += slice.getBaseFile().get().getFileStatus().getLen();
++baseFileCount;
}
Iterator<HoodieLogFile> it = slice.getLogFiles().iterator();
while (it.hasNext()) {
totalLogFileSizeInBytes += it.next().getFileSize();
++logFileCount;
}
}
stats.put(metadataPartition + "." + STAT_TOTAL_BASE_FILE_SIZE, String.valueOf(totalBaseFileSizeInBytes));
stats.put(metadataPartition + "." + STAT_TOTAL_LOG_FILE_SIZE, String.valueOf(totalLogFileSizeInBytes));
stats.put(metadataPartition + "." + STAT_COUNT_BASE_FILES, String.valueOf(baseFileCount));
stats.put(metadataPartition + "." + STAT_COUNT_LOG_FILES, String.valueOf(logFileCount));
}
stats.put(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE, String.valueOf(totalBaseFileSizeInBytes));
stats.put(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE, String.valueOf(totalLogFileSizeInBytes));
stats.put(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES, String.valueOf(baseFileCount));
stats.put(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES, String.valueOf(logFileCount));
if (detailed) {
stats.put(HoodieMetadataMetrics.STAT_COUNT_PARTITION, String.valueOf(tableMetadata.getAllPartitionPaths().size()));
stats.put(HoodieMetadataMetrics.STAT_IN_SYNC, String.valueOf(tableMetadata.isInSync()));
}
return stats;
@@ -121,26 +119,20 @@ public class HoodieMetadataMetrics implements Serializable {
// Update sum of duration and total for count
String countKey = action + ".count";
String durationKey = action + ".totalDuration";
metricsRegistry.add(countKey, 1);
metricsRegistry.add(durationKey, durationInMs);
LOG.info(String.format("Updating metadata metrics (%s=%dms, %s=1)", durationKey, durationInMs, countKey));
incrementMetric(countKey, 1);
incrementMetric(durationKey, durationInMs);
}
public void updateMetrics(long totalBaseFileSizeInBytes, long totalLogFileSizeInBytes, int baseFileCount,
int logFileCount) {
if (metricsRegistry == null) {
return;
/**
 * Publishes the current (non-detailed) metadata table size statistics as metrics, one metric
 * per stat entry, with each value parsed as a long.
 */
public void updateSizeMetrics(HoodieTableMetaClient metaClient, HoodieBackedTableMetadata metadata) {
  getStats(false, metaClient, metadata)
      .forEach((name, value) -> incrementMetric(name, Long.parseLong(value)));
}
// Set new size and count for metadata table's data files
metricsRegistry.set("basefile.size", totalBaseFileSizeInBytes);
metricsRegistry.set("logfile.size", totalLogFileSizeInBytes);
metricsRegistry.set("basefile.count", baseFileCount);
metricsRegistry.set("logfile.count", logFileCount);
LOG.info(String.format("Updating metadata size metrics (basefile.size=%d, logfile.size=%d, basefile.count=%d, "
+ "logfile.count=%d)", totalBaseFileSizeInBytes, totalLogFileSizeInBytes, baseFileCount, logFileCount));
/**
 * Adds {@code value} to the named metric in the registry.
 *
 * Guards against a null registry: the metrics path is optional, and the method this replaces
 * performed the same {@code metricsRegistry == null} check before touching the registry.
 *
 * @param action metric name to update
 * @param value  amount to add to the metric
 */
protected void incrementMetric(String action, long value) {
  if (metricsRegistry == null) {
    return;
  }
  LOG.info(String.format("Updating metadata metrics (%s=%d) in %s", action, value, metricsRegistry));
  metricsRegistry.add(action, value);
}
public Registry registry() {

View File

@@ -23,7 +23,6 @@ import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -105,11 +104,12 @@ public interface HoodieTableMetadata extends Serializable, AutoCloseable {
Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths) throws IOException;
/**
* Get the instant time at which Metadata Table was last updated.
*
* This is the timestamp of the Instant on the dataset which was last synced to the Metadata Table.
* Get the instant time to which the metadata is synced w.r.t data timeline.
*/
Option<String> getUpdateTime();
Option<String> getSyncedInstantTime();
boolean isInSync();
/**
* Returns the timestamp of the latest compaction.
*/
Option<String> getLatestCompactionTime();
}

View File

@@ -19,28 +19,30 @@
package org.apache.hudi.metadata;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -59,56 +61,20 @@ public class HoodieTableMetadataUtil {
private static final Logger LOG = LogManager.getLogger(HoodieTableMetadataUtil.class);
/**
 * Delete the metadata table for the dataset. This will be invoked during upgrade/downgrade operation during which no other
 * process should be running.
 *
 * @param basePath base path of the dataset
 * @param context instance of {@link HoodieEngineContext}.
 */
public static Option<List<HoodieRecord>> convertInstantToMetaRecords(HoodieTableMetaClient datasetMetaClient,
HoodieActiveTimeline metadataTableTimeline, HoodieInstant instant, Option<String> lastSyncTs) throws IOException {
HoodieTimeline timeline = datasetMetaClient.getActiveTimeline();
Option<List<HoodieRecord>> records = Option.empty();
ValidationUtils.checkArgument(instant.isCompleted(), "Only completed instants can be synced.");
switch (instant.getAction()) {
case HoodieTimeline.CLEAN_ACTION:
HoodieCleanMetadata cleanMetadata = CleanerUtils.getCleanerMetadata(datasetMetaClient, instant);
records = Option.of(convertMetadataToRecords(cleanMetadata, instant.getTimestamp()));
break;
case HoodieTimeline.DELTA_COMMIT_ACTION:
case HoodieTimeline.COMMIT_ACTION:
case HoodieTimeline.COMPACTION_ACTION:
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
timeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
records = Option.of(convertMetadataToRecords(commitMetadata, instant.getTimestamp()));
break;
case HoodieTimeline.ROLLBACK_ACTION:
HoodieRollbackMetadata rollbackMetadata = TimelineMetadataUtils.deserializeHoodieRollbackMetadata(
timeline.getInstantDetails(instant).get());
records = Option.of(convertMetadataToRecords(metadataTableTimeline, rollbackMetadata, instant.getTimestamp(), lastSyncTs));
break;
case HoodieTimeline.RESTORE_ACTION:
HoodieRestoreMetadata restoreMetadata = TimelineMetadataUtils.deserializeHoodieRestoreMetadata(
timeline.getInstantDetails(instant).get());
records = Option.of(convertMetadataToRecords(metadataTableTimeline, restoreMetadata, instant.getTimestamp(), lastSyncTs));
break;
case HoodieTimeline.SAVEPOINT_ACTION:
// Nothing to be done here
break;
case HoodieTimeline.REPLACE_COMMIT_ACTION:
HoodieReplaceCommitMetadata replaceMetadata = HoodieReplaceCommitMetadata.fromBytes(
timeline.getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
// Note: we only add new files created here. Replaced files are removed from metadata later by cleaner.
records = Option.of(convertMetadataToRecords(replaceMetadata, instant.getTimestamp()));
break;
default:
throw new HoodieException("Unknown type of action " + instant.getAction());
// Removes the metadata table directory recursively. A stray `return records;` statement left
// behind after the catch block (a remnant of the removed convertInstantToMetaRecords method)
// referenced an undefined variable in a void method and has been dropped.
public static void deleteMetadataTable(String basePath, HoodieEngineContext context) {
  final String metadataTablePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
  FileSystem fs = FSUtils.getFs(metadataTablePath, context.getHadoopConf().get());
  try {
    // Recursive delete of the whole metadata table base path.
    fs.delete(new Path(metadataTablePath), true);
  } catch (Exception e) {
    throw new HoodieMetadataException("Failed to remove metadata table from path " + metadataTablePath, e);
  }
}
/**
@@ -134,12 +100,11 @@ public class HoodieTableMetadataUtil {
return;
}
int offset = partition.equals(NON_PARTITIONED_NAME) ? 0 : partition.length() + 1;
int offset = partition.equals(NON_PARTITIONED_NAME) ? (pathWithPartition.startsWith("/") ? 1 : 0) : partition.length() + 1;
String filename = pathWithPartition.substring(offset);
ValidationUtils.checkState(!newFiles.containsKey(filename), "Duplicate files in HoodieCommitMetadata");
newFiles.put(filename, hoodieWriteStat.getTotalWriteBytes());
});
// New files added to a partition
HoodieRecord record = HoodieMetadataPayload.createPartitionFilesRecord(
partition, Option.of(newFiles), Option.empty());
@@ -155,33 +120,6 @@ public class HoodieTableMetadataUtil {
return records;
}
/**
 * Finds all files that will be deleted as part of a planned clean and creates metadata table records for them.
 *
 * @param cleanerPlan from timeline to convert
 * @param instantTime
 * @return a list of metadata table records
 */
public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanerPlan cleanerPlan, String instantTime) {
  List<HoodieRecord> records = new LinkedList<>();
  int[] fileDeleteCount = {0};
  cleanerPlan.getFilePathsToBeDeletedPerPartition().forEach((partition, deletedPathInfo) -> {
    fileDeleteCount[0] += deletedPathInfo.size();
    // Record only the file names (not full paths) that the plan will delete from this partition.
    List<String> deletedFilenames = deletedPathInfo.stream()
        .map(pathInfo -> new Path(pathInfo.getFilePath()).getName())
        .collect(Collectors.toList());
    records.add(HoodieMetadataPayload.createPartitionFilesRecord(
        partition, Option.empty(), Option.of(deletedFilenames)));
  });
  LOG.info("Found at " + instantTime + " from CleanerPlan. #partitions_updated=" + records.size()
      + ", #files_deleted=" + fileDeleteCount[0]);
  return records;
}
/**
* Finds all files that were deleted as part of a clean and creates metadata table records for them.
*
@@ -192,7 +130,6 @@ public class HoodieTableMetadataUtil {
public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cleanMetadata, String instantTime) {
List<HoodieRecord> records = new LinkedList<>();
int[] fileDeleteCount = {0};
cleanMetadata.getPartitionMetadata().forEach((partition, partitionMetadata) -> {
// Files deleted from a partition
List<String> deletedFiles = partitionMetadata.getDeletePathPatterns();
@@ -228,10 +165,17 @@ public class HoodieTableMetadataUtil {
}
public static List<HoodieRecord> convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline,
HoodieRollbackMetadata rollbackMetadata, String instantTime, Option<String> lastSyncTs) {
HoodieRollbackMetadata rollbackMetadata, String instantTime,
Option<String> lastSyncTs, boolean wasSynced) {
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
processRollbackMetadata(metadataTableTimeline, rollbackMetadata, partitionToDeletedFiles, partitionToAppendedFiles, lastSyncTs);
if (!wasSynced) {
// Since the instant-being-rolled-back was never committed to the metadata table, the files added there
// need not be deleted. For MOR Table, the rollback appends logBlocks so we need to keep the appended files.
partitionToDeletedFiles.clear();
}
return convertFilesToRecords(partitionToDeletedFiles, partitionToAppendedFiles, instantTime, "Rollback");
}
@@ -249,7 +193,6 @@ public class HoodieTableMetadataUtil {
Map<String, List<String>> partitionToDeletedFiles,
Map<String, Map<String, Long>> partitionToAppendedFiles,
Option<String> lastSyncTs) {
rollbackMetadata.getPartitionMetadata().values().forEach(pm -> {
final String instantToRollback = rollbackMetadata.getCommitsRollback().get(0);
// Has this rollback produced new files?
@@ -368,4 +311,46 @@ public class HoodieTableMetadataUtil {
return records;
}
/**
 * Map a record key to a file group in partition of interest.
 *
 * Note: For hashing, the algorithm is same as String.hashCode() but is being defined here as hashCode()
 * implementation is not guaranteed by the JVM to be consistent across JVM versions and implementations.
 *
 * @param recordKey     record key for which the file group index is looked up for.
 * @param numFileGroups total number of file groups in the partition.
 * @return An integer hash of the given string, normalized to the range [0, numFileGroups).
 */
public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
  int hash = 0;
  for (char c : recordKey.toCharArray()) {
    hash = 31 * hash + c;
  }
  // The nested Math.abs guards the Integer.MIN_VALUE corner case: Math.abs(Integer.MIN_VALUE) is still
  // negative, but its remainder by numFileGroups is small enough that the outer abs normalizes it.
  return Math.abs(Math.abs(hash) % numFileGroups);
}
/**
 * Loads the list of file groups for a partition of the Metadata Table with latest file slices.
 *
 * The list of file slices returned is sorted in the correct order of file group name.
 *
 * @param metaClient instance of {@link HoodieTableMetaClient}.
 * @param partition The name of the partition whose file groups are to be loaded.
 * @return List of latest file slices for all file groups in a given partition.
 */
public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition) {
  LOG.info("Loading file groups for metadata table partition " + partition);

  // If there are no commits on the metadata table then the table's default FileSystemView will not return any file
  // slices even though we may have initialized them. Build a timeline containing a single synthetic delta commit
  // so the view can still surface the initialized file groups.
  HoodieTimeline timeline = metaClient.getActiveTimeline();
  if (timeline.empty()) {
    final HoodieInstant syntheticInstant =
        new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime());
    timeline = new HoodieDefaultTimeline(Arrays.asList(syntheticInstant).stream(),
        metaClient.getActiveTimeline()::getInstantDetails);
  }

  final HoodieTableFileSystemView fileSystemView = new HoodieTableFileSystemView(metaClient, timeline);
  return fileSystemView.getLatestFileSlices(partition)
      .sorted((left, right) -> left.getFileId().compareTo(right.getFileId()))
      .collect(Collectors.toList());
}
}

View File

@@ -18,16 +18,31 @@
package org.apache.hudi.metadata;
import java.util.Arrays;
import java.util.List;
/**
 * Partitions of the metadata table along with the fileId prefix used for the file groups within each partition.
 */
public enum MetadataPartitionType {
  FILES("files", "files-");

  // refers to partition path in metadata table.
  private final String partitionPath;
  // refers to fileId prefix used for all file groups in this partition.
  private final String fileIdPrefix;

  MetadataPartitionType(String partitionPath, String fileIdPrefix) {
    this.partitionPath = partitionPath;
    this.fileIdPrefix = fileIdPrefix;
  }

  public String partitionPath() {
    return partitionPath;
  }

  public String getFileIdPrefix() {
    return fileIdPrefix;
  }

  /**
   * Returns the partition paths of all metadata table partitions.
   */
  public static List<String> all() {
    return Arrays.asList(MetadataPartitionType.FILES.partitionPath());
  }
}

View File

@@ -1,130 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.metadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
 * Provides functionality to convert timeline instants to table metadata records and then merge by key. Specify
 * a filter to limit keys that are merged and stored in memory.
 */
public class TimelineMergedTableMetadata implements Serializable {

  private static final Logger LOG = LogManager.getLogger(TimelineMergedTableMetadata.class);

  HoodieTableMetaClient metaClient;
  private List<HoodieInstant> instants;
  private Option<String> lastSyncTs;
  private Set<String> mergeKeyFilter;
  private HoodieActiveTimeline metadataTableTimeline;

  // keep it a simple hash map, so it can be easily passed onto the executors, once merged.
  protected final Map<String, HoodieRecord<? extends HoodieRecordPayload>> timelineMergedRecords;

  public TimelineMergedTableMetadata(HoodieTableMetaClient metaClient, HoodieActiveTimeline metadataTableTimeline,
                                     List<HoodieInstant> instants, Option<String> lastSyncTs, Set<String> mergeKeyFilter) {
    this.metaClient = metaClient;
    this.instants = instants;
    this.lastSyncTs = lastSyncTs;
    // A null filter means "merge everything".
    this.mergeKeyFilter = mergeKeyFilter != null ? mergeKeyFilter : Collections.emptySet();
    this.metadataTableTimeline = metadataTableTimeline;
    this.timelineMergedRecords = new HashMap<>();

    // Eagerly scan and merge the given instants so the map is ready on construction.
    scan();
  }

  /**
   * Converts each instant to metadata table records and merges them into {@link #timelineMergedRecords}.
   *
   * @throws HoodieException wrapping any failure while converting an instant.
   */
  private void scan() {
    for (HoodieInstant instant : instants) {
      try {
        Option<List<HoodieRecord>> records = HoodieTableMetadataUtil.convertInstantToMetaRecords(metaClient,
            metadataTableTimeline, instant, lastSyncTs);
        if (records.isPresent()) {
          records.get().forEach(this::processNextRecord);
        }
      } catch (Exception e) {
        final String msg = String.format("Got exception when processing timeline instant %s", instant.getTimestamp());
        LOG.error(msg, e);
        throw new HoodieException(msg, e);
      }
    }
  }

  /**
   * Process metadata table record by merging with existing record if it is a part of the key filter.
   *
   * @param hoodieRecord record to merge into the in-memory map
   */
  private void processNextRecord(HoodieRecord<? extends HoodieRecordPayload> hoodieRecord) {
    final String key = hoodieRecord.getRecordKey();
    if (!mergeKeyFilter.isEmpty() && !mergeKeyFilter.contains(key)) {
      // Key is filtered out; nothing to store.
      return;
    }
    if (!timelineMergedRecords.containsKey(key)) {
      // First occurrence of the key; store the record as is.
      timelineMergedRecords.put(key, hoodieRecord);
      return;
    }
    // Merge the incoming record's payload with the previously stored one and keep the combined result.
    HoodieRecordPayload combinedValue =
        hoodieRecord.getData().preCombine(timelineMergedRecords.get(key).getData(), new Properties());
    timelineMergedRecords.put(key, new HoodieRecord<>(new HoodieKey(key, hoodieRecord.getPartitionPath()), combinedValue));
  }

  /**
   * Retrieve merged hoodie record for given key.
   *
   * @param key of the record to retrieve
   * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
   */
  public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key) {
    return Option.ofNullable((HoodieRecord) timelineMergedRecords.get(key));
  }

  /**
   * Returns the timestamp of the latest synced instant, i.e. the last instant in the scanned list.
   */
  public Option<String> getSyncedInstantTime() {
    return instants.isEmpty()
        ? Option.empty()
        : Option.of(instants.get(instants.size() - 1).getTimestamp());
  }
}