diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 54a4a62eb..0954414eb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -251,9 +251,14 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
   }
 
   @Override
-  public void notifyCheckpointComplete(long l) {
+  public void notifyCheckpointComplete(long checkpointId) {
     // Refresh the table state when there are new commits.
-    this.bucketAssigner.refreshTable();
+    this.bucketAssigner.reload(checkpointId);
+  }
+
+  @Override
+  public void close() throws Exception {
+    this.bucketAssigner.close();
   }
 
   /**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
index fab21d94c..e2f32937b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java
@@ -18,15 +18,10 @@
 
 package org.apache.hudi.sink.partitioner;
 
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.sink.partitioner.profile.WriteProfile;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.table.action.commit.BucketType;
@@ -38,12 +33,9 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 /**
  * Bucket assigner that assigns the data buffer of one checkpoint into buckets.
@@ -57,7 +49,7 @@ import java.util.stream.Collectors;
  *
  * <p>Use {partition}_{fileId} as the bucket identifier, so that the bucket is unique
  * within and among partitions.
  */
-public class BucketAssigner {
+public class BucketAssigner implements AutoCloseable {
   private static final Logger LOG = LogManager.getLogger(BucketAssigner.class);
 
   /**
@@ -75,38 +67,20 @@
    */
   private final HashMap<String, BucketInfo> bucketInfoMap;
 
-  protected HoodieTable table;
-
-  /**
-   * Fink engine context.
-   */
-  private final HoodieFlinkEngineContext context;
-
   /**
    * The write config.
    */
   protected final HoodieWriteConfig config;
 
   /**
-   * The average record size.
+   * Write profile.
    */
-  private final long averageRecordSize;
+  private final WriteProfile writeProfile;
 
   /**
-   * Total records to write for each bucket based on
-   * the config option {@link org.apache.hudi.config.HoodieStorageConfig#PARQUET_FILE_MAX_BYTES}.
+   * Partition path to small file assign mapping.
    */
-  private final long insertRecordsPerBucket;
-
-  /**
-   * Partition path to small files mapping.
-   */
-  private final Map<String, List<SmallFile>> partitionSmallFilesMap;
-
-  /**
-   * Bucket ID(partition + fileId) -> small file assign state.
-   */
-  private final Map<String, SmallFileAssignState> smallFileAssignStates;
+  private final Map<String, SmallFileAssign> smallFileAssignMap;
 
   /**
    * Bucket ID(partition + fileId) -> new file assign state.
@@ -116,25 +90,16 @@
   public BucketAssigner(
       int taskID,
       int numTasks,
-      HoodieFlinkEngineContext context,
+      WriteProfile profile,
       HoodieWriteConfig config) {
-    bucketInfoMap = new HashMap<>();
-    partitionSmallFilesMap = new HashMap<>();
-    smallFileAssignStates = new HashMap<>();
-    newFileAssignStates = new HashMap<>();
     this.taskID = taskID;
     this.numTasks = numTasks;
-    this.context = context;
     this.config = config;
-    this.table = HoodieFlinkTable.create(this.config, this.context);
-    averageRecordSize = averageBytesPerRecord(
-        table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(),
-        config);
-    LOG.info("AvgRecordSize => " + averageRecordSize);
-    insertRecordsPerBucket = config.shouldAutoTuneInsertSplits()
-        ? config.getParquetMaxFileSize() / averageRecordSize
-        : config.getCopyOnWriteInsertSplitSize();
-    LOG.info("InsertRecordsPerBucket => " + insertRecordsPerBucket);
+    this.writeProfile = profile;
+
+    this.bucketInfoMap = new HashMap<>();
+    this.smallFileAssignMap = new HashMap<>();
+    this.newFileAssignStates = new HashMap<>();
   }
 
   /**
@@ -143,8 +108,6 @@
    */
   public void reset() {
     bucketInfoMap.clear();
-    partitionSmallFilesMap.clear();
-    smallFileAssignStates.clear();
     newFileAssignStates.clear();
   }
@@ -160,25 +123,20 @@
 
   public BucketInfo addInsert(String partitionPath) {
     // for new inserts, compute buckets depending on how many records we have for each partition
-    List<SmallFile> smallFiles = getSmallFilesForPartition(partitionPath);
+    SmallFileAssign smallFileAssign = getSmallFileAssign(partitionPath);
 
     // first try packing this into one of the smallFiles
-    for (SmallFile smallFile : smallFiles) {
-      final String key = StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId());
-      SmallFileAssignState assignState = smallFileAssignStates.get(key);
-      assert assignState != null;
-      if (assignState.canAssign()) {
-        assignState.assign();
-        // create a new bucket or re-use an existing bucket
-        BucketInfo bucketInfo;
-        if (bucketInfoMap.containsKey(key)) {
-          // Assigns an inserts to existing update bucket
-          bucketInfo = bucketInfoMap.get(key);
-        } else {
-          bucketInfo = addUpdate(partitionPath, smallFile.location.getFileId());
-        }
-        return bucketInfo;
+    if (smallFileAssign != null && smallFileAssign.assign()) {
+      final String key = StreamerUtil.generateBucketKey(partitionPath, smallFileAssign.getFileId());
+      // create a new bucket or reuse an existing bucket
+      BucketInfo bucketInfo;
+      if (bucketInfoMap.containsKey(key)) {
+        // Assigns the insert to an existing update bucket
+        bucketInfo = bucketInfoMap.get(key);
+      } else {
+        bucketInfo = addUpdate(partitionPath, smallFileAssign.getFileId());
       }
+      return bucketInfo;
     }
 
     // if we have anything more, create new insert buckets, like normal
@@ -193,66 +151,37 @@
     BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
     final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
     bucketInfoMap.put(key, bucketInfo);
-    newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), insertRecordsPerBucket));
+    newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket()));
     return bucketInfo;
   }
 
-  private List<SmallFile> getSmallFilesForPartition(String partitionPath) {
-    if (partitionSmallFilesMap.containsKey(partitionPath)) {
-      return partitionSmallFilesMap.get(partitionPath);
+  private SmallFileAssign getSmallFileAssign(String partitionPath) {
+    if (smallFileAssignMap.containsKey(partitionPath)) {
+      return smallFileAssignMap.get(partitionPath);
     }
-    List<SmallFile> smallFiles = smallFilesOfThisTask(getSmallFiles(partitionPath));
+    List<SmallFile> smallFiles = smallFilesOfThisTask(writeProfile.getSmallFiles(partitionPath));
     if (smallFiles.size() > 0) {
       LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
-      partitionSmallFilesMap.put(partitionPath, smallFiles);
-      smallFiles.forEach(smallFile ->
-          smallFileAssignStates.put(
-              StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId()),
-              new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, averageRecordSize)));
-      return smallFiles;
+      SmallFileAssignState[] states = smallFiles.stream()
+          .map(smallFile -> new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, writeProfile.getAvgSize()))
+          .toArray(SmallFileAssignState[]::new);
+      SmallFileAssign assign = new SmallFileAssign(states);
+      smallFileAssignMap.put(partitionPath, assign);
+      return assign;
     }
-    return Collections.emptyList();
+    return null;
   }
 
   /**
    * Refresh the table state like TableFileSystemView and HoodieTimeline.
    */
-  public void refreshTable() {
-    this.table = HoodieFlinkTable.create(this.config, this.context);
+  public void reload(long checkpointId) {
+    this.smallFileAssignMap.clear();
+    this.writeProfile.reload(checkpointId);
   }
 
   public HoodieTable getTable() {
-    return table;
-  }
-
-  /**
-   * Returns a list of small files in the given partition path.
-   */
-  protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
-    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
-
-    if (!commitTimeline.empty()) { // if we have some commits
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
-          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
-
-      for (HoodieBaseFile file : allFiles) {
-        // filter out the corrupted files.
-        if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
-          String filename = file.getFileName();
-          SmallFile sf = new SmallFile();
-          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
-          sf.sizeBytes = file.getFileSize();
-          smallFileLocations.add(sf);
-        }
-      }
-    }
-
-    return smallFileLocations;
+    return this.writeProfile.getTable();
   }
 
   private List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
@@ -264,34 +193,58 @@
     return smallFilesOfThisTask;
   }
 
+  public void close() {
+    reset();
+    WriteProfiles.clean(config.getBasePath());
+  }
+
   /**
-   * Obtains the average record size based on records written during previous commits. Used for estimating how many
-   * records pack into one file.
+   * Assigns the record to one of the small files under one partition.
+   *
+   * <p>The tool is initialized with an array of {@link SmallFileAssignState}s.
+   * A pointer points to the current small file we are ready to assign;
+   * if the current small file cannot be assigned anymore (fully assigned),
+   * the pointer moves to the next small file.
+   *
+   * <pre>
+   *       |  ->
+   *       V
+   *   | smallFile_1 | smallFile_2 | smallFile_3 | ... | smallFile_N |
+   * </pre>
+   *
+   * <p>If all the small files are fully assigned, the flag {@code noSpace} is marked as true
+   * so that future checks can return early.
    */
-  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) {
-    long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
-    long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
-    try {
-      if (!commitTimeline.empty()) {
-        // Go over the reverse ordered commits to get a more recent estimate of average record size.
-        Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
-        while (instants.hasNext()) {
-          HoodieInstant instant = instants.next();
-          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-              .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-          long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
-          long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
-          if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
-            avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
-            break;
-          }
-        }
-      }
-    } catch (Throwable t) {
-      // make this fail safe.
-      LOG.error("Error trying to compute average bytes/record ", t);
+  private static class SmallFileAssign {
+    final SmallFileAssignState[] states;
+    int assignIdx = 0;
+    boolean noSpace = false;
+
+    SmallFileAssign(SmallFileAssignState[] states) {
+      this.states = states;
+    }
+
+    public boolean assign() {
+      if (noSpace) {
+        return false;
+      }
+      SmallFileAssignState state = states[assignIdx];
+      if (!state.canAssign()) {
+        assignIdx += 1;
+        if (assignIdx >= states.length) {
+          noSpace = true;
+          return false;
+        }
+        // move to next slot if possible
+        state = states[assignIdx];
+        assert state.canAssign();
+      }
+      state.assign();
+      return true;
+    }
+
+    public String getFileId() {
+      return states[assignIdx].fileId;
     }
-    return avgSize;
   }
 
   /**
@@ -301,10 +254,12 @@
   private static class SmallFileAssignState {
     long assigned;
     long totalUnassigned;
+    final String fileId;
 
     SmallFileAssignState(long parquetMaxFileSize, SmallFile smallFile, long averageRecordSize) {
      this.assigned = 0;
      this.totalUnassigned = (parquetMaxFileSize - smallFile.sizeBytes) / averageRecordSize;
+      this.fileId = smallFile.location.getFileId();
     }
 
     public boolean canAssign() {
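For orientation, the small-file capacity bookkeeping above boils down to a single division. A minimal, self-contained sketch of the same arithmetic as SmallFileAssignState; the config values below are made-up examples, not Hudi defaults:

// Standalone sketch, not part of the patch.
public class SmallFileCapacityExample {
  public static void main(String[] args) {
    long parquetMaxFileSize = 120 * 1024 * 1024L;  // assumed max parquet file size
    long smallFileSizeBytes = 90 * 1024 * 1024L;   // current size of the small base file
    long averageRecordSize = 100L;                 // estimate taken from previous commit metadata

    // Same arithmetic as SmallFileAssignState#totalUnassigned: how many more records
    // the small file can take before it reaches the max parquet file size.
    long totalUnassigned = (parquetMaxFileSize - smallFileSizeBytes) / averageRecordSize;
    System.out.println("Records still assignable to this small file: " + totalUnassigned);
  }
}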
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
index 1c28e6d18..354879059 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigners.java
@@ -21,7 +21,8 @@ package org.apache.hudi.sink.partitioner;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.sink.partitioner.delta.DeltaBucketAssigner;
+import org.apache.hudi.sink.partitioner.profile.WriteProfile;
+import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 
 /**
  * Utilities for {@code BucketAssigner}.
@@ -35,7 +36,7 @@ public abstract class BucketAssigners {
    *
    * @param taskID    The task ID
    * @param numTasks  The number of tasks
-   * @param isOverwrite Whether the write operation is OVERWRITE
+   * @param overwrite Whether the write operation is OVERWRITE
    * @param tableType The table type
    * @param context   The engine context
    * @param config    The configuration
@@ -44,20 +45,12 @@ public abstract class BucketAssigners {
   public static BucketAssigner create(
       int taskID,
       int numTasks,
-      boolean isOverwrite,
+      boolean overwrite,
       HoodieTableType tableType,
       HoodieFlinkEngineContext context,
       HoodieWriteConfig config) {
-    if (isOverwrite) {
-      return new OverwriteBucketAssigner(taskID, numTasks, context, config);
-    }
-    switch (tableType) {
-      case COPY_ON_WRITE:
-        return new BucketAssigner(taskID, numTasks, context, config);
-      case MERGE_ON_READ:
-        return new DeltaBucketAssigner(taskID, numTasks, context, config);
-      default:
-        throw new AssertionError();
-    }
+    boolean delta = tableType.equals(HoodieTableType.MERGE_ON_READ);
+    WriteProfile writeProfile = WriteProfiles.singleton(overwrite, delta, config, context);
+    return new BucketAssigner(taskID, numTasks, writeProfile, config);
   }
 }
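Since every table type now funnels through a shared WriteProfile, a short usage sketch may help. It assumes an already configured HoodieWriteConfig and HoodieFlinkEngineContext; the wrapper class and method names are hypothetical:

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.sink.partitioner.BucketAssigners;

public class BucketAssignerFactoryExample {
  public static BucketAssigner createForSubtask(
      int taskId, int numTasks, HoodieWriteConfig config, HoodieFlinkEngineContext context) {
    // Not an INSERT OVERWRITE operation, MERGE_ON_READ table: BucketAssigners.create
    // resolves a DeltaWriteProfile through WriteProfiles.singleton, keyed by the table
    // base path, so all assigners for the same table in one JVM share a single profile.
    return BucketAssigners.create(taskId, numTasks, false, HoodieTableType.MERGE_ON_READ, context, config);
  }
}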

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
similarity index 90%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index deb8250e5..8f8c692e7 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/delta/DeltaBucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink.partitioner.delta;
+package org.apache.hudi.sink.partitioner.profile;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.sink.partitioner.BucketAssigner;
 import org.apache.hudi.table.action.commit.SmallFile;
 
 import java.util.ArrayList;
@@ -34,22 +33,18 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 /**
- * BucketAssigner for MERGE_ON_READ table type, this allows auto correction of small parquet files to larger ones
+ * WriteProfile for MERGE_ON_READ table type, this allows auto correction of small parquet files to larger ones
  * without the need for an index in the logFile.
  *
  * <p>Note: assumes the index can always index log files for Flink write.
  */
-public class DeltaBucketAssigner extends BucketAssigner {
-  public DeltaBucketAssigner(
-      int taskID,
-      int numTasks,
-      HoodieFlinkEngineContext context,
-      HoodieWriteConfig config) {
-    super(taskID, numTasks, context, config);
+public class DeltaWriteProfile extends WriteProfile {
+  public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
+    super(config, context);
   }
 
   @Override
-  protected List<SmallFile> getSmallFiles(String partitionPath) {
+  protected List<SmallFile> smallFilesProfile(String partitionPath) {
     // smallFiles only for partitionPath
     List<SmallFile> smallFileLocations = new ArrayList<>();
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java
similarity index 68%
rename from hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java
rename to hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java
index 7e2320e0b..8b084462c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/OverwriteBucketAssigner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/OverwriteWriteProfile.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.sink.partitioner;
+package org.apache.hudi.sink.partitioner.profile;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -26,22 +26,18 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * BucketAssigner for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
- * this assigner always skip the existing small files because of the 'OVERWRITE' semantics.
+ * WriteProfile for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
+ * this WriteProfile always skips the existing small files because of the 'OVERWRITE' semantics.
  *
  * <p>Note: assumes the index can always index log files for Flink write.
  */
-public class OverwriteBucketAssigner extends BucketAssigner {
-  public OverwriteBucketAssigner(
-      int taskID,
-      int numTasks,
-      HoodieFlinkEngineContext context,
-      HoodieWriteConfig config) {
-    super(taskID, numTasks, context, config);
+public class OverwriteWriteProfile extends WriteProfile {
+  public OverwriteWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
+    super(config, context);
   }
 
   @Override
-  protected List<SmallFile> getSmallFiles(String partitionPath) {
+  protected List<SmallFile> smallFilesProfile(String partitionPath) {
     return Collections.emptyList();
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
new file mode 100644
index 000000000..3aee176e2
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.profile;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sink.partitioner.BucketAssigner;
+import org.apache.hudi.table.HoodieFlinkTable;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.commit.SmallFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Profiling of write statistics for {@link BucketAssigner},
+ * such as the average record size and small files.
+ *
+ * <p>The profile is re-constructed when there are new commits on the timeline.
+ */
+public class WriteProfile {
+  private static final Logger LOG = LoggerFactory.getLogger(WriteProfile.class);
+
+  /**
+   * The write config.
+   */
+  protected final HoodieWriteConfig config;
+
+  /**
+   * The hoodie table.
+   */
+  protected final HoodieTable table;
+
+  /**
+   * The average record size.
+   */
+  private long avgSize = -1L;
+
+  /**
+   * Total records to write for each bucket based on
+   * the config option {@link org.apache.hudi.config.HoodieStorageConfig#PARQUET_FILE_MAX_BYTES}.
+   */
+  private long recordsPerBucket;
+
+  /**
+   * Partition path to small files mapping.
+   */
+  private final Map<String, List<SmallFile>> smallFilesMap;
+
+  /**
+   * Checkpoint id to avoid redundant reload.
+   */
+  private long reloadedCheckpointId;
+
+  public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
+    this.config = config;
+    this.smallFilesMap = new HashMap<>();
+    this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
+    this.table = HoodieFlinkTable.create(config, context);
+    // profile the record statistics on construction
+    recordProfile();
+  }
+
+  public long getAvgSize() {
+    return avgSize;
+  }
+
+  public long getRecordsPerBucket() {
+    return recordsPerBucket;
+  }
+
+  public HoodieTable getTable() {
+    return table;
+  }
+
+  /**
+   * Obtains the average record size based on records written during previous commits. Used for estimating how many
+   * records pack into one file.
+   */
+  private long averageBytesPerRecord() {
+    long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+    long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
+    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+    try {
+      if (!commitTimeline.empty()) {
+        // Go over the reverse ordered commits to get a more recent estimate of average record size.
+        Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
+        while (instants.hasNext()) {
+          HoodieInstant instant = instants.next();
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+          long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+          long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+          if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
+            avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
+            break;
+          }
+        }
+      }
+      LOG.info("AvgRecordSize => " + avgSize);
+    } catch (Throwable t) {
+      // make this fail safe.
+      LOG.error("Error trying to compute average bytes/record ", t);
+    }
+    return avgSize;
+  }
+
+  /**
+   * Returns a list of small files in the given partition path.
+   *
+   * <p>Note: This method should be thread safe.
+   */
+  public synchronized List<SmallFile> getSmallFiles(String partitionPath) {
+    // lookup the cache first
+    if (smallFilesMap.containsKey(partitionPath)) {
+      return smallFilesMap.get(partitionPath);
+    }
+    List<SmallFile> smallFiles = smallFilesProfile(partitionPath);
+    this.smallFilesMap.put(partitionPath, smallFiles);
+    return smallFiles;
+  }
+
+  /**
+   * Returns a list of small files in the given partition path from the latest filesystem view.
+   */
+  protected List<SmallFile> smallFilesProfile(String partitionPath) {
+    // smallFiles only for partitionPath
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
+
+    if (!commitTimeline.empty()) { // if we have some commits
+      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+      List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
+          .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
+
+      for (HoodieBaseFile file : allFiles) {
+        // filter out the corrupted files.
+        if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
+          String filename = file.getFileName();
+          SmallFile sf = new SmallFile();
+          sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+          sf.sizeBytes = file.getFileSize();
+          smallFileLocations.add(sf);
+        }
+      }
+    }
+
+    return smallFileLocations;
+  }
+
+  private void recordProfile() {
+    this.avgSize = averageBytesPerRecord();
+    if (config.shouldAutoTuneInsertSplits()) {
+      this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
+      LOG.info("InsertRecordsPerBucket => " + recordsPerBucket);
+    }
+  }
+
+  /**
+   * Reload the write profile; should be done once for each checkpoint.
+   *
+   * <p>We do these things: i) reload the timeline; ii) re-construct the record profile;
+   * iii) clean the small files cache.
+   *
+   * <p>Note: This method should be thread safe.
+   */
+  public synchronized void reload(long checkpointId) {
+    if (this.reloadedCheckpointId >= checkpointId) {
+      // already reloaded
+      return;
+    }
+    this.table.getMetaClient().reloadActiveTimeline();
+    recordProfile();
+    this.smallFilesMap.clear();
+    this.reloadedCheckpointId = checkpointId;
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
new file mode 100644
index 000000000..cf99ab32f
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.partitioner.profile;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Factory for {@link WriteProfile}.
+ */
+public class WriteProfiles {
+  private static final Map<String, WriteProfile> PROFILES = new HashMap<>();
+
+  private WriteProfiles() {}
+
+  public static synchronized WriteProfile singleton(
+      boolean overwrite,
+      boolean delta,
+      HoodieWriteConfig config,
+      HoodieFlinkEngineContext context) {
+    return PROFILES.computeIfAbsent(config.getBasePath(),
+        k -> getWriteProfile(overwrite, delta, config, context));
+  }
+
+  private static WriteProfile getWriteProfile(
+      boolean overwrite,
+      boolean delta,
+      HoodieWriteConfig config,
+      HoodieFlinkEngineContext context) {
+    if (overwrite) {
+      return new OverwriteWriteProfile(config, context);
+    } else if (delta) {
+      return new DeltaWriteProfile(config, context);
+    } else {
+      return new WriteProfile(config, context);
+    }
+  }
+
+  public static void clean(String path) {
+    PROFILES.remove(path);
+  }
+}
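To tie the pieces together, a minimal sketch of how the operator drives these classes across a checkpoint; the wrapper class is hypothetical and only mirrors calls already shown in BucketAssignFunction above:

import org.apache.hudi.sink.partitioner.BucketAssigner;

public class AssignerLifecycleExample {

  public static void onCheckpointComplete(BucketAssigner assigner, long checkpointId) {
    // Mirrors BucketAssignFunction#notifyCheckpointComplete: clears the local small-file
    // assign cache and asks the shared WriteProfile to reload. Because WriteProfile#reload
    // remembers the last reloaded checkpoint id, only the first subtask to call it actually
    // reloads the timeline and re-computes the record profile.
    assigner.reload(checkpointId);
  }

  public static void onClose(BucketAssigner assigner) {
    // Mirrors BucketAssignFunction#close: resets bucket state and evicts the table's
    // WriteProfile from the WriteProfiles cache via WriteProfiles.clean(basePath).
    assigner.close();
  }
}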
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 33457b509..1c895b633 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.sink.partitioner.profile.WriteProfile;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.table.action.commit.BucketType;
 import org.apache.hudi.table.action.commit.SmallFile;
@@ -311,7 +312,6 @@ public class TestBucketAssigner {
    * Mock BucketAssigner that can specify small files explicitly.
    */
   static class MockBucketAssigner extends BucketAssigner {
-    private final Map<String, List<SmallFile>> smallFilesMap;
 
     MockBucketAssigner(
         HoodieFlinkEngineContext context,
        HoodieWriteConfig config,
@@ -332,12 +332,23 @@ public class TestBucketAssigner {
         HoodieFlinkEngineContext context,
         HoodieWriteConfig config,
         Map<String, List<SmallFile>> smallFilesMap) {
-      super(taskID, numTasks, context, config);
+      super(taskID, numTasks, new MockWriteProfile(config, context, smallFilesMap), config);
+    }
+  }
+
+  /**
+   * Mock WriteProfile that can specify small files explicitly.
+   */
+  static class MockWriteProfile extends WriteProfile {
+    private final Map<String, List<SmallFile>> smallFilesMap;
+
+    public MockWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context, Map<String, List<SmallFile>> smallFilesMap) {
+      super(config, context);
       this.smallFilesMap = smallFilesMap;
     }
 
     @Override
-    protected List<SmallFile> getSmallFiles(String partitionPath) {
+    protected List<SmallFile> smallFilesProfile(String partitionPath) {
       if (this.smallFilesMap.containsKey(partitionPath)) {
         return this.smallFilesMap.get(partitionPath);
       }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index c0dbfce62..b21f8bb1c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -23,15 +23,12 @@ import org.apache.hudi.streamer.FlinkStreamerConfig;
 import org.apache.hudi.utils.factory.CollectSinkTableFactory;
 import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.types.Row;
 
 import java.util.Map;
 import java.util.Objects;
@@ -59,13 +56,6 @@ public class TestConfigurations {
           ROW_DATA_TYPE.getChildren().toArray(new DataType[0]))
       .build();
 
-  public static final TypeInformation<Row> ROW_TYPE_INFO = Types.ROW(
-      Types.STRING,
-      Types.STRING,
-      Types.INT,
-      Types.LOCAL_DATE_TIME,
-      Types.STRING);
-
   public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
     String createTable = "create table " + tableName + "(\n"
         + "  uuid varchar(20),\n"
diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
index 68e183b87..6dc9325fb 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/utils/factory/CollectSinkTableFactory.java
@@ -18,10 +18,9 @@
 
 package org.apache.hudi.utils.factory;
 
-import org.apache.hudi.utils.TestConfigurations;
-
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -35,6 +34,8 @@ import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
@@ -112,8 +113,10 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
 
     @Override
     public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+      final DataType rowType = schema.toPhysicalRowDataType();
+      final RowTypeInfo rowTypeInfo = (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(rowType);
       DataStructureConverter converter = context.createDataStructureConverter(schema.toPhysicalRowDataType());
-      return SinkFunctionProvider.of(new CollectSinkFunction(converter));
+      return SinkFunctionProvider.of(new CollectSinkFunction(converter, rowTypeInfo));
     }
 
     @Override
@@ -131,14 +134,16 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
     private static final long serialVersionUID = 1L;
 
     private final DynamicTableSink.DataStructureConverter converter;
+    private final RowTypeInfo rowTypeInfo;
 
     protected transient ListState<Row> resultState;
     protected transient List<Row> localResult;
     private int taskID;
 
-    protected CollectSinkFunction(DynamicTableSink.DataStructureConverter converter) {
+    protected CollectSinkFunction(DynamicTableSink.DataStructureConverter converter, RowTypeInfo rowTypeInfo) {
       this.converter = converter;
+      this.rowTypeInfo = rowTypeInfo;
     }
 
     @Override
@@ -151,7 +156,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
     @Override
     public void initializeState(FunctionInitializationContext context) throws Exception {
       this.resultState = context.getOperatorStateStore().getListState(
-          new ListStateDescriptor<>("sink-results", TestConfigurations.ROW_TYPE_INFO));
+          new ListStateDescriptor<>("sink-results", rowTypeInfo));
       this.localResult = new ArrayList<>();
       if (context.isRestored()) {
         for (Row value : resultState.get()) {
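For reference, the schema-to-legacy-type conversion the test sink now performs on its physical row type can be reproduced in isolation; a small self-contained sketch (the field names are invented, only the TypeConversions call matters):

import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;

public class RowTypeInfoExample {
  public static void main(String[] args) {
    DataType rowType = DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING()),
        DataTypes.FIELD("age", DataTypes.INT()));
    // Derive the legacy TypeInformation from the physical row type instead of
    // hard coding a static ROW_TYPE_INFO in TestConfigurations.
    RowTypeInfo rowTypeInfo = (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(rowType);
    System.out.println(rowTypeInfo);
  }
}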