[minor] Cosmetic changes following HUDI-3315 (#4934)
This commit is contained in:
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
|
|||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
@@ -39,7 +40,7 @@ public class BucketIdentifier {
|
|||||||
public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
|
public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets) {
|
||||||
List<String> hashKeyFields;
|
List<String> hashKeyFields;
|
||||||
if (!hoodieKey.getRecordKey().contains(":")) {
|
if (!hoodieKey.getRecordKey().contains(":")) {
|
||||||
hashKeyFields = Arrays.asList(hoodieKey.getRecordKey());
|
hashKeyFields = Collections.singletonList(hoodieKey.getRecordKey());
|
||||||
} else {
|
} else {
|
||||||
Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(","))
|
Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(","))
|
||||||
.map(p -> p.split(":"))
|
.map(p -> p.split(":"))
|
||||||
|
|||||||
@@ -41,6 +41,15 @@ import java.util.List;
|
|||||||
|
|
||||||
import static java.util.stream.Collectors.toList;
|
import static java.util.stream.Collectors.toList;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A stream write function with bucket hash index.
|
||||||
|
*
|
||||||
|
* <p>The task holds a fresh local index: a {(partition + bucket number) &rarr; fileId} mapping. This index
|
||||||
|
* is used for deciding whether an incoming record is an UPDATE or an INSERT.
|
||||||
|
* The index is local because different partition paths have separate items in the index.
|
||||||
|
*
|
||||||
|
* @param <I> the input type
|
||||||
|
*/
|
||||||
public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);
|
private static final Logger LOG = LoggerFactory.getLogger(BucketStreamWriteFunction.class);
|
||||||
@@ -51,11 +60,11 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
|||||||
|
|
||||||
private int bucketNum;
|
private int bucketNum;
|
||||||
|
|
||||||
protected transient HoodieFlinkTable table;
|
private transient HoodieFlinkTable<?> table;
|
||||||
|
|
||||||
private String indexKeyFields;
|
private String indexKeyFields;
|
||||||
|
|
||||||
private HashMap<String, String> bucketToFileIDMap;
|
private final HashMap<String, String> bucketToFileIDMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a BucketStreamWriteFunction.
|
* Constructs a BucketStreamWriteFunction.
|
||||||
@@ -64,7 +73,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
|||||||
*/
|
*/
|
||||||
public BucketStreamWriteFunction(Configuration config) {
|
public BucketStreamWriteFunction(Configuration config) {
|
||||||
super(config);
|
super(config);
|
||||||
this.bucketToFileIDMap = new HashMap<String, String>();
|
this.bucketToFileIDMap = new HashMap<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -127,7 +136,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
|||||||
bucketToLoad.add(i);
|
bucketToLoad.add(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bucketToLoad.stream().forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));
|
bucketToLoad.forEach(bucket -> LOG.info(String.format("bucketToLoad contains %s", bucket)));
|
||||||
|
|
||||||
LOG.info(String.format("Loading Hoodie Table %s, with path %s", table.getMetaClient().getTableConfig().getTableName(),
|
LOG.info(String.format("Loading Hoodie Table %s, with path %s", table.getMetaClient().getTableConfig().getTableName(),
|
||||||
table.getMetaClient().getBasePath()));
|
table.getMetaClient().getBasePath()));
|
||||||
@@ -146,7 +155,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
|||||||
LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID));
|
LOG.info(String.format("Should load this partition bucket %s with fileID %s", partitionBucketId, fileID));
|
||||||
if (bucketToFileIDMap.containsKey(partitionBucketId)) {
|
if (bucketToFileIDMap.containsKey(partitionBucketId)) {
|
||||||
throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found "
|
throw new RuntimeException(String.format("Duplicate fileID %s from partitionBucket %s found "
|
||||||
+ "during the fileGroupPerPartitionedBucketState initialization.", fileID, partitionBucketId));
|
+ "during the BucketStreamWriteFunction index bootstrap.", fileID, partitionBucketId));
|
||||||
} else {
|
} else {
|
||||||
LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId));
|
LOG.info(String.format("Adding fileID %s to the partition bucket %s.", fileID, partitionBucketId));
|
||||||
bucketToFileIDMap.put(partitionBucketId, fileID);
|
bucketToFileIDMap.put(partitionBucketId, fileID);
|
||||||
|
|||||||
@@ -22,6 +22,11 @@ import org.apache.flink.configuration.Configuration;
|
|||||||
import org.apache.hudi.sink.common.AbstractWriteOperator;
|
import org.apache.hudi.sink.common.AbstractWriteOperator;
|
||||||
import org.apache.hudi.sink.common.WriteOperatorFactory;
|
import org.apache.hudi.sink.common.WriteOperatorFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Operator for {@link BucketStreamWriteFunction}.
|
||||||
|
*
|
||||||
|
* @param <I> The input type
|
||||||
|
*/
|
||||||
public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
|
public class BucketStreamWriteOperator<I> extends AbstractWriteOperator<I> {
|
||||||
|
|
||||||
public BucketStreamWriteOperator(Configuration conf) {
|
public BucketStreamWriteOperator(Configuration conf) {
|
||||||
|
|||||||
@@ -22,7 +22,13 @@ import org.apache.flink.api.common.functions.Partitioner;
|
|||||||
import org.apache.hudi.common.model.HoodieKey;
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.index.bucket.BucketIdentifier;
|
import org.apache.hudi.index.bucket.BucketIdentifier;
|
||||||
|
|
||||||
public class BucketIndexPartitioner implements Partitioner {
|
/**
|
||||||
|
* Bucket index input partitioner.
|
||||||
|
* The fields to hash can be a subset of the primary key fields.
|
||||||
|
*
|
||||||
|
* @param <T> The type of the object to hash
|
||||||
|
*/
|
||||||
|
public class BucketIndexPartitioner<T extends HoodieKey> implements Partitioner<T> {
|
||||||
|
|
||||||
private final int bucketNum;
|
private final int bucketNum;
|
||||||
private final String indexKeyFields;
|
private final String indexKeyFields;
|
||||||
@@ -33,8 +39,8 @@ public class BucketIndexPartitioner implements Partitioner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int partition(Object key, int numPartitions) {
|
public int partition(HoodieKey key, int numPartitions) {
|
||||||
int curBucket = BucketIdentifier.getBucketId((HoodieKey) key, indexKeyFields, bucketNum);
|
int curBucket = BucketIdentifier.getBucketId(key, indexKeyFields, bucketNum);
|
||||||
return BucketIdentifier.mod(curBucket, numPartitions);
|
return BucketIdentifier.mod(curBucket, numPartitions);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
package org.apache.hudi.sink.utils;
|
package org.apache.hudi.sink.utils;
|
||||||
|
|
||||||
|
import org.apache.hudi.common.model.HoodieKey;
|
||||||
import org.apache.hudi.common.model.HoodieRecord;
|
import org.apache.hudi.common.model.HoodieRecord;
|
||||||
import org.apache.hudi.configuration.FlinkOptions;
|
import org.apache.hudi.configuration.FlinkOptions;
|
||||||
import org.apache.hudi.configuration.OptionsResolver;
|
import org.apache.hudi.configuration.OptionsResolver;
|
||||||
@@ -276,10 +277,10 @@ public class Pipelines {
|
|||||||
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
|
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
|
||||||
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
|
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
|
||||||
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
|
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
|
||||||
BucketIndexPartitioner partitioner = new BucketIndexPartitioner(bucketNum, indexKeyFields);
|
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
|
||||||
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
|
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
|
||||||
.transform("hoodie_bucket_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
.transform("bucket_write", TypeInformation.of(Object.class), operatorFactory)
|
||||||
.uid("uid_hoodie_bucket_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
.uid("uid_bucket_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||||
} else {
|
} else {
|
||||||
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
|
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
|
||||||
@@ -294,8 +295,8 @@ public class Pipelines {
|
|||||||
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
|
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(defaultParallelism))
|
||||||
// shuffle by fileId(bucket id)
|
// shuffle by fileId(bucket id)
|
||||||
.keyBy(record -> record.getCurrentLocation().getFileId())
|
.keyBy(record -> record.getCurrentLocation().getFileId())
|
||||||
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
|
.transform("stream_write", TypeInformation.of(Object.class), operatorFactory)
|
||||||
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
.uid("uid_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
|
||||||
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user