1
0

[HUDI-1638] Some improvements to BucketAssignFunction (#2600)

- #initializeState executes before #open; thus #checkPartitionsLoaded
  may see a null `initialPartitionsToLoad`
- Only load the existing partitions
This commit is contained in:
Danny Chan
2021-02-25 14:33:21 +08:00
committed by GitHub
parent 97864a48c1
commit 06dc7c7fd8

View File

@@ -55,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 /**
@@ -108,7 +109,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
  * All the partition paths when the task starts. It is used to help checking whether all the partitions
  * are loaded into the state.
  */
-private transient List<String> initialPartitionsToLoad;
+private transient Set<String> initialPartitionsToLoad;
 /**
  * State to book-keep which partition is loaded into the index state {@code indexState}.
@@ -136,15 +137,10 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
     new SerializableConfiguration(this.hadoopConf),
     new FlinkTaskContextSupplier(getRuntimeContext()));
 this.bucketAssigner = new BucketAssigner(context, writeConfig);
-List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
-    this.conf.getString(FlinkOptions.PATH), false, false, false);
-final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
-final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
-final int taskID = getRuntimeContext().getIndexOfThisSubtask();
-// reference: org.apache.flink.streaming.api.datastream.KeyedStream
-this.initialPartitionsToLoad = allPartitionPaths.stream()
-    .filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID)
-    .collect(Collectors.toList());
+// initialize and check the partitions load state
+loadInitialPartitions();
+checkPartitionsLoaded();
 }
 @Override
@@ -163,9 +159,6 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
 MapStateDescriptor<String, Integer> partitionLoadStateDesc =
     new MapStateDescriptor<>("partitionLoadState", Types.STRING, Types.INT);
 partitionLoadState = context.getKeyedStateStore().getMapState(partitionLoadStateDesc);
-if (context.isRestored()) {
-  checkPartitionsLoaded();
-}
 }
 @SuppressWarnings("unchecked")
@@ -178,7 +171,9 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
 final HoodieKey hoodieKey = record.getKey();
 final BucketInfo bucketInfo;
 final HoodieRecordLocation location;
-if (!allPartitionsLoaded && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
+if (!allPartitionsLoaded
+    && initialPartitionsToLoad.contains(hoodieKey.getPartitionPath()) // this is an existing partition
+    && !partitionLoadState.contains(hoodieKey.getPartitionPath())) {
   // If the partition records are never loaded, load the records first.
   loadRecords(hoodieKey.getPartitionPath());
 }
@@ -244,6 +239,21 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
 partitionLoadState.put(partitionPath, 0);
 }
/**
* Loads the existing partitions for this task.
*/
private void loadInitialPartitions() {
List<String> allPartitionPaths = FSUtils.getAllPartitionPaths(this.context,
this.conf.getString(FlinkOptions.PATH), false, false, false);
final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
// reference: org.apache.flink.streaming.api.datastream.KeyedStream
this.initialPartitionsToLoad = allPartitionPaths.stream()
.filter(partition -> KeyGroupRangeAssignment.assignKeyToParallelOperator(partition, maxParallelism, parallelism) == taskID)
.collect(Collectors.toSet());
}
 /**
  * Checks whether all the partitions of the table are loaded into the state,
  * set the flag {@code allPartitionsLoaded} to true if it is.
@@ -271,6 +281,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
 public void clearIndexState() {
   this.allPartitionsLoaded = false;
   this.indexState.clear();
+  loadInitialPartitions();
 }
 @VisibleForTesting