[HUDI-3741] Fix flink bucket index bulk insert generates too many small files (#5164)
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Helper class for bucket index bulk insert used by Flink.
@@ -90,14 +91,16 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
     return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
   }
 
-  private static String getFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
+  private static String getFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
     String recordKey = keyGen.getRecordKey(record);
-    final int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum);
-    return BucketIdentifier.newBucketFileIdPrefix(bucketId);
+    String partition = keyGen.getPartitionPath(record);
+    final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets);
+    String bucketId = partition + bucketNum;
+    return bucketIdToFileId.computeIfAbsent(bucketId, k -> BucketIdentifier.newBucketFileIdPrefix(bucketNum));
   }
 
-  public static RowData rowWithFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
-    final String fileId = getFileId(keyGen, record, indexKeyFields, bucketNum);
+  public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
+    final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets);
     return GenericRowData.of(StringData.fromString(fileId), record);
   }
 
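The crux of the fix is in `getFileId`: the file ID prefix is now looked up through a per-task `bucketIdToFileId` map keyed by partition path plus bucket number, so every record that falls into the same bucket of the same partition reuses one file group instead of minting a fresh prefix on every call. A minimal standalone sketch of that caching idea, in plain Java with hypothetical stand-ins for the `BucketIdentifier` helpers rather than the Hudi classes themselves:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Standalone sketch: within one write task, every record that falls into the same
// (partition, bucket) pair must reuse the same file ID prefix, otherwise each call
// mints a new prefix and therefore a new small file.
public class FileIdCacheSketch {

  private final Map<String, String> bucketIdToFileId = new HashMap<>();
  private final int numBuckets;

  FileIdCacheSketch(int numBuckets) {
    this.numBuckets = numBuckets;
  }

  // Hypothetical stand-in for BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets).
  int bucketId(String recordKey) {
    return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  // Hypothetical stand-in for BucketIdentifier.newBucketFileIdPrefix(bucketNum):
  // a fresh prefix is created only the first time a (partition, bucket) key is seen.
  String fileIdFor(String partition, String recordKey) {
    int bucketNum = bucketId(recordKey);
    String bucketKey = partition + bucketNum;
    return bucketIdToFileId.computeIfAbsent(bucketKey,
        k -> String.format("%08d-%s", bucketNum, UUID.randomUUID()));
  }

  public static void main(String[] args) {
    FileIdCacheSketch cache = new FileIdCacheSketch(4);
    // Two records with the same key hash to the same bucket and reuse one file ID.
    System.out.println(cache.fileIdFor("par1", "id1"));
    System.out.println(cache.fileIdFor("par1", "id1")); // identical to the line above
  }
}
```

Running `main` prints the same file ID twice, which is the invariant the bulk-insert writer relies on to keep one file per bucket.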
@@ -57,6 +57,9 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Utilities to generate all kinds of sub-pipelines.
  */
@@ -90,16 +93,18 @@ public class Pipelines {
   public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
     WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
     if (OptionsResolver.isBucketIndexType(conf)) {
-      int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
-      String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
-      BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
-      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+      String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+      int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+
+      BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(numBuckets, indexKeys);
+      RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
       RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
       InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
-      dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
-          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, indexKeyFields, bucketNum),
-              typeInfo)
-          .setParallelism(dataStream.getParallelism()); // same parallelism as source
+
+      Map<String, String> bucketIdToFileId = new HashMap<>();
+      dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
+          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
         dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator())
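Besides threading the shared `bucketIdToFileId` map into the mapper, the pipeline now runs the file-ID tagging stage with the write-task parallelism instead of the source parallelism, so the custom bucket shuffle is not followed by another rebalance that would scatter one bucket over several writers. A minimal sketch of that wiring with plain Flink operators, using a hypothetical partitioner and key selector rather than the Hudi `BucketIndexPartitioner` and `RowDataKeyGen`:

```java
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch with plain Flink operators: a custom partitioner pins each bucket to one
// subtask, and the downstream stages keep the write-task parallelism so the routing
// survives until the sink. Partitioner and key selector are hypothetical stand-ins.
public class BucketShuffleSketch {

  static class BucketPartitioner implements Partitioner<String> {
    private final int numBuckets;

    BucketPartitioner(int numBuckets) {
      this.numBuckets = numBuckets;
    }

    @Override
    public int partition(String recordKey, int numPartitions) {
      int bucket = (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
      return bucket % numPartitions;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    int writeTasks = 2; // stand-in for the FlinkOptions.WRITE_TASKS value

    DataStream<String> records = env.fromElements("id1", "id2", "id3", "id4");

    records
        // route every record of a bucket to the same subtask
        .partitionCustom(new BucketPartitioner(4), new KeySelector<String, String>() {
          @Override
          public String getKey(String recordKey) {
            return recordKey;
          }
        })
        // tag the record with its target file ID; same parallelism as the writers,
        // so no extra shuffle scatters one bucket across several write tasks
        .map(recordKey -> "file-id-for-" + recordKey, Types.STRING)
        .setParallelism(writeTasks)
        .print()
        .setParallelism(writeTasks);

    env.execute("bucket-shuffle-sketch");
  }
}
```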
@@ -73,7 +73,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // bulk_insert mode
       final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
       if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
-        return context.isBounded() ? Pipelines.bulkInsert(conf, rowType, dataStream) : Pipelines.append(conf, rowType, dataStream);
+        return Pipelines.bulkInsert(conf, rowType, dataStream);
       }
 
       // Append mode
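With the boundedness check gone, the configured write operation alone selects the bulk-insert pipeline. A tiny sketch of that dispatch, with hypothetical names rather than the Hudi sink itself:

```java
// Hypothetical names; a sketch of the dispatch, not the Hudi sink itself.
public class PipelineDispatchSketch {

  enum WriteOperation { INSERT, UPSERT, BULK_INSERT }

  // The configured operation alone decides the pipeline; the boundedness of the
  // source is no longer consulted for bulk_insert.
  static String choosePipeline(WriteOperation operation) {
    if (operation == WriteOperation.BULK_INSERT) {
      return "bulk_insert pipeline";
    }
    return "stream write pipeline";
  }

  public static void main(String[] args) {
    System.out.println(choosePipeline(WriteOperation.BULK_INSERT)); // bulk_insert pipeline
    System.out.println(choosePipeline(WriteOperation.UPSERT));      // stream write pipeline
  }
}
```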
@@ -1179,7 +1179,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
   @ParameterizedTest
   @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
   void testBuiltinFunctionWithCatalog(String operation) {
-    TableEnvironment tableEnv = streamTableEnv;
+    TableEnvironment tableEnv = batchTableEnv;
 
     String hudiCatalogDDL = catalog("hudi_" + operation)
         .catalogPath(tempFile.getAbsolutePath())