From 6763b45dd4be89a62f57623f60798d1f751abca4 Mon Sep 17 00:00:00 2001
From: Danny Chan
Date: Thu, 17 Jun 2021 19:10:34 +0800
Subject: [PATCH] [HUDI-2030] Add metadata cache to WriteProfile to reduce IO (#3090)

Keeps one cached metadata entry per active timeline instant and refreshes the cache on new commits.
---
 .../partitioner/profile/WriteProfile.java    | 76 +++++++++++++------
 .../partitioner/profile/WriteProfiles.java   | 27 +++++++
 .../sink/partitioner/TestBucketAssigner.java | 25 ++++++
 3 files changed, 106 insertions(+), 22 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index 71d0d836a..3ccfb26d1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -33,6 +33,7 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SmallFile;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -44,7 +45,10 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * Profiling of write statistics for {@link BucketAssigner},
@@ -101,6 +105,11 @@ public class WriteProfile {
    */
   private final Configuration hadoopConf;
 
+  /**
+   * Metadata cache to reduce the IO of reading metadata files.
+   */
+  private final Map<String, HoodieCommitMetadata> metadataCache;
+
   public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
     this.config = config;
     this.basePath = new Path(config.getBasePath());
@@ -108,6 +117,7 @@
     this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
     this.table = HoodieFlinkTable.create(config, context);
     this.hadoopConf = StreamerUtil.getHadoopConf();
+    this.metadataCache = new HashMap<>();
     // profile the record statistics on construction
     recordProfile();
   }
@@ -132,27 +142,28 @@
     long avgSize = config.getCopyOnWriteRecordSizeEstimate();
     long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
     HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
-    try {
-      if (!commitTimeline.empty()) {
-        // Go over the reverse ordered commits to get a more recent estimate of average record size.
-        Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
-        while (instants.hasNext()) {
-          HoodieInstant instant = instants.next();
-          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-              .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-          long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
-          long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
-          if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
-            avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
-            break;
-          }
+    if (!commitTimeline.empty()) {
+      // Go over the reverse ordered commits to get a more recent estimate of average record size.
+      Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
+      while (instants.hasNext()) {
+        HoodieInstant instant = instants.next();
+        final HoodieCommitMetadata commitMetadata =
+            this.metadataCache.computeIfAbsent(
+                instant.getTimestamp(),
+                k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline)
+                    .orElse(null));
+        if (commitMetadata == null) {
+          continue;
+        }
+        long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+        long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+        if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
+          avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
+          break;
+        }
       }
     }
-      LOG.info("AvgRecordSize => " + avgSize);
-    } catch (Throwable t) {
-      // make this fail safe.
-      LOG.error("Error trying to compute average bytes/record ", t);
-    }
+    LOG.info("Refresh average bytes per record => " + avgSize);
     return avgSize;
   }
 
@@ -202,21 +213,37 @@
     return smallFileLocations;
   }
 
-  protected void initFSViewIfNecessary(HoodieTimeline commitTimeline) {
+  @VisibleForTesting
+  public void initFSViewIfNecessary(HoodieTimeline commitTimeline) {
     if (fsView == null) {
+      cleanMetadataCache(commitTimeline.getInstants());
       List<HoodieCommitMetadata> metadataList = commitTimeline.getInstants()
-          .map(instant -> WriteProfiles.getCommitMetadata(config.getTableName(), basePath, instant, commitTimeline))
+          .map(instant ->
+              this.metadataCache.computeIfAbsent(
+                  instant.getTimestamp(),
+                  k -> WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, commitTimeline)
+                      .orElse(null)))
+          .filter(Objects::nonNull)
          .collect(Collectors.toList());
       FileStatus[] commitFiles = WriteProfiles.getWritePathsOfInstants(basePath, hadoopConf, metadataList);
       fsView = new HoodieTableFileSystemView(table.getMetaClient(), commitTimeline, commitFiles);
     }
   }
 
+  /**
+   * Removes the stale metadata entries from the cache whose instants
+   * do not belong to the given {@code instants}.
+ */ + private void cleanMetadataCache(Stream instants) { + Set timestampSet = instants.map(HoodieInstant::getTimestamp).collect(Collectors.toSet()); + this.metadataCache.keySet().retainAll(timestampSet); + } + private void recordProfile() { this.avgSize = averageBytesPerRecord(); if (config.shouldAllowMultiWriteOnSameInstant()) { this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize; - LOG.info("InsertRecordsPerBucket => " + recordsPerBucket); + LOG.info("Refresh insert records per bucket => " + recordsPerBucket); } } @@ -233,10 +260,15 @@ public class WriteProfile { // already reloaded return; } + this.table.getMetaClient().reloadActiveTimeline(); recordProfile(); this.fsView = null; this.smallFilesMap.clear(); - this.table.getMetaClient().reloadActiveTimeline(); this.reloadedCheckpointId = checkpointId; } + + @VisibleForTesting + public Map getMetadataCache() { + return this.metadataCache; + } } diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java index 9a8b7d002..3679c8a66 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java @@ -23,6 +23,7 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; @@ -128,6 +129,32 @@ public class WriteProfiles { .collect(Collectors.toList()); } + /** + * Returns the commit metadata of the given instant safely. + * + * @param tableName The table name + * @param basePath The table base path + * @param instant The hoodie instant + * @param timeline The timeline + * + * @return the commit metadata or empty if any error occurs + */ + public static Option getCommitMetadataSafely( + String tableName, + Path basePath, + HoodieInstant instant, + HoodieTimeline timeline) { + byte[] data = timeline.getInstantDetails(instant).get(); + try { + return Option.of(HoodieCommitMetadata.fromBytes(data, HoodieCommitMetadata.class)); + } catch (IOException e) { + // make this fail safe. + LOG.error("Get write metadata for table {} with instant {} and path: {} error", + tableName, instant.getTimestamp(), basePath); + return Option.empty(); + } + } + /** * Returns the commit metadata of the given instant. 
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 3efa444b3..1fc6e29b7 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.sink.partitioner.profile.WriteProfile;
@@ -332,6 +333,30 @@
         smallFiles4.get(0).location.getInstantTime(), is(instant2));
   }
 
+  @Test
+  public void testWriteProfileMetadataCache() throws Exception {
+    WriteProfile writeProfile = new WriteProfile(writeConfig, context);
+    assertTrue(writeProfile.getMetadataCache().isEmpty(), "Empty table should not have any instant metadata");
+
+    HoodieTimeline emptyTimeline = writeProfile.getTable().getActiveTimeline();
+
+    // write 3 instants of data
+    for (int i = 0; i < 3; i++) {
+      TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    }
+    writeProfile.reload(1);
+    assertThat("Metadata cache should have the same number of entries as the timeline instants",
+        writeProfile.getMetadataCache().size(), is(3));
+
+    writeProfile.getSmallFiles("par1");
+    assertThat("The cached metadata should be reused",
+        writeProfile.getMetadataCache().size(), is(3));
+
+    writeProfile.reload(2);
+    writeProfile.initFSViewIfNecessary(emptyTimeline);
+    assertTrue(writeProfile.getMetadataCache().isEmpty(), "Metadata cache should all be cleaned");
+  }
+
   private static Option<String> getLastCompleteInstant(WriteProfile profile) {
     return profile.getTable().getMetaClient().getCommitsTimeline()
         .filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp);
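For completeness, the estimate that the cache feeds in averageBytesPerRecord walks commits newest-first and returns ceil(totalBytesWritten / totalRecordsWritten) for the first commit that wrote more than the size-estimation threshold, falling back to the configured default otherwise. A small sketch of that arithmetic in isolation; CommitStats and the sample values are illustrative, not taken from this patch.

import java.util.Arrays;
import java.util.List;

public class AvgRecordSizeSketch {
  // Illustrative stand-in for the per-commit totals read from the metadata cache.
  static class CommitStats {
    final long totalBytesWritten;
    final long totalRecordsWritten;

    CommitStats(long totalBytesWritten, long totalRecordsWritten) {
      this.totalBytesWritten = totalBytesWritten;
      this.totalRecordsWritten = totalRecordsWritten;
    }
  }

  // Walks commits newest-first and trusts the first commit that wrote more
  // than fileSizeThreshold bytes; otherwise keeps the configured default.
  static long averageBytesPerRecord(List<CommitStats> newestFirst, long defaultSize, long fileSizeThreshold) {
    for (CommitStats stats : newestFirst) {
      if (stats.totalBytesWritten > fileSizeThreshold && stats.totalRecordsWritten > 0) {
        return (long) Math.ceil((1.0 * stats.totalBytesWritten) / stats.totalRecordsWritten);
      }
    }
    return defaultSize;
  }

  public static void main(String[] args) {
    List<CommitStats> commits = Arrays.asList(
        new CommitStats(200L << 20, 2_000_000L), // newest: 200 MB over 2M records
        new CommitStats(10L << 20, 100_000L));   // older, below the threshold, skipped
    // ceil(209715200 / 2000000) = 105 bytes per record
    System.out.println(averageBytesPerRecord(commits, 1024L, 100L << 20));
  }
}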