1
0

[HUDI-1949] Refactor BucketAssigner to make it more efficient (#3017)

Add a single processing class, WriteProfile; re-construction of the record
and small-files profile can be made more efficient by reusing the profile
for the same checkpoint id.
This commit is contained in:
Danny Chan
2021-06-02 09:12:35 +08:00
committed by GitHub
parent 83c31e356f
commit bf1cfb5635
10 changed files with 410 additions and 192 deletions

View File

@@ -251,9 +251,14 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
} }
@Override @Override
public void notifyCheckpointComplete(long l) { public void notifyCheckpointComplete(long checkpointId) {
// Refresh the table state when there are new commits. // Refresh the table state when there are new commits.
this.bucketAssigner.refreshTable(); this.bucketAssigner.reload(checkpointId);
}
@Override
public void close() throws Exception {
this.bucketAssigner.close();
} }
/** /**

View File

@@ -18,15 +18,10 @@
package org.apache.hudi.sink.partitioner; package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.action.commit.BucketType;
@@ -38,12 +33,9 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
/** /**
* Bucket assigner that assigns the data buffer of one checkpoint into buckets. * Bucket assigner that assigns the data buffer of one checkpoint into buckets.
@@ -57,7 +49,7 @@ import java.util.stream.Collectors;
* <p>Use {partition}_{fileId} as the bucket identifier, so that the bucket is unique * <p>Use {partition}_{fileId} as the bucket identifier, so that the bucket is unique
* within and among partitions. * within and among partitions.
*/ */
public class BucketAssigner { public class BucketAssigner implements AutoCloseable {
private static final Logger LOG = LogManager.getLogger(BucketAssigner.class); private static final Logger LOG = LogManager.getLogger(BucketAssigner.class);
/** /**
@@ -75,38 +67,20 @@ public class BucketAssigner {
*/ */
private final HashMap<String, BucketInfo> bucketInfoMap; private final HashMap<String, BucketInfo> bucketInfoMap;
protected HoodieTable<?, ?, ?, ?> table;
/**
* Fink engine context.
*/
private final HoodieFlinkEngineContext context;
/** /**
* The write config. * The write config.
*/ */
protected final HoodieWriteConfig config; protected final HoodieWriteConfig config;
/** /**
* The average record size. * Write profile.
*/ */
private final long averageRecordSize; private final WriteProfile writeProfile;
/** /**
* Total records to write for each bucket based on * Partition path to small file assign mapping.
* the config option {@link org.apache.hudi.config.HoodieStorageConfig#PARQUET_FILE_MAX_BYTES}.
*/ */
private final long insertRecordsPerBucket; private final Map<String, SmallFileAssign> smallFileAssignMap;
/**
* Partition path to small files mapping.
*/
private final Map<String, List<SmallFile>> partitionSmallFilesMap;
/**
* Bucket ID(partition + fileId) -> small file assign state.
*/
private final Map<String, SmallFileAssignState> smallFileAssignStates;
/** /**
* Bucket ID(partition + fileId) -> new file assign state. * Bucket ID(partition + fileId) -> new file assign state.
@@ -116,25 +90,16 @@ public class BucketAssigner {
public BucketAssigner( public BucketAssigner(
int taskID, int taskID,
int numTasks, int numTasks,
HoodieFlinkEngineContext context, WriteProfile profile,
HoodieWriteConfig config) { HoodieWriteConfig config) {
bucketInfoMap = new HashMap<>();
partitionSmallFilesMap = new HashMap<>();
smallFileAssignStates = new HashMap<>();
newFileAssignStates = new HashMap<>();
this.taskID = taskID; this.taskID = taskID;
this.numTasks = numTasks; this.numTasks = numTasks;
this.context = context;
this.config = config; this.config = config;
this.table = HoodieFlinkTable.create(this.config, this.context); this.writeProfile = profile;
averageRecordSize = averageBytesPerRecord(
table.getMetaClient().getActiveTimeline().getCommitTimeline().filterCompletedInstants(), this.bucketInfoMap = new HashMap<>();
config); this.smallFileAssignMap = new HashMap<>();
LOG.info("AvgRecordSize => " + averageRecordSize); this.newFileAssignStates = new HashMap<>();
insertRecordsPerBucket = config.shouldAutoTuneInsertSplits()
? config.getParquetMaxFileSize() / averageRecordSize
: config.getCopyOnWriteInsertSplitSize();
LOG.info("InsertRecordsPerBucket => " + insertRecordsPerBucket);
} }
/** /**
@@ -143,8 +108,6 @@ public class BucketAssigner {
*/ */
public void reset() { public void reset() {
bucketInfoMap.clear(); bucketInfoMap.clear();
partitionSmallFilesMap.clear();
smallFileAssignStates.clear();
newFileAssignStates.clear(); newFileAssignStates.clear();
} }
@@ -160,26 +123,21 @@ public class BucketAssigner {
public BucketInfo addInsert(String partitionPath) { public BucketInfo addInsert(String partitionPath) {
// for new inserts, compute buckets depending on how many records we have for each partition // for new inserts, compute buckets depending on how many records we have for each partition
List<SmallFile> smallFiles = getSmallFilesForPartition(partitionPath); SmallFileAssign smallFileAssign = getSmallFileAssign(partitionPath);
// first try packing this into one of the smallFiles // first try packing this into one of the smallFiles
for (SmallFile smallFile : smallFiles) { if (smallFileAssign != null && smallFileAssign.assign()) {
final String key = StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId()); final String key = StreamerUtil.generateBucketKey(partitionPath, smallFileAssign.getFileId());
SmallFileAssignState assignState = smallFileAssignStates.get(key); // create a new bucket or reuse an existing bucket
assert assignState != null;
if (assignState.canAssign()) {
assignState.assign();
// create a new bucket or re-use an existing bucket
BucketInfo bucketInfo; BucketInfo bucketInfo;
if (bucketInfoMap.containsKey(key)) { if (bucketInfoMap.containsKey(key)) {
// Assigns an inserts to existing update bucket // Assigns an inserts to existing update bucket
bucketInfo = bucketInfoMap.get(key); bucketInfo = bucketInfoMap.get(key);
} else { } else {
bucketInfo = addUpdate(partitionPath, smallFile.location.getFileId()); bucketInfo = addUpdate(partitionPath, smallFileAssign.getFileId());
} }
return bucketInfo; return bucketInfo;
} }
}
// if we have anything more, create new insert buckets, like normal // if we have anything more, create new insert buckets, like normal
if (newFileAssignStates.containsKey(partitionPath)) { if (newFileAssignStates.containsKey(partitionPath)) {
@@ -193,66 +151,37 @@ public class BucketAssigner {
BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath); BucketInfo bucketInfo = new BucketInfo(BucketType.INSERT, FSUtils.createNewFileIdPfx(), partitionPath);
final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix()); final String key = StreamerUtil.generateBucketKey(partitionPath, bucketInfo.getFileIdPrefix());
bucketInfoMap.put(key, bucketInfo); bucketInfoMap.put(key, bucketInfo);
newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), insertRecordsPerBucket)); newFileAssignStates.put(partitionPath, new NewFileAssignState(bucketInfo.getFileIdPrefix(), writeProfile.getRecordsPerBucket()));
return bucketInfo; return bucketInfo;
} }
private List<SmallFile> getSmallFilesForPartition(String partitionPath) { private SmallFileAssign getSmallFileAssign(String partitionPath) {
if (partitionSmallFilesMap.containsKey(partitionPath)) { if (smallFileAssignMap.containsKey(partitionPath)) {
return partitionSmallFilesMap.get(partitionPath); return smallFileAssignMap.get(partitionPath);
} }
List<SmallFile> smallFiles = smallFilesOfThisTask(getSmallFiles(partitionPath)); List<SmallFile> smallFiles = smallFilesOfThisTask(writeProfile.getSmallFiles(partitionPath));
if (smallFiles.size() > 0) { if (smallFiles.size() > 0) {
LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
partitionSmallFilesMap.put(partitionPath, smallFiles); SmallFileAssignState[] states = smallFiles.stream()
smallFiles.forEach(smallFile -> .map(smallFile -> new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, writeProfile.getAvgSize()))
smallFileAssignStates.put( .toArray(SmallFileAssignState[]::new);
StreamerUtil.generateBucketKey(partitionPath, smallFile.location.getFileId()), SmallFileAssign assign = new SmallFileAssign(states);
new SmallFileAssignState(config.getParquetMaxFileSize(), smallFile, averageRecordSize))); smallFileAssignMap.put(partitionPath, assign);
return smallFiles; return assign;
} }
return Collections.emptyList(); return null;
} }
/** /**
* Refresh the table state like TableFileSystemView and HoodieTimeline. * Refresh the table state like TableFileSystemView and HoodieTimeline.
*/ */
public void refreshTable() { public void reload(long checkpointId) {
this.table = HoodieFlinkTable.create(this.config, this.context); this.smallFileAssignMap.clear();
this.writeProfile.reload(checkpointId);
} }
public HoodieTable<?, ?, ?, ?> getTable() { public HoodieTable<?, ?, ?, ?> getTable() {
return table; return this.writeProfile.getTable();
}
/**
* Returns a list of small files in the given partition path.
*/
protected List<SmallFile> getSmallFiles(String partitionPath) {
// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
for (HoodieBaseFile file : allFiles) {
// filter out the corrupted files.
if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
String filename = file.getFileName();
SmallFile sf = new SmallFile();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}
}
}
return smallFileLocations;
} }
private List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) { private List<SmallFile> smallFilesOfThisTask(List<SmallFile> smallFiles) {
@@ -264,34 +193,58 @@ public class BucketAssigner {
return smallFilesOfThisTask; return smallFilesOfThisTask;
} }
public void close() {
reset();
WriteProfiles.clean(config.getBasePath());
}
/** /**
* Obtains the average record size based on records written during previous commits. Used for estimating how many * Assigns the record to one of the small files under one partition.
* records pack into one file. *
* <p> The tool is initialized with an array of {@link SmallFileAssignState}s.
* A pointer points to the current small file we are ready to assign,
* if the current small file can not be assigned anymore (full assigned), the pointer
* move to next small file.
* <pre>
* | ->
* V
* | smallFile_1 | smallFile_2 | smallFile_3 | ... | smallFile_N |
* </pre>
*
* <p>If all the small files are full assigned, a flag {@code noSpace} was marked to true, and
* we can return early for future check.
*/ */
protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) { private static class SmallFileAssign {
long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate(); final SmallFileAssignState[] states;
long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit()); int assignIdx = 0;
try { boolean noSpace = false;
if (!commitTimeline.empty()) {
// Go over the reverse ordered commits to get a more recent estimate of average record size. SmallFileAssign(SmallFileAssignState[] states) {
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator(); this.states = states;
while (instants.hasNext()) {
HoodieInstant instant = instants.next();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
break;
} }
public boolean assign() {
if (noSpace) {
return false;
} }
SmallFileAssignState state = states[assignIdx];
if (!state.canAssign()) {
assignIdx += 1;
if (assignIdx >= states.length) {
noSpace = true;
return false;
} }
} catch (Throwable t) { // move to next slot if possible
// make this fail safe. state = states[assignIdx];
LOG.error("Error trying to compute average bytes/record ", t); assert state.canAssign();
}
state.assign();
return true;
}
public String getFileId() {
return states[assignIdx].fileId;
} }
return avgSize;
} }
/** /**
@@ -301,10 +254,12 @@ public class BucketAssigner {
private static class SmallFileAssignState { private static class SmallFileAssignState {
long assigned; long assigned;
long totalUnassigned; long totalUnassigned;
final String fileId;
SmallFileAssignState(long parquetMaxFileSize, SmallFile smallFile, long averageRecordSize) { SmallFileAssignState(long parquetMaxFileSize, SmallFile smallFile, long averageRecordSize) {
this.assigned = 0; this.assigned = 0;
this.totalUnassigned = (parquetMaxFileSize - smallFile.sizeBytes) / averageRecordSize; this.totalUnassigned = (parquetMaxFileSize - smallFile.sizeBytes) / averageRecordSize;
this.fileId = smallFile.location.getFileId();
} }
public boolean canAssign() { public boolean canAssign() {

View File

@@ -21,7 +21,8 @@ package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.delta.DeltaBucketAssigner; import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
/** /**
* Utilities for {@code BucketAssigner}. * Utilities for {@code BucketAssigner}.
@@ -35,7 +36,7 @@ public abstract class BucketAssigners {
* *
* @param taskID The task ID * @param taskID The task ID
* @param numTasks The number of tasks * @param numTasks The number of tasks
* @param isOverwrite Whether the write operation is OVERWRITE * @param overwrite Whether the write operation is OVERWRITE
* @param tableType The table type * @param tableType The table type
* @param context The engine context * @param context The engine context
* @param config The configuration * @param config The configuration
@@ -44,20 +45,12 @@ public abstract class BucketAssigners {
public static BucketAssigner create( public static BucketAssigner create(
int taskID, int taskID,
int numTasks, int numTasks,
boolean isOverwrite, boolean overwrite,
HoodieTableType tableType, HoodieTableType tableType,
HoodieFlinkEngineContext context, HoodieFlinkEngineContext context,
HoodieWriteConfig config) { HoodieWriteConfig config) {
if (isOverwrite) { boolean delta = tableType.equals(HoodieTableType.MERGE_ON_READ);
return new OverwriteBucketAssigner(taskID, numTasks, context, config); WriteProfile writeProfile = WriteProfiles.singleton(overwrite, delta, config, context);
} return new BucketAssigner(taskID, numTasks, writeProfile, config);
switch (tableType) {
case COPY_ON_WRITE:
return new BucketAssigner(taskID, numTasks, context, config);
case MERGE_ON_READ:
return new DeltaBucketAssigner(taskID, numTasks, context, config);
default:
throw new AssertionError();
}
} }
} }

View File

@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hudi.sink.partitioner.delta; package org.apache.hudi.sink.partitioner.profile;
import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.FSUtils;
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.table.action.commit.SmallFile; import org.apache.hudi.table.action.commit.SmallFile;
import java.util.ArrayList; import java.util.ArrayList;
@@ -34,22 +33,18 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* BucketAssigner for MERGE_ON_READ table type, this allows auto correction of small parquet files to larger ones * WriteProfile for MERGE_ON_READ table type, this allows auto correction of small parquet files to larger ones
* without the need for an index in the logFile. * without the need for an index in the logFile.
* *
* <p>Note: assumes the index can always index log files for Flink write. * <p>Note: assumes the index can always index log files for Flink write.
*/ */
public class DeltaBucketAssigner extends BucketAssigner { public class DeltaWriteProfile extends WriteProfile {
public DeltaBucketAssigner( public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
int taskID, super(config, context);
int numTasks,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
super(taskID, numTasks, context, config);
} }
@Override @Override
protected List<SmallFile> getSmallFiles(String partitionPath) { protected List<SmallFile> smallFilesProfile(String partitionPath) {
// smallFiles only for partitionPath // smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>(); List<SmallFile> smallFileLocations = new ArrayList<>();

View File

@@ -16,7 +16,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.hudi.sink.partitioner; package org.apache.hudi.sink.partitioner.profile;
import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
@@ -26,22 +26,18 @@ import java.util.Collections;
import java.util.List; import java.util.List;
/** /**
* BucketAssigner for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations, * WriteProfile for INSERT OVERWRITE and INSERT OVERWRITE TABLE operations,
* this assigner always skip the existing small files because of the 'OVERWRITE' semantics. * this WriteProfile always skip the existing small files because of the 'OVERWRITE' semantics.
* *
* <p>Note: assumes the index can always index log files for Flink write. * <p>Note: assumes the index can always index log files for Flink write.
*/ */
public class OverwriteBucketAssigner extends BucketAssigner { public class OverwriteWriteProfile extends WriteProfile {
public OverwriteBucketAssigner( public OverwriteWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
int taskID, super(config, context);
int numTasks,
HoodieFlinkEngineContext context,
HoodieWriteConfig config) {
super(taskID, numTasks, context, config);
} }
@Override @Override
protected List<SmallFile> getSmallFiles(String partitionPath) { protected List<SmallFile> smallFilesProfile(String partitionPath) {
return Collections.emptyList(); return Collections.emptyList();
} }
} }

View File

@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.partitioner.profile;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.BucketAssigner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.SmallFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Profiling of write statistics for {@link BucketAssigner},
* such as the average record size and small files.
*
* <p>The profile is re-constructed when there are new commits on the timeline.
*/
public class WriteProfile {
private static final Logger LOG = LoggerFactory.getLogger(WriteProfile.class);
/**
* The write config.
*/
protected final HoodieWriteConfig config;
/**
* The hoodie table.
*/
protected final HoodieTable<?, ?, ?, ?> table;
/**
* The average record size.
*/
private long avgSize = -1L;
/**
* Total records to write for each bucket based on
* the config option {@link org.apache.hudi.config.HoodieStorageConfig#PARQUET_FILE_MAX_BYTES}.
*/
private long recordsPerBucket;
/**
* Partition path to small files mapping.
*/
private final Map<String, List<SmallFile>> smallFilesMap;
/**
* Checkpoint id to avoid redundant reload.
*/
private long reloadedCheckpointId;
public WriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context) {
this.config = config;
this.smallFilesMap = new HashMap<>();
this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
this.table = HoodieFlinkTable.create(config, context);
// profile the record statistics on construction
recordProfile();
}
public long getAvgSize() {
return avgSize;
}
public long getRecordsPerBucket() {
return recordsPerBucket;
}
public HoodieTable<?, ?, ?, ?> getTable() {
return table;
}
/**
* Obtains the average record size based on records written during previous commits. Used for estimating how many
* records pack into one file.
*/
private long averageBytesPerRecord() {
long avgSize = config.getCopyOnWriteRecordSizeEstimate();
long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() * config.getParquetSmallFileLimit());
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
try {
if (!commitTimeline.empty()) {
// Go over the reverse ordered commits to get a more recent estimate of average record size.
Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator();
while (instants.hasNext()) {
HoodieInstant instant = instants.next();
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten);
break;
}
}
}
LOG.info("AvgRecordSize => " + avgSize);
} catch (Throwable t) {
// make this fail safe.
LOG.error("Error trying to compute average bytes/record ", t);
}
return avgSize;
}
/**
* Returns a list of small files in the given partition path.
*
* <p>Note: This method should be thread safe.
*/
public synchronized List<SmallFile> getSmallFiles(String partitionPath) {
// lookup the cache first
if (smallFilesMap.containsKey(partitionPath)) {
return smallFilesMap.get(partitionPath);
}
List<SmallFile> smallFiles = smallFilesProfile(partitionPath);
this.smallFilesMap.put(partitionPath, smallFiles);
return smallFiles;
}
/**
* Returns a list of small files in the given partition path from the latest filesystem view.
*/
protected List<SmallFile> smallFilesProfile(String partitionPath) {
// smallFiles only for partitionPath
List<SmallFile> smallFileLocations = new ArrayList<>();
HoodieTimeline commitTimeline = table.getMetaClient().getCommitsTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) { // if we have some commits
HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
List<HoodieBaseFile> allFiles = table.getBaseFileOnlyView()
.getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());
for (HoodieBaseFile file : allFiles) {
// filter out the corrupted files.
if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) {
String filename = file.getFileName();
SmallFile sf = new SmallFile();
sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}
}
}
return smallFileLocations;
}
private void recordProfile() {
this.avgSize = averageBytesPerRecord();
if (config.shouldAllowMultiWriteOnSameInstant()) {
this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
LOG.info("InsertRecordsPerBucket => " + recordsPerBucket);
}
}
/**
* Reload the write profile, should do once for each checkpoint.
*
* <p>We do these things: i). reload the timeline; ii). re-construct the record profile;
* iii) clean the small files cache.
*
* <p>Note: This method should be thread safe.
*/
public synchronized void reload(long checkpointId) {
if (this.reloadedCheckpointId >= checkpointId) {
// already reloaded
return;
}
recordProfile();
this.smallFilesMap.clear();
this.table.getMetaClient().reloadActiveTimeline();
this.reloadedCheckpointId = checkpointId;
}
}

