[HUDI-4101] BucketIndexPartitioner should take partition path for better dispersion (#5590)
This commit is contained in:
@@ -73,7 +73,7 @@ public class BucketIdentifier implements Serializable {
|
|||||||
.map(p -> p.split(":"))
|
.map(p -> p.split(":"))
|
||||||
.collect(Collectors.toMap(p -> p[0], p -> p[1]));
|
.collect(Collectors.toMap(p -> p[0], p -> p[1]));
|
||||||
return indexKeyFields.stream()
|
return indexKeyFields.stream()
|
||||||
.map(f -> recordKeyPairs.get(f)).collect(Collectors.toList());
|
.map(recordKeyPairs::get).collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static String partitionBucketIdStr(String partition, int bucketId) {
|
public static String partitionBucketIdStr(String partition, int bucketId) {
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.sink.bulk;
|
package org.apache.hudi.sink.bulk;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.util.Option;
|
import org.apache.hudi.common.util.Option;
|
||||||
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
|
import org.apache.hudi.common.util.PartitionPathEncodeUtils;
|
||||||
import org.apache.hudi.common.util.StringUtils;
|
import org.apache.hudi.common.util.StringUtils;
|
||||||
@@ -127,6 +128,10 @@ public class RowDataKeyGen implements Serializable {
|
|||||||
keyGeneratorOpt);
|
keyGeneratorOpt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public HoodieKey getHoodieKey(RowData rowData) {
|
||||||
|
return new HoodieKey(getRecordKey(rowData), getPartitionPath(rowData));
|
||||||
|
}
|
||||||
|
|
||||||
public String getRecordKey(RowData rowData) {
|
public String getRecordKey(RowData rowData) {
|
||||||
if (this.simpleRecordKey) {
|
if (this.simpleRecordKey) {
|
||||||
return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), this.recordKeyFields[0]);
|
return getRecordKey(recordKeyFieldGetter.getFieldOrNull(rowData), this.recordKeyFields[0]);
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.sink.partitioner;
|
package org.apache.hudi.sink.partitioner;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.index.bucket.BucketIdentifier;
|
import org.apache.hudi.index.bucket.BucketIdentifier;
|
||||||
|
|
||||||
import org.apache.flink.api.common.functions.Partitioner;
|
import org.apache.flink.api.common.functions.Partitioner;
|
||||||
@@ -28,7 +29,7 @@ import org.apache.flink.api.common.functions.Partitioner;
|
|||||||
*
|
*
|
||||||
* @param <T> The type of obj to hash
|
* @param <T> The type of obj to hash
|
||||||
*/
|
*/
|
||||||
public class BucketIndexPartitioner<T extends String> implements Partitioner<T> {
|
public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
|
||||||
|
|
||||||
private final int bucketNum;
|
private final int bucketNum;
|
||||||
private final String indexKeyFields;
|
private final String indexKeyFields;
|
||||||
@@ -39,8 +40,9 @@ public class BucketIndexPartitioner<T extends String> implements Partitioner<T>
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int partition(String key, int numPartitions) {
|
public int partition(HoodieKey key, int numPartitions) {
|
||||||
int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
|
int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
|
||||||
return BucketIdentifier.mod(curBucket, numPartitions);
|
int globalHash = (key.getPartitionPath() + curBucket).hashCode() & Integer.MAX_VALUE;
|
||||||
|
return BucketIdentifier.mod(globalHash, numPartitions);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.sink.utils;
|
package org.apache.hudi.sink.utils;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.configuration.OptionsResolver;
|
import org.apache.hudi.configuration.OptionsResolver;
|
||||||
@@ -96,13 +97,13 @@ public class Pipelines {
|
|||||||
String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
|
String indexKeys = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
|
||||||
int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
|
int numBuckets = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
|
||||||
|
|
||||||
BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(numBuckets, indexKeys);
|
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(numBuckets, indexKeys);
|
||||||
RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
|
RowDataKeyGen keyGen = RowDataKeyGen.instance(conf, rowType);
|
||||||
RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
|
RowType rowTypeWithFileId = BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
|
||||||
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
|
InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(rowTypeWithFileId);
|
||||||
|
|
||||||
Map<String, String> bucketIdToFileId = new HashMap<>();
|
Map<String, String> bucketIdToFileId = new HashMap<>();
|
||||||
dataStream = dataStream.partitionCustom(partitioner, keyGen::getRecordKey)
|
dataStream = dataStream.partitionCustom(partitioner, keyGen::getHoodieKey)
|
||||||
.map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
|
.map(record -> BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, indexKeys, numBuckets), typeInfo)
|
||||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
|
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)); // same parallelism as write task to avoid shuffle
|
||||||
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
|
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
|
||||||
@@ -319,8 +320,8 @@ public class Pipelines {
|
|||||||
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
|
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
|
||||||
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
|
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
|
||||||
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
|
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
|
||||||
BucketIndexPartitioner<String> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
|
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
|
||||||
return dataStream.partitionCustom(partitioner, HoodieRecord::getRecordKey)
|
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
|
||||||
.transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
|
.transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
|
||||||
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||||
|
|||||||
Reference in New Issue
Block a user