diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index ae646d9e0..5c44581c9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Map;
 
 /**
  * Helper class for bucket index bulk insert used by Flink.
@@ -90,14 +91,16 @@ public class BucketBulkInsertWriterHelper extends BulkInsertWriterHelper {
     return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
   }
 
-  private static String getFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
+  private static String getFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
     String recordKey = keyGen.getRecordKey(record);
-    final int bucketId = BucketIdentifier.getBucketId(recordKey, indexKeyFields, bucketNum);
-    return BucketIdentifier.newBucketFileIdPrefix(bucketId);
+    String partition = keyGen.getPartitionPath(record);
+    final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, numBuckets);
+    String bucketId = partition + bucketNum;
+    return bucketIdToFileId.computeIfAbsent(bucketId, k -> BucketIdentifier.newBucketFileIdPrefix(bucketNum));
   }
 
-  public static RowData rowWithFileId(RowDataKeyGen keyGen, RowData record, String indexKeyFields, int bucketNum) {
-    final String fileId = getFileId(keyGen, record, indexKeyFields, bucketNum);
+  public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, RowDataKeyGen keyGen, RowData record, String indexKeys, int numBuckets) {
+    final String fileId = getFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets);
     return GenericRowData.of(StringData.fromString(fileId), record);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 9f0a81753..28a669075 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -57,6 +57,9 @@ import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Utilities to generate all kinds of sub-pipelines.
  */
@@ -90,16 +93,18 @@ public class Pipelines {
   public static DataStreamSink<Object> bulkInsert(Configuration conf, RowType rowType, DataStream<RowData> dataStream) {
     WriteOperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(conf, rowType);
     if (OptionsResolver.isBucketIndexType(conf)) {
-      int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
-      String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
-      BucketIndexPartitioner partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
-      RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
+      String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
+      int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
+
+      BucketIndexPartitioner partitioner = new BucketIndexPartitioner<>(numBuckets, indexKeys);
+      RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
       RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
       InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
-      dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getRecordKey)
-          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(rowDataKeyGen, record, indexKeyFields, bucketNum),
-              typeInfo)
-          .setParallelism(dataStream.getParallelism()); // same parallelism as source
+
+      Map<String, String> bucketIdToFileId = new HashMap<>();
+      dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
+          .map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
         dataStream = dataStream.transform("file_sorter", typeInfo, sortOperatorGen.createSortOperator())
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index bbbc67985..ed99e7b4c 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -73,7 +73,7 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
       // bulk_insert mode
       final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
       if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
-        return context.isBounded() ? Pipelines.bulkInsert(conf, rowType, dataStream) : Pipelines.append(conf, rowType, dataStream);
+        return Pipelines.bulkInsert(conf, rowType, dataStream);
       }
 
       // Append mode
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 8802cac7b..786a45cac 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -1179,7 +1179,7 @@ public class ITTestHoodieDataSource extends AbstractTestBase {
   @ParameterizedTest
   @ValueSource(strings = {"insert", "upsert", "bulk_insert"})
   void testBuiltinFunctionWithCatalog(String operation) {
-    TableEnvironment tableEnv = streamTableEnv;
+    TableEnvironment tableEnv = batchTableEnv;
     String hudiCatalogDDL = catalog("hudi_" + operation)
         .catalogPath(tempFile.getAbsolutePath())