View File

@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.sink.partitioner.profile;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.config.HoodieWriteConfig;
import java.util.HashMap;
import java.util.Map;
/**
* Factory for {@link WriteProfile}.
*/
public class WriteProfiles {
private static final Map<String, WriteProfile> PROFILES = new HashMap<>();
private WriteProfiles() {}
public static synchronized WriteProfile singleton(
boolean overwrite,
boolean delta,
HoodieWriteConfig config,
HoodieFlinkEngineContext context) {
return PROFILES.computeIfAbsent(config.getBasePath(),
k -> getWriteProfile(overwrite, delta, config, context));
}
private static WriteProfile getWriteProfile(
boolean overwrite,
boolean delta,
HoodieWriteConfig config,
HoodieFlinkEngineContext context) {
if (overwrite) {
return new OverwriteWriteProfile(config, context);
} else if (delta) {
return new DeltaWriteProfile(config, context);
} else {
return new WriteProfile(config, context);
}
}
public static void clean(String path) {
PROFILES.remove(path);
}
}

View File

@@ -23,6 +23,7 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.SmallFile; import org.apache.hudi.table.action.commit.SmallFile;
@@ -311,7 +312,6 @@ public class TestBucketAssigner {
* Mock BucketAssigner that can specify small files explicitly. * Mock BucketAssigner that can specify small files explicitly.
*/ */
static class MockBucketAssigner extends BucketAssigner { static class MockBucketAssigner extends BucketAssigner {
private final Map<String, List<SmallFile>> smallFilesMap;
MockBucketAssigner( MockBucketAssigner(
HoodieFlinkEngineContext context, HoodieFlinkEngineContext context,
@@ -332,12 +332,23 @@ public class TestBucketAssigner {
HoodieFlinkEngineContext context, HoodieFlinkEngineContext context,
HoodieWriteConfig config, HoodieWriteConfig config,
Map<String, List<SmallFile>> smallFilesMap) { Map<String, List<SmallFile>> smallFilesMap) {
super(taskID, numTasks, context, config); super(taskID, numTasks, new MockWriteProfile(config, context, smallFilesMap), config);
}
}
/**
* Mock WriteProfile that can specify small files explicitly.
*/
static class MockWriteProfile extends WriteProfile {
private final Map<String, List<SmallFile>> smallFilesMap;
public MockWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext context, Map<String, List<SmallFile>> smallFilesMap) {
super(config, context);
this.smallFilesMap = smallFilesMap; this.smallFilesMap = smallFilesMap;
} }
@Override @Override
protected List<SmallFile> getSmallFiles(String partitionPath) { protected List<SmallFile> smallFilesProfile(String partitionPath) {
if (this.smallFilesMap.containsKey(partitionPath)) { if (this.smallFilesMap.containsKey(partitionPath)) {
return this.smallFilesMap.get(partitionPath); return this.smallFilesMap.get(partitionPath);
} }

View File

@@ -23,15 +23,12 @@ import org.apache.hudi.streamer.FlinkStreamerConfig;
import org.apache.hudi.utils.factory.CollectSinkTableFactory; import org.apache.hudi.utils.factory.CollectSinkTableFactory;
import org.apache.hudi.utils.factory.ContinuousFileSourceFactory; import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer; import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@@ -59,13 +56,6 @@ public class TestConfigurations {
ROW_DATA_TYPE.getChildren().toArray(new DataType[0])) ROW_DATA_TYPE.getChildren().toArray(new DataType[0]))
.build(); .build();
public static final TypeInformation<Row> ROW_TYPE_INFO = Types.ROW(
Types.STRING,
Types.STRING,
Types.INT,
Types.LOCAL_DATE_TIME,
Types.STRING);
public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) { public static String getCreateHoodieTableDDL(String tableName, Map<String, String> options) {
String createTable = "create table " + tableName + "(\n" String createTable = "create table " + tableName + "(\n"
+ " uuid varchar(20),\n" + " uuid varchar(20),\n"

View File

@@ -18,10 +18,9 @@
package org.apache.hudi.utils.factory; package org.apache.hudi.utils.factory;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -35,6 +34,8 @@ import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row; import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind; import org.apache.flink.types.RowKind;
@@ -112,8 +113,10 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
@Override @Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final DataType rowType = schema.toPhysicalRowDataType();
final RowTypeInfo rowTypeInfo = (RowTypeInfo) TypeConversions.fromDataTypeToLegacyInfo(rowType);
DataStructureConverter converter = context.createDataStructureConverter(schema.toPhysicalRowDataType()); DataStructureConverter converter = context.createDataStructureConverter(schema.toPhysicalRowDataType());
return SinkFunctionProvider.of(new CollectSinkFunction(converter)); return SinkFunctionProvider.of(new CollectSinkFunction(converter, rowTypeInfo));
} }
@Override @Override
@@ -131,14 +134,16 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private final DynamicTableSink.DataStructureConverter converter; private final DynamicTableSink.DataStructureConverter converter;
private final RowTypeInfo rowTypeInfo;
protected transient ListState<Row> resultState; protected transient ListState<Row> resultState;
protected transient List<Row> localResult; protected transient List<Row> localResult;
private int taskID; private int taskID;
protected CollectSinkFunction(DynamicTableSink.DataStructureConverter converter) { protected CollectSinkFunction(DynamicTableSink.DataStructureConverter converter, RowTypeInfo rowTypeInfo) {
this.converter = converter; this.converter = converter;
this.rowTypeInfo = rowTypeInfo;
} }
@Override @Override
@@ -151,7 +156,7 @@ public class CollectSinkTableFactory implements DynamicTableSinkFactory {
@Override @Override
public void initializeState(FunctionInitializationContext context) throws Exception { public void initializeState(FunctionInitializationContext context) throws Exception {
this.resultState = context.getOperatorStateStore().getListState( this.resultState = context.getOperatorStateStore().getListState(
new ListStateDescriptor<>("sink-results", TestConfigurations.ROW_TYPE_INFO)); new ListStateDescriptor<>("sink-results", rowTypeInfo));
this.localResult = new ArrayList<>(); this.localResult = new ArrayList<>();
if (context.isRestored()) { if (context.isRestored()) {
for (Row value : resultState.get()) { for (Row value : resultState.get()) {