1
0

[HUDI-1511] InstantGenerateOperator support multiple parallelism (#2434)

This commit is contained in:
luokey
2021-01-22 09:17:50 +08:00
committed by GitHub
parent 976420c49a
commit b64d22e047

View File

@@ -38,10 +38,13 @@ import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@@ -49,9 +52,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/** /**
* Operator helps to generate globally unique instant, it must be executed in one parallelism. Before generate a new * Operator helps to generate globally unique instant, it must be executed in one parallelism. Before generate a new
@@ -71,16 +74,20 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
private String latestInstant = ""; private String latestInstant = "";
private List<String> latestInstantList = new ArrayList<>(1); private List<String> latestInstantList = new ArrayList<>(1);
private transient ListState<String> latestInstantState; private transient ListState<String> latestInstantState;
private List<StreamRecord> bufferedRecords = new LinkedList();
private transient ListState<StreamRecord> recordsState;
private Integer retryTimes; private Integer retryTimes;
private Integer retryInterval; private Integer retryInterval;
private static final String DELIMITER = "_";
private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker";
private transient boolean isMain = false;
private transient AtomicLong recordCounter = new AtomicLong(0);
private StreamingRuntimeContext runtimeContext;
private int indexOfThisSubtask;
@Override @Override
public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception { public void processElement(StreamRecord<HoodieRecord> streamRecord) throws Exception {
if (streamRecord.getValue() != null) { if (streamRecord.getValue() != null) {
bufferedRecords.add(streamRecord);
output.collect(streamRecord); output.collect(streamRecord);
recordCounter.incrementAndGet();
} }
} }
@@ -88,7 +95,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
public void open() throws Exception { public void open() throws Exception {
super.open(); super.open();
// get configs from runtimeContext // get configs from runtimeContext
cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); cfg = (HoodieFlinkStreamer.Config) runtimeContext.getExecutionConfig().getGlobalJobParameters();
// retry times // retry times
retryTimes = Integer.valueOf(cfg.blockRetryTime); retryTimes = Integer.valueOf(cfg.blockRetryTime);
@@ -102,65 +109,78 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
// Hadoop FileSystem // Hadoop FileSystem
fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get()); fs = FSUtils.getFs(cfg.targetBasePath, serializableHadoopConf.get());
TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(null); if (isMain) {
TaskContextSupplier taskContextSupplier = new FlinkTaskContextSupplier(runtimeContext);
// writeClient // writeClient
writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true); writeClient = new HoodieFlinkWriteClient(new HoodieFlinkEngineContext(taskContextSupplier), StreamerUtil.getHoodieClientConfig(cfg), true);
// init table, create it if not exists. // init table, create it if not exists.
initTable(); initTable();
// create instant marker directory
createInstantMarkerDir();
}
} }
@Override @Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
super.prepareSnapshotPreBarrier(checkpointId); super.prepareSnapshotPreBarrier(checkpointId);
// check whether the last instant is completed, if not, wait 10s and then throws an exception String instantMarkerFileName = String.format("%d%s%d%s%d", indexOfThisSubtask, DELIMITER, checkpointId, DELIMITER, recordCounter.get());
if (!StringUtils.isNullOrEmpty(latestInstant)) { Path path = new Path(new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME), instantMarkerFileName);
doCheck(); // mk marker file by each subtask
// last instant completed, set it empty fs.create(path, true);
latestInstant = ""; LOG.info("Subtask [{}] at checkpoint [{}] created marker file [{}]", indexOfThisSubtask, checkpointId, instantMarkerFileName);
} if (isMain) {
// check whether the last instant is completed, if not, wait 10s and then throws an exception
// no data no new instant if (!StringUtils.isNullOrEmpty(latestInstant)) {
if (!bufferedRecords.isEmpty()) { doCheck();
latestInstant = startNewInstant(checkpointId); // last instant completed, set it empty
latestInstant = "";
}
boolean receivedDataInCurrentCP = checkReceivedData(checkpointId);
// no data no new instant
if (receivedDataInCurrentCP) {
latestInstant = startNewInstant(checkpointId);
}
} }
} }
@Override @Override
public void initializeState(StateInitializationContext context) throws Exception { public void initializeState(StateInitializationContext context) throws Exception {
// instantState runtimeContext = getRuntimeContext();
ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<String>("latestInstant", String.class); indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask();
latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor); isMain = indexOfThisSubtask == 0;
// recordState if (isMain) {
ListStateDescriptor<StreamRecord> recordsStateDescriptor = new ListStateDescriptor<StreamRecord>("recordsState", StreamRecord.class); // instantState
recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor); ListStateDescriptor<String> latestInstantStateDescriptor = new ListStateDescriptor<>("latestInstant", String.class);
latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor);
if (context.isRestored()) { if (context.isRestored()) {
Iterator<String> latestInstantIterator = latestInstantState.get().iterator(); Iterator<String> latestInstantIterator = latestInstantState.get().iterator();
latestInstantIterator.forEachRemaining(x -> latestInstant = x); latestInstantIterator.forEachRemaining(x -> latestInstant = x);
LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant); LOG.info("Restoring the latest instant [{}] from the state", latestInstant);
}
Iterator<StreamRecord> recordIterator = recordsState.get().iterator();
bufferedRecords.clear();
recordIterator.forEachRemaining(x -> bufferedRecords.add(x));
} }
} }
@Override @Override
public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception { public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception {
if (latestInstantList.isEmpty()) { long checkpointId = functionSnapshotContext.getCheckpointId();
latestInstantList.add(latestInstant); long recordSize = recordCounter.get();
if (isMain) {
LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", latestInstant, recordSize, checkpointId);
if (latestInstantList.isEmpty()) {
latestInstantList.add(latestInstant);
} else {
latestInstantList.set(0, latestInstant);
}
latestInstantState.update(latestInstantList);
} else { } else {
latestInstantList.set(0, latestInstant); LOG.info("Task instance {} received {} records in checkpoint [{}]", indexOfThisSubtask, recordSize, checkpointId);
} }
latestInstantState.update(latestInstantList); recordCounter.set(0);
LOG.info("Update latest instant [{}]", latestInstant);
recordsState.update(bufferedRecords);
LOG.info("Update records state size = [{}]", bufferedRecords.size());
bufferedRecords.clear();
} }
/** /**
@@ -185,10 +205,10 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
int tryTimes = 0; int tryTimes = 0;
while (tryTimes < retryTimes) { while (tryTimes < retryTimes) {
tryTimes++; tryTimes++;
StringBuffer sb = new StringBuffer(); StringBuilder sb = new StringBuilder();
if (rollbackPendingCommits.contains(latestInstant)) { if (rollbackPendingCommits.contains(latestInstant)) {
rollbackPendingCommits.forEach(x -> sb.append(x).append(",")); rollbackPendingCommits.forEach(x -> sb.append(x).append(","));
LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb.toString(), tryTimes); LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb, tryTimes);
TimeUnit.SECONDS.sleep(retryInterval); TimeUnit.SECONDS.sleep(retryInterval);
rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType); rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType);
} else { } else {
@@ -222,4 +242,60 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
fs.close(); fs.close();
} }
} }
private boolean checkReceivedData(long checkpointId) throws InterruptedException, IOException {
int numberOfParallelSubtasks = runtimeContext.getNumberOfParallelSubtasks();
FileStatus[] fileStatuses;
Path instantMarkerPath = new Path(HoodieTableMetaClient.AUXILIARYFOLDER_NAME, INSTANT_MARKER_FOLDER_NAME);
// waiting all subtask create marker file ready
while (true) {
Thread.sleep(500L);
fileStatuses = fs.listStatus(instantMarkerPath, new PathFilter() {
@Override
public boolean accept(Path pathname) {
return pathname.getName().contains(String.format("%s%d%s", DELIMITER, checkpointId, DELIMITER));
}
});
// is ready
if (fileStatuses != null && fileStatuses.length == numberOfParallelSubtasks) {
break;
}
}
boolean receivedData = false;
// judge whether has data in this checkpoint and delete maker file.
for (FileStatus fileStatus : fileStatuses) {
Path path = fileStatus.getPath();
String name = path.getName();
// has data
if (Long.parseLong(name.split(DELIMITER)[2]) > 0) {
receivedData = true;
break;
}
}
// delete all marker file
cleanMarkerDir(instantMarkerPath);
return receivedData;
}
private void createInstantMarkerDir() throws IOException {
// Always create instantMarkerFolder which is needed for InstantGenerateOperator
final Path instantMarkerFolder = new Path(new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME);
if (!fs.exists(instantMarkerFolder)) {
fs.mkdirs(instantMarkerFolder);
} else {
// Clean marker dir.
cleanMarkerDir(instantMarkerFolder);
}
}
private void cleanMarkerDir(Path instantMarkerFolder) throws IOException {
FileStatus[] fileStatuses = fs.listStatus(instantMarkerFolder);
for (FileStatus fileStatus : fileStatuses) {
fs.delete(fileStatus.getPath(), true);
}
}
} }