[HUDI-3728] Set the sort operator parallelism for flink bucket bulk insert (#5154)
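In BucketStreamWriteFunction, the bucket-to-task assignment is now computed once in the constructor (getBucketToLoad() returns the set instead of mutating a field), the unused maxParallelism and cached table references are dropped, an isEmptyTable flag short-circuits the index bootstrap for empty tables, and the per-partition bucket index is created via computeIfAbsent. In Pipelines, the bulk-insert path pins operator parallelism explicitly: the file-id mapping operator inherits the source parallelism and the file_sorter operator runs at the write task parallelism (FlinkOptions.WRITE_TASKS), so Flink does not insert an implicit shuffle between the map, sort, and write operators.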
org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java:

@@ -18,16 +18,12 @@
 
 package org.apache.hudi.sink.bucket;
 
-import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.sink.StreamWriteFunction;
-import org.apache.hudi.table.HoodieFlinkTable;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -39,12 +35,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static java.util.stream.Collectors.toList;
-
 /**
  * A stream write function with bucket hash index.
  *
@@ -58,18 +51,14 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
 
   private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);
 
-  private int maxParallelism;
-
   private int parallelism;
 
   private int bucketNum;
 
-  private transient HoodieFlinkTable<?> table;
-
   private String indexKeyFields;
 
   /**
-   * BucketID should be load in this task.
+   * BucketID should be loaded in this task.
    */
   private Set<Integer> bucketToLoad;
 
@@ -86,6 +75,11 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    */
   private Set<String> incBucketIndex;
 
+  /**
+   * Returns whether this is an empty table.
+   */
+  private boolean isEmptyTable;
+
   /**
    * Constructs a BucketStreamWriteFunction.
    *
@@ -102,17 +96,15 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
     this.indexKeyFields = config.getString(FlinkOptions.INDEX_KEY_FIELD);
     this.taskID = getRuntimeContext().getIndexOfThisSubtask();
     this.parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
-    this.maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
-    this.bucketToLoad = new HashSet<>();
+    this.bucketToLoad = getBucketToLoad();
     this.bucketIndex = new HashMap<>();
     this.incBucketIndex = new HashSet<>();
-    getBucketToLoad();
+    this.isEmptyTable = !this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent();
   }
 
   @Override
   public void initializeState(FunctionInitializationContext context) throws Exception {
     super.initializeState(context);
-    this.table = this.writeClient.getHoodieTable();
   }
 
   @Override
@@ -129,19 +121,19 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
     final HoodieRecordLocation location;
 
     bootstrapIndexIfNeed(partition);
-    Map<Integer, String> bucketToFileIdMap = bucketIndex.get(partition);
+    Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
     final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
-    final String partitionBucketId = BucketIdentifier.partitionBucketIdStr(hoodieKey.getPartitionPath(), bucketNum);
+    final String bucketId = partition + bucketNum;
 
-    if (incBucketIndex.contains(partitionBucketId)) {
-      location = new HoodieRecordLocation("I", bucketToFileIdMap.get(bucketNum));
-    } else if (bucketToFileIdMap.containsKey(bucketNum)) {
-      location = new HoodieRecordLocation("U", bucketToFileIdMap.get(bucketNum));
+    if (incBucketIndex.contains(bucketId)) {
+      location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNum));
+    } else if (bucketToFileId.containsKey(bucketNum)) {
+      location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
     } else {
       String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
       location = new HoodieRecordLocation("I", newFileId);
-      bucketToFileIdMap.put(bucketNum,newFileId);
-      incBucketIndex.add(partitionBucketId);
+      bucketToFileId.put(bucketNum, newFileId);
+      incBucketIndex.add(bucketId);
     }
     record.unseal();
     record.setCurrentLocation(location);
@@ -153,39 +145,32 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    * Bootstrap bucket info from existing file system,
    * bucketNum % totalParallelism == this taskID belongs to this task.
    */
-  private void getBucketToLoad() {
+  private Set<Integer> getBucketToLoad() {
+    Set<Integer> bucketToLoad = new HashSet<>();
     for (int i = 0; i < bucketNum; i++) {
       int partitionOfBucket = BucketIdentifier.mod(i, parallelism);
       if (partitionOfBucket == taskID) {
-        LOG.info(String.format("Bootstrapping index. Adding bucket %s , "
-            + "Current parallelism: %s , Max parallelism: %s , Current task id: %s",
-            i, parallelism, maxParallelism, taskID));
         bucketToLoad.add(i);
       }
     }
-    bucketToLoad.forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));
+    LOG.info("Bucket number that belongs to task [{}/{}]: {}", taskID, parallelism, bucketToLoad);
+    return bucketToLoad;
   }
 
   /**
    * Get partition_bucket -> fileID mapping from the existing hudi table.
    * This is a required operation for each restart to avoid having duplicate file ids for one bucket.
    */
-  private void bootstrapIndexIfNeed(String partition) throws IOException {
-    if (bucketIndex.containsKey(partition)) {
+  private void bootstrapIndexIfNeed(String partition) {
+    if (isEmptyTable || bucketIndex.containsKey(partition)) {
       return;
     }
-    Option<HoodieInstant> latestCommitTime = table.getHoodieView().getTimeline().filterCompletedInstants().lastInstant();
-    if (!latestCommitTime.isPresent()) {
-      bucketIndex.put(partition, new HashMap<>());
-      return;
-    }
-    LOG.info(String.format("Loading Hoodie Table %s, with path %s", table.getMetaClient().getTableConfig().getTableName(),
-        table.getMetaClient().getBasePath() + "/" + partition));
+    LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
+        this.metaClient.getBasePath() + "/" + partition));
 
     // Load existing fileID belongs to this task
     Map<Integer, String> bucketToFileIDMap = new HashMap<>();
-    List<FileSlice> fileSlices = table.getHoodieView().getLatestFileSlices(partition).collect(toList());
-    for (FileSlice fileSlice : fileSlices) {
+    this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
       String fileID = fileSlice.getFileId();
       int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
       if (bucketToLoad.contains(bucketNumber)) {
@@ -198,7 +183,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
           bucketToFileIDMap.put(bucketNumber, fileID);
         }
       }
-    }
+    });
     bucketIndex.put(partition, bucketToFileIDMap);
   }
 }
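For reference, a minimal standalone sketch of the bucket-to-task assignment that getBucketToLoad() above implements. The class and method names are illustrative (not Hudi API), and Math.floorMod stands in for BucketIdentifier.mod:

import java.util.HashSet;
import java.util.Set;

public class BucketAssignmentSketch {

  // A subtask owns every bucket whose id modulo the operator parallelism
  // equals its own subtask index, mirroring the loop in getBucketToLoad().
  static Set<Integer> bucketsForTask(int taskId, int parallelism, int bucketNum) {
    Set<Integer> owned = new HashSet<>();
    for (int i = 0; i < bucketNum; i++) {
      if (Math.floorMod(i, parallelism) == taskId) {
        owned.add(i);
      }
    }
    return owned;
  }

  public static void main(String[] args) {
    // With 4 buckets and parallelism 2: task 0 owns {0, 2}, task 1 owns {1, 3}.
    System.out.println(bucketsForTask(0, 2, 4)); // e.g. [0, 2]
    System.out.println(bucketsForTask(1, 2, 4)); // e.g. [1, 3]
  }
}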
Pipelines.java:
@@ -98,10 +98,12 @@ public class Pipelines {
     InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
     dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
         .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, indexKeyFields, bucketNum),
-            typeInfo);
+            typeInfo)
+        .setParallelism(dataStream.getParallelism()); // same parallelism as source
     if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
       SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
-      dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator());
+      dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator())
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
       ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(),
           conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
     }
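As background on the two setParallelism calls above: in Flink, adjacent operators can only be chained (forwarded without a network exchange) when they run at the same parallelism; if the parallelism differs, a rebalance is inserted between them. A minimal sketch of that behavior, assuming a plain standalone Flink job (not Hudi code):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Long> source = env.fromSequence(0, 999).setParallelism(2);

    // Pin the map to the upstream parallelism, analogous to
    // .setParallelism(dataStream.getParallelism()) in Pipelines above;
    // without this the operator falls back to the job default parallelism
    // and Flink adds a rebalance edge between the two operators.
    DataStream<Long> mapped = source
        .map(x -> x + 1)
        .returns(Types.LONG) // type hint needed for the Long lambda
        .setParallelism(2);

    mapped.print().setParallelism(2);
    env.execute("parallelism-sketch");
  }
}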