[HUDI-3758] Fix duplicate fileId error in MOR table type with flink bucket hash Index (#5185)
* Fix duplicate fileId with the bucket index * Load existing file IDs from all file groups in the FileSystemView instead of from the latest file slices
This commit is contained in:
@@ -75,11 +75,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
|||||||
*/
|
*/
|
||||||
private Set<String> incBucketIndex;
|
private Set<String> incBucketIndex;
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns whether this is an empty table.
|
|
||||||
*/
|
|
||||||
private boolean isEmptyTable;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructs a BucketStreamWriteFunction.
|
* Constructs a BucketStreamWriteFunction.
|
||||||
*
|
*
|
||||||
@@ -99,7 +94,6 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
|||||||
this.bucketToLoad = getBucketToLoad();
|
this.bucketToLoad = getBucketToLoad();
|
||||||
this.bucketIndex = new HashMap<>();
|
this.bucketIndex = new HashMap<>();
|
||||||
this.incBucketIndex = new HashSet<>();
|
this.incBucketIndex = new HashSet<>();
|
||||||
this.isEmptyTable = !this.metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().isPresent();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -162,7 +156,7 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
|||||||
* This is a required operation for each restart to avoid having duplicate file ids for one bucket.
|
* This is a required operation for each restart to avoid having duplicate file ids for one bucket.
|
||||||
*/
|
*/
|
||||||
private void bootstrapIndexIfNeed(String partition) {
|
private void bootstrapIndexIfNeed(String partition) {
|
||||||
if (isEmptyTable || bucketIndex.containsKey(partition)) {
|
if (bucketIndex.containsKey(partition)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
|
LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
|
||||||
@@ -170,8 +164,8 @@ public class BucketStreamWriteFunction<I> extends StreamWriteFunction<I> {
|
|||||||
|
|
||||||
// Load existing fileID belongs to this task
|
// Load existing fileID belongs to this task
|
||||||
Map<Integer, String> bucketToFileIDMap = new HashMap<>();
|
Map<Integer, String> bucketToFileIDMap = new HashMap<>();
|
||||||
this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
|
this.writeClient.getHoodieTable().getFileSystemView().getAllFileGroups(partition).forEach(fileGroup -> {
|
||||||
String fileID = fileSlice.getFileId();
|
String fileID = fileGroup.getFileGroupId().getFileId();
|
||||||
int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
|
int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileID);
|
||||||
if (bucketToLoad.contains(bucketNumber)) {
|
if (bucketToLoad.contains(bucketNumber)) {
|
||||||
LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));
|
LOG.info(String.format("Should load this partition bucket %s with fileID %s", bucketNumber, fileID));
|
||||||
|
|||||||
Reference in New Issue
Block a user