diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java index 4e32ec78b..7879243e7 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java +++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java @@ -38,10 +38,13 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.PathFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,9 +52,9 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; /** * Operator helps to generate globally unique instant, it must be executed in one parallelism. Before generate a new @@ -71,16 +74,20 @@ public class InstantGenerateOperator extends AbstractStreamOperator latestInstantList = new ArrayList<>(1); private transient ListState latestInstantState; - private List bufferedRecords = new LinkedList(); - private transient ListState recordsState; private Integer retryTimes; private Integer retryInterval; + private static final String DELIMITER = "_"; + private static final String INSTANT_MARKER_FOLDER_NAME = ".instant_marker"; + private transient boolean isMain = false; + private transient AtomicLong recordCounter = new AtomicLong(0); + private StreamingRuntimeContext runtimeContext; + private int indexOfThisSubtask; @Override public void processElement(StreamRecord streamRecord) throws Exception { if (streamRecord.getValue() != null) { - bufferedRecords.add(streamRecord); output.collect(streamRecord); + recordCounter.incrementAndGet(); } } @@ -88,7 +95,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator latestInstantStateDescriptor = new ListStateDescriptor("latestInstant", String.class); - latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor); + runtimeContext = getRuntimeContext(); + indexOfThisSubtask = runtimeContext.getIndexOfThisSubtask(); + isMain = indexOfThisSubtask == 0; - // recordState - ListStateDescriptor recordsStateDescriptor = new ListStateDescriptor("recordsState", StreamRecord.class); - recordsState = context.getOperatorStateStore().getListState(recordsStateDescriptor); + if (isMain) { + // instantState + ListStateDescriptor latestInstantStateDescriptor = new ListStateDescriptor<>("latestInstant", String.class); + latestInstantState = context.getOperatorStateStore().getListState(latestInstantStateDescriptor); - if (context.isRestored()) { - Iterator latestInstantIterator = latestInstantState.get().iterator(); - latestInstantIterator.forEachRemaining(x -> latestInstant = x); - LOG.info("InstantGenerateOperator initializeState get latestInstant [{}]", latestInstant); - - Iterator recordIterator = recordsState.get().iterator(); - bufferedRecords.clear(); - recordIterator.forEachRemaining(x -> bufferedRecords.add(x)); + if (context.isRestored()) { + Iterator latestInstantIterator = latestInstantState.get().iterator(); + latestInstantIterator.forEachRemaining(x -> latestInstant = x); + LOG.info("Restoring the latest instant [{}] from the state", latestInstant); + } } } @Override public void snapshotState(StateSnapshotContext functionSnapshotContext) throws Exception { - if (latestInstantList.isEmpty()) { - latestInstantList.add(latestInstant); + long checkpointId = functionSnapshotContext.getCheckpointId(); + long recordSize = recordCounter.get(); + if (isMain) { + LOG.info("Update latest instant [{}] records size [{}] checkpointId [{}]", latestInstant, recordSize, checkpointId); + if (latestInstantList.isEmpty()) { + latestInstantList.add(latestInstant); + } else { + latestInstantList.set(0, latestInstant); + } + latestInstantState.update(latestInstantList); } else { - latestInstantList.set(0, latestInstant); + LOG.info("Task instance {} received {} records in checkpoint [{}]", indexOfThisSubtask, recordSize, checkpointId); } - latestInstantState.update(latestInstantList); - LOG.info("Update latest instant [{}]", latestInstant); - - recordsState.update(bufferedRecords); - LOG.info("Update records state size = [{}]", bufferedRecords.size()); - bufferedRecords.clear(); + recordCounter.set(0); } /** @@ -185,10 +205,10 @@ public class InstantGenerateOperator extends AbstractStreamOperator sb.append(x).append(",")); - LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb.toString(), tryTimes); + LOG.warn("Latest transaction [{}] is not completed! unCompleted transaction:[{}],try times [{}]", latestInstant, sb, tryTimes); TimeUnit.SECONDS.sleep(retryInterval); rollbackPendingCommits = writeClient.getInflightsAndRequestedInstants(commitType); } else { @@ -222,4 +242,60 @@ public class InstantGenerateOperator extends AbstractStreamOperator 0) { + receivedData = true; + break; + } + } + + // delete all marker file + cleanMarkerDir(instantMarkerPath); + + return receivedData; + } + + private void createInstantMarkerDir() throws IOException { + // Always create instantMarkerFolder which is needed for InstantGenerateOperator + final Path instantMarkerFolder = new Path(new Path(cfg.targetBasePath, HoodieTableMetaClient.AUXILIARYFOLDER_NAME), INSTANT_MARKER_FOLDER_NAME); + if (!fs.exists(instantMarkerFolder)) { + fs.mkdirs(instantMarkerFolder); + } else { + // Clean marker dir. + cleanMarkerDir(instantMarkerFolder); + } + } + + private void cleanMarkerDir(Path instantMarkerFolder) throws IOException { + FileStatus[] fileStatuses = fs.listStatus(instantMarkerFolder); + for (FileStatus fileStatus : fileStatuses) { + fs.delete(fileStatus.getPath(), true); + } + } }