diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index 4ab36d6a5..ddd95721a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -39,7 +40,7 @@ public class BucketIdentifier {
   public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
     List<String> hashKeyFields;
     if (!hoodieKey.getRecordKey().contains(":")) {
-      hashKeyFields = Arrays.asList(hoodieKey.getRecordKey());
+      hashKeyFields = Collections.singletonList(hoodieKey.getRecordKey());
     } else {
       Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(","))
           .map(p -> p.split(":"))
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
index ac5dd630b..128358096 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteFunction.java
@@ -41,6 +41,15 @@ import java.util.List;
 
 import static java.util.stream.Collectors.toList;
 
+/**
+ * A stream write function with bucket hash index.
+ *
+ * <p>The task holds a fresh local index: a {(partition + bucket number) &rarr; fileId} mapping, which
+ * is used for deciding whether an incoming record is an UPDATE or an INSERT.
+ * The index is local because different partition paths have separate items in the index.
+ *
+ * @param <I> the input type
+ */
 public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
 
   private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);
@@ -51,11 +60,11 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
   private int bucketNum;
 
-  protected transient HoodieFlinkTable<?> table;
+  private transient HoodieFlinkTable<?> table;
 
   private String indexKeyFields;
 
-  private HashMap<String, String> bucketToFileIDMap;
+  private final HashMap<String, String> bucketToFileIDMap;
 
   /**
    * Constructs a BucketStreamWriteFunction.
    *
@@ -64,7 +73,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
    */
   public BucketStreamWriteFunction(Configuration config) {
     super(config);
-    this.bucketToFileIDMap = new HashMap();
+    this.bucketToFileIDMap = new HashMap<>();
   }
 
   @Override
@@ -127,7 +136,7 @@
         bucketToLoad.add(i);
       }
     }
-    bucketToLoad.stream().forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));
+    bucketToLoad.forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));
     LOG.info(String.format("Loading Hoodie Table %s, with path %s", table.getMetaClient().getTableConfig().getTableName(),
         table.getMetaClient().getBasePath()));
 
@@ -146,7 +155,7 @@
       LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID));
       if (bucketToFileIDMap.containsKey(partitionBucketId)) {
         throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found "
-            + "during the fileGroupPerPartitionedBucketState initialization.", fileID, partitionBucketId));
+            + "during the BucketStreamWriteFunction index bootstrap.", fileID, partitionBucketId));
       } else {
         LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId));
         bucketToFileIDMap.put(partitionBucketId, fileID);
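Note: the `singletonList` change above sits on the key-parsing path of `BucketIdentifier.getBucketId`. A record key is either a plain value or a composite of the form `field1:val1,field2:val2`, and only the values of the configured index key fields feed the bucket hash. The following standalone sketch mirrors that parsing; the class name `BucketIdSketch` is hypothetical, and since the actual hash function lives elsewhere in `BucketIdentifier` and is not part of this diff, `Math.floorMod` over `List.hashCode()` is used purely as a stand-in:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Standalone sketch of the key-parsing logic in BucketIdentifier.getBucketId.
// The real hash is not shown in the diff; floorMod over List.hashCode() below
// is a stand-in so the sketch runs on its own.
public class BucketIdSketch {

  static int getBucketId(String recordKey, String indexKeyFields, int numBuckets) {
    List<String> hashKeyFields;
    if (!recordKey.contains(":")) {
      // Simple key: the whole record key is the hash input.
      hashKeyFields = Collections.singletonList(recordKey);
    } else {
      // Composite key "field1:val1,field2:val2": hash only the values of the
      // configured index key fields, in the order the fields are listed.
      Map<String, String> recordKeyPairs = Arrays.stream(recordKey.split(","))
          .map(p -> p.split(":"))
          .collect(Collectors.toMap(p -> p[0], p -> p[1]));
      hashKeyFields = Arrays.stream(indexKeyFields.split(","))
          .map(recordKeyPairs::get)
          .collect(Collectors.toList());
    }
    return Math.floorMod(hashKeyFields.hashCode(), numBuckets);
  }

  public static void main(String[] args) {
    // Same index key values -> same bucket, regardless of the other key fields.
    System.out.println(getBucketId("id:42,name:alice", "id", 8));
    System.out.println(getBucketId("id:42,name:bob", "id", 8));
  }
}
```

Both calls print the same bucket, which is what makes a subset of the primary key fields usable as the index key.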
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
index cf4ef3f78..209fe59e4 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/BucketStreamWriteOperator.java
@@ -22,6 +22,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.hudi.sink.common.AbstractWriteOperator;
 import org.apache.hudi.sink.common.WriteOperatorFactory;
 
+/**
+ * Operator for {@link BucketStreamWriteFunction}.
+ *
+ * @param <I> The input type
+ */
 public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
 
   public BucketStreamWriteOperator(Configuration conf) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index aab527185..ab46b0317 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -22,7 +22,13 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 
-public class BucketIndexPartitioner implements Partitioner<Object> {
+/**
+ * Bucket index input partitioner.
+ * The fields to hash can be a subset of the primary key fields.
+ *
+ * @param <T> The type of obj to hash
+ */
+public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
 
   private final int bucketNum;
   private final String indexKeyFields;
@@ -33,8 +39,8 @@
   }
 
   @Override
-  public int partition(Object key, int numPartitions) {
-    int curBucket = BucketIdentifier.getBucketId((HoodieKey) key, indexKeyFields, bucketNum);
+  public int partition(HoodieKey key, int numPartitions) {
+    int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
     return BucketIdentifier.mod(curBucket, numPartitions);
  }
 }
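Note: `partition` applies a second modulo because the writer parallelism generally differs from the table's bucket count; folding the bucket id onto the subtask index keeps every record of one bucket on the same writer task. A minimal sketch of that folding, with the hypothetical class name `BucketToSubtaskSketch` and `Math.floorMod` standing in for `BucketIdentifier.mod`, whose implementation is not part of this diff:

```java
// Maps bucket ids (0..bucketNum-1) onto a smaller writer parallelism with a
// second modulo, as BucketIndexPartitioner.partition does. floorMod is a
// stand-in for BucketIdentifier.mod and also guards against negative inputs,
// since Flink requires a partition index in [0, numPartitions).
public class BucketToSubtaskSketch {
  public static void main(String[] args) {
    int bucketNum = 8;       // table-level bucket count
    int numPartitions = 3;   // writer parallelism
    for (int bucket = 0; bucket < bucketNum; bucket++) {
      System.out.printf("bucket %d -> subtask %d%n",
          bucket, Math.floorMod(bucket, numPartitions));
    }
    // bucket 0 -> 0, 1 -> 1, 2 -> 2, 3 -> 0, ... deterministic; several buckets
    // may share a subtask, and subtasks stay idle if numPartitions > bucketNum.
  }
}
```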
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 09b7369ed..ae8b4f213 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
@@ -276,10 +277,10 @@
       WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
       int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
       String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
-      BucketIndexPartitioner partitioner = new BucketIndexPartitioner(bucketNum, indexKeyFields);
+      BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
       return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
-          .transform("hoodie_bucket_stream_write", TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_hoodie_bucket_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
+          .uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     } else {
       WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
@@ -294,8 +295,8 @@
           .setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
-          .transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
-          .uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
+          .transform("stream_write", TypeInformation.of(Object.class), operatorFactory)
+          .uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
           .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     }
   }
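Note: for context, this is roughly how `partitionCustom` routes records in the bucket-index branch of the pipeline above. A toy sketch assuming only the Flink streaming API on the classpath; the class name `PartitionCustomSketch` is hypothetical, `Integer` elements stand in for `HoodieRecord`, and a lambda partitioner stands in for `BucketIndexPartitioner`:

```java
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Minimal sketch of the partitionCustom wiring used for the bucket index path:
// a custom Partitioner picks the target subtask from the extracted key, so the
// downstream writer operator sees a stable key -> subtask assignment.
public class PartitionCustomSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Stand-in for BucketIndexPartitioner: subtask = floorMod(key, parallelism).
    Partitioner<Integer> byBucket = (key, numPartitions) -> Math.floorMod(key, numPartitions);

    DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
    stream.partitionCustom(byBucket, value -> value)  // key selector: the element itself
        .print();

    env.execute("partition-custom-sketch");
  }
}
```

The real pipeline uses `HoodieRecord::getKey` as the key selector, so routing depends only on the record key, not on which subtask produced the record upstream.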