[HUDI-2030] Add metadata cache to WriteProfile to reduce IO (#3090)
Keeps same number of instant metadata cache and refresh the cache on new commits.
This commit is contained in:
@@ -33,6 +33,7 @@ import org.apache.hudi.table.HoodieTable;
|
|||||||
import org.apache.hudi.table.action.commit.SmallFile;
|
import org.apache.hudi.table.action.commit.SmallFile;
|
||||||
import org.apache.hudi.util.StreamerUtil;
|
import org.apache.hudi.util.StreamerUtil;
|
||||||
|
|
||||||
|
import org.apache.flink.annotation.VisibleForTesting;
|
||||||
import org.apache.flink.core.fs.Path;
|
import org.apache.flink.core.fs.Path;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
@@ -44,7 +45,10 @@ import java.util.HashMap;
|
|||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Profiling of write statistics for {@link BucketAssigner},
|
* Profiling of write statistics for {@link BucketAssigner},
|
||||||
@@ -101,6 +105,11 @@ public class WriteProfile {
|
|||||||
*/
|
*/
|
||||||
private final Configuration hadoopConf;
|
private final Configuration hadoopConf;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Metadata cache to reduce IO of metadata files.
|
||||||
|
*/
|
||||||
|
private final Map<String, HoodieCommitMetadata> metadataCache;
|
||||||
|
|
||||||
public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
|
public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
|
||||||
this.config = config;
|
this.config = config;
|
||||||
this.basePath = new Path(config.getBasePath());
|
this.basePath = new Path(config.getBasePath());
|
||||||
@@ -108,6 +117,7 @@ public class WriteProfile {
|
|||||||
this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
|
this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
|
||||||
this.table = HoodieFlinkTable.create(config, context);
|
this.table = HoodieFlinkTable.create(config, context);
|
||||||
this.hadoopConf = StreamerUtil.getHadoopConf();
|
this.hadoopConf = StreamerUtil.getHadoopConf();
|
||||||
|
this.metadataCache = new HashMap<>();
|
||||||
// profile the record statistics on construction
|
// profile the record statistics on construction
|
||||||
recordProfile();
|
recordProfile();
|
||||||
}
|
}
|
||||||
@@ -132,27 +142,28 @@ public class WriteProfile {
|
|||||||
long avgSize = config.getCopyOnWriteRecordSizeEstimate();
|
long avgSize = config.getCopyOnWriteRecordSizeEstimate();
|
||||||
long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
|
long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
|
||||||
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
|
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
|
||||||
try {
|
if (!commitTimeline.empty()) {
|
||||||
if (!commitTimeline.empty()) {
|
// Go over the reverse ordered commits to get a more recent estimate of average record size.
|
||||||
// Go over the reverse ordered commits to get a more recent estimate of average record size.
|
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
|
||||||
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
|
while (instants.hasNext()) {
|
||||||
while (instants.hasNext()) {
|
HoodieInstant instant = instants.next();
|
||||||
HoodieInstant instant = instants.next();
|
final HoodieCommitMetadata commitMetadata =
|
||||||
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
|
this.metadataCache.computeIfAbsent(
|
||||||
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
|
instant.getTimestamp(),
|
||||||
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
|
k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline)
|
||||||
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
|
.orElse(null));
|
||||||
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
|
if (commitMetadata == null) {
|
||||||
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
|
continue;
|
||||||
break;
|
}
|
||||||
}
|
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
|
||||||
|
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
|
||||||
|
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
|
||||||
|
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
LOG.info("AvgRecordSize => " + avgSize);
|
|
||||||
} catch (Throwable t) {
|
|
||||||
// make this fail safe.
|
|
||||||
LOG.error("Error trying to compute average bytes/record ", t);
|
|
||||||
}
|
}
|
||||||
|
LOG.info("Refresh average bytes per record => " + avgSize);
|
||||||
return avgSize;
|
return avgSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -202,21 +213,37 @@ public class WriteProfile {
|
|||||||
return smallFileLocations;
|
return smallFileLocations;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void initFSViewIfNecessary(HoodieTimeline commitTimeline) {
|
@VisibleForTesting
|
||||||
|
public void initFSViewIfNecessary(HoodieTimeline commitTimeline) {
|
||||||
if (fsView == null) {
|
if (fsView == null) {
|
||||||
|
cleanMetadataCache(commitTimeline.getInstants());
|
||||||
List<HoodieCommitMetadata> metadataList = commitTimeline.getInstants()
|
List<HoodieCommitMetadata> metadataList = commitTimeline.getInstants()
|
||||||
.map(instant -> WriteProfiles.getCommitMetadata(config.getTableName(), basePath, instant, commitTimeline))
|
.map(instant ->
|
||||||
|
this.metadataCache.computeIfAbsent(
|
||||||
|
instant.getTimestamp(),
|
||||||
|
k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline)
|
||||||
|
.orElse(null)))
|
||||||
|
.filter(Objects::nonNull)
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList);
|
FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList);
|
||||||
fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles);
|
fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the overdue metadata from the cache
|
||||||
|
* whose instant does not belong to the given instants {@code instants}.
|
||||||
|
*/
|
||||||
|
private void cleanMetadataCache(Stream<HoodieInstant> instants) {
|
||||||
|
Set<String> timestampSet = instants.map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
|
||||||
|
this.metadataCache.keySet().retainAll(timestampSet);
|
||||||
|
}
|
||||||
|
|
||||||
private void recordProfile() {
|
private void recordProfile() {
|
||||||
this.avgSize = averageBytesPerRecord();
|
this.avgSize = averageBytesPerRecord();
|
||||||
if (config.shouldAllowMultiWriteOnSameInstant()) {
|
if (config.shouldAllowMultiWriteOnSameInstant()) {
|
||||||
this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
|
this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
|
||||||
LOG.info("InsertRecordsPerBucket => " + recordsPerBucket);
|
LOG.info("Refresh insert records per bucket => " + recordsPerBucket);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -233,10 +260,15 @@ public class WriteProfile {
|
|||||||
// already reloaded
|
// already reloaded
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
this.table.getMetaClient().reloadActiveTimeline();
|
||||||
recordProfile();
|
recordProfile();
|
||||||
this.fsView = null;
|
this.fsView = null;
|
||||||
this.smallFilesMap.clear();
|
this.smallFilesMap.clear();
|
||||||
this.table.getMetaClient().reloadActiveTimeline();
|
|
||||||
this.reloadedCheckpointId = checkpointId;
|
this.reloadedCheckpointId = checkpointId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public Map<String, HoodieCommitMetadata> getMetadataCache() {
|
||||||
|
return this.metadataCache;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ import org.apache.hudi.common.fs.FSUtils;
|
|||||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.exception.HoodieException;
|
import org.apache.hudi.exception.HoodieException;
|
||||||
|
|
||||||
@@ -128,6 +129,32 @@ public class WriteProfiles {
|
|||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the commit metadata of the given instant safely.
|
||||||
|
*
|
||||||
|
* @param tableName The table name
|
||||||
|
* @param basePath The table base path
|
||||||
|
* @param instant The hoodie instant
|
||||||
|
* @param timeline The timeline
|
||||||
|
*
|
||||||
|
* @return the commit metadata or empty if any error occurs
|
||||||
|
*/
|
||||||
|
public static Option<HoodieCommitMetadata> getCommitMetadataSafely(
|
||||||
|
String tableName,
|
||||||
|
Path basePath,
|
||||||
|
HoodieInstant instant,
|
||||||
|
HoodieTimeline timeline) {
|
||||||
|
byte[] data = timeline.getInstantDetails(instant).get();
|
||||||
|
try {
|
||||||
|
return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class));
|
||||||
|
} catch (IOException e) {
|
||||||
|
// make this fail safe.
|
||||||
|
LOG.error("Get write metadata for table {} with instant {} and path: {} error",
|
||||||
|
tableName, instant.getTimestamp(), basePath);
|
||||||
|
return Option.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the commit metadata of the given instant.
|
* Returns the commit metadata of the given instant.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -23,6 +23,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
|
|||||||
import org.apache.hudi.common.config.SerializableConfiguration;
|
import org.apache.hudi.common.config.SerializableConfiguration;
|
||||||
import org.apache.hudi.common.model.HoodieRecordLocation;
|
import org.apache.hudi.common.model.HoodieRecordLocation;
|
||||||
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
import org.apache.hudi.common.table.timeline.HoodieInstant;
|
||||||
|
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.config.HoodieWriteConfig;
|
import org.apache.hudi.config.HoodieWriteConfig;
|
||||||
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
|
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
|
||||||
@@ -332,6 +333,30 @@ public class TestBucketAssigner {
|
|||||||
smallFiles4.get(0).location.getInstantTime(), is(instant2));
|
smallFiles4.get(0).location.getInstantTime(), is(instant2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWriteProfileMetadataCache() throws Exception {
|
||||||
|
WriteProfile writeProfile = new WriteProfile(writeConfig, context);
|
||||||
|
assertTrue(writeProfile.getMetadataCache().isEmpty(), "Empty table should no have any instant metadata");
|
||||||
|
|
||||||
|
HoodieTimeline emptyTimeline = writeProfile.getTable().getActiveTimeline();
|
||||||
|
|
||||||
|
// write 3 instants of data
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
TestData.writeData(TestData.DATA_SET_INSERT, conf);
|
||||||
|
}
|
||||||
|
writeProfile.reload(1);
|
||||||
|
assertThat("Metadata cache should have same number entries as timeline instants",
|
||||||
|
writeProfile.getMetadataCache().size(), is(3));
|
||||||
|
|
||||||
|
writeProfile.getSmallFiles("par1");
|
||||||
|
assertThat("The metadata should be reused",
|
||||||
|
writeProfile.getMetadataCache().size(), is(3));
|
||||||
|
|
||||||
|
writeProfile.reload(2);
|
||||||
|
writeProfile.initFSViewIfNecessary(emptyTimeline);
|
||||||
|
assertTrue(writeProfile.getMetadataCache().isEmpty(), "Metadata cache should be all cleaned");
|
||||||
|
}
|
||||||
|
|
||||||
private static Option<String> getLastCompleteInstant(WriteProfile profile) {
|
private static Option<String> getLastCompleteInstant(WriteProfile profile) {
|
||||||
return profile.getTable().getMetaClient().getCommitsTimeline()
|
return profile.getTable().getMetaClient().getCommitsTimeline()
|
||||||
.filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);
|
.filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);
|
||||||
|
|||||||
Reference in New Issue
Block a user