[MINOR] Remove InstantGeneratorOperator parallelism limit in HoodieFlinkStreamer and update docs (#2471)
This commit is contained in:
@@ -93,11 +93,10 @@ public class HoodieFlinkStreamer {
|
|||||||
.name("kafka_to_hudi_record")
|
.name("kafka_to_hudi_record")
|
||||||
.uid("kafka_to_hudi_record_uid");
|
.uid("kafka_to_hudi_record_uid");
|
||||||
|
|
||||||
// InstantGenerateOperator helps to emit globally unique instantTime, it must be executed in one parallelism
|
// InstantGenerateOperator helps to emit globally unique instantTime
|
||||||
inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
|
inputRecords.transform(InstantGenerateOperator.NAME, TypeInformation.of(HoodieRecord.class), new InstantGenerateOperator())
|
||||||
.name("instant_generator")
|
.name("instant_generator")
|
||||||
.uid("instant_generator_id")
|
.uid("instant_generator_id")
|
||||||
.setParallelism(1)
|
|
||||||
|
|
||||||
// Keyby partition path, to avoid multiple subtasks writing to a partition at the same time
|
// Keyby partition path, to avoid multiple subtasks writing to a partition at the same time
|
||||||
.keyBy(HoodieRecord::getPartitionPath)
|
.keyBy(HoodieRecord::getPartitionPath)
|
||||||
|
|||||||
@@ -57,10 +57,9 @@ import java.util.concurrent.TimeUnit;
|
|||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Operator helps to generate globally unique instant, it must be executed in one parallelism. Before generate a new
|
* Operator that helps to generate a globally unique instant. Before generating a new instant, {@link InstantGenerateOperator}
|
||||||
* instant , {@link InstantGenerateOperator} will always check whether the last instant has completed. if it is
|
* will always check whether the last instant has completed. If it is completed and records have flowed in, a new instant
|
||||||
* completed, a new instant will be generated immediately, otherwise, wait and check the state of last instant until
|
* will be generated immediately; otherwise, it waits and checks the state of the last instant until it times out and throws an exception.
|
||||||
* time out and throw an exception.
|
|
||||||
*/
|
*/
|
||||||
public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord> implements OneInputStreamOperator<HoodieRecord, HoodieRecord> {
|
public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord> implements OneInputStreamOperator<HoodieRecord, HoodieRecord> {
|
||||||
|
|
||||||
@@ -128,11 +127,11 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
|
|||||||
super.prepareSnapshotPreBarrier(checkpointId);
|
super.prepareSnapshotPreBarrier(checkpointId);
|
||||||
String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
|
String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
|
||||||
Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
|
Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
|
||||||
// mk marker file by each subtask
|
// create marker file
|
||||||
fs.create(path, true);
|
fs.create(path, true);
|
||||||
LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
|
LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
|
||||||
if (isMain) {
|
if (isMain) {
|
||||||
// check whether the last instant is completed, if not, wait 10s and then throws an exception
|
// check whether the last instant is completed, will try specific times until an exception is thrown
|
||||||
if (!StringUtils.isNullOrEmpty(latestInstant)) {
|
if (!StringUtils.isNullOrEmpty(latestInstant)) {
|
||||||
doCheck();
|
doCheck();
|
||||||
// last instant completed, set it empty
|
// last instant completed, set it empty
|
||||||
@@ -264,7 +263,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
|
|||||||
}
|
}
|
||||||
|
|
||||||
boolean receivedData = false;
|
boolean receivedData = false;
|
||||||
// judge whether has data in this checkpoint and delete maker file.
|
// check whether there is data in this checkpoint and delete the marker file.
|
||||||
for (FileStatus fileStatus : fileStatuses) {
|
for (FileStatus fileStatus : fileStatuses) {
|
||||||
Path path = fileStatus.getPath();
|
Path path = fileStatus.getPath();
|
||||||
String name = path.getName();
|
String name = path.getName();
|
||||||
|
|||||||
Reference in New Issue
Block a user