[HUDI-1880] Support streaming read with compaction and cleaning (#2921)
This commit is contained in:
@@ -87,8 +87,9 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
||||
* Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
|
||||
*/
|
||||
protected String generatesDataFileNameWithRollover() {
|
||||
final String fileID = this.fileId + "-" + rollNumber;
|
||||
return FSUtils.makeDataFileName(instantTime, writeToken, fileID, hoodieTable.getBaseFileExtension());
|
||||
// make the intermediate file as hidden
|
||||
return FSUtils.makeDataFileName("." + instantTime,
|
||||
writeToken + "-" + rollNumber, this.fileId, hoodieTable.getBaseFileExtension());
|
||||
}
|
||||
|
||||
public boolean shouldRollover() {
|
||||
@@ -193,13 +194,8 @@ public class FlinkMergeHandle<T extends HoodieRecordPayload, I, K, O>
|
||||
throw new HoodieIOException("Error when clean the temporary roll file: " + path, e);
|
||||
}
|
||||
}
|
||||
Path lastPath = rolloverPaths.size() > 0
|
||||
? rolloverPaths.get(rolloverPaths.size() - 1)
|
||||
: newFilePath;
|
||||
String newFileName = generatesDataFileName();
|
||||
String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
|
||||
+ newFileName).toString();
|
||||
final Path desiredPath = new Path(config.getBasePath(), relativePath);
|
||||
final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1);
|
||||
final Path desiredPath = rolloverPaths.get(0);
|
||||
try {
|
||||
fs.rename(lastPath, desiredPath);
|
||||
} catch (IOException e) {
|
||||
|
||||
@@ -33,7 +33,6 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
|
||||
import org.apache.hudi.util.StreamerUtil;
|
||||
|
||||
import org.apache.flink.annotation.VisibleForTesting;
|
||||
import org.apache.flink.api.common.state.CheckpointListener;
|
||||
import org.apache.flink.configuration.Configuration;
|
||||
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
|
||||
import org.apache.flink.runtime.state.FunctionInitializationContext;
|
||||
@@ -92,7 +91,7 @@ import java.util.function.BiFunction;
|
||||
*/
|
||||
public class StreamWriteFunction<K, I, O>
|
||||
extends KeyedProcessFunction<K, I, O>
|
||||
implements CheckpointedFunction, CheckpointListener {
|
||||
implements CheckpointedFunction {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
@@ -181,11 +180,6 @@ public class StreamWriteFunction<K, I, O>
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyCheckpointComplete(long checkpointId) {
|
||||
this.writeClient.cleanHandles();
|
||||
}
|
||||
|
||||
/**
|
||||
* End input action for batch source.
|
||||
*/
|
||||
@@ -390,6 +384,7 @@ public class StreamWriteFunction<K, I, O>
|
||||
.build();
|
||||
this.eventGateway.sendEventToCoordinator(event);
|
||||
this.buckets.clear();
|
||||
this.writeClient.cleanHandles();
|
||||
this.currentInstant = "";
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,8 +112,6 @@ public class StreamReadMonitoringFunction
|
||||
|
||||
private final long maxCompactionMemoryInBytes;
|
||||
|
||||
private final boolean isDelta;
|
||||
|
||||
public StreamReadMonitoringFunction(
|
||||
Configuration conf,
|
||||
Path path,
|
||||
@@ -124,7 +122,6 @@ public class StreamReadMonitoringFunction
|
||||
this.metaClient = metaClient;
|
||||
this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
|
||||
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
|
||||
this.isDelta = conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -189,15 +186,12 @@ public class StreamReadMonitoringFunction
|
||||
@VisibleForTesting
|
||||
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
|
||||
metaClient.reloadActiveTimeline();
|
||||
HoodieTimeline commitTimeline = isDelta
|
||||
// if is delta, exclude the parquet files from compaction
|
||||
? metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
|
||||
: metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
|
||||
HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
|
||||
if (commitTimeline.empty()) {
|
||||
LOG.warn("No splits found for the table under path " + path);
|
||||
return;
|
||||
}
|
||||
List<HoodieInstant> instants = getUncompactedInstants(commitTimeline, this.issuedInstant);
|
||||
List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline, this.issuedInstant);
|
||||
// get the latest instant that satisfies condition
|
||||
final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
|
||||
final InstantRange instantRange;
|
||||
@@ -303,29 +297,26 @@ public class StreamReadMonitoringFunction
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the uncompacted instants with a given issuedInstant to start from.
|
||||
* Returns the instants with a given issuedInstant to start from.
|
||||
*
|
||||
* @param commitTimeline The completed commits timeline
|
||||
* @param issuedInstant The last issued instant that has already been delivered to downstream
|
||||
* @return the filtered hoodie instants
|
||||
*/
|
||||
private List<HoodieInstant> getUncompactedInstants(
|
||||
private List<HoodieInstant> filterInstantsWithStart(
|
||||
HoodieTimeline commitTimeline,
|
||||
final String issuedInstant) {
|
||||
if (issuedInstant != null) {
|
||||
return commitTimeline.getInstants()
|
||||
.filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
|
||||
.collect(Collectors.toList());
|
||||
} else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
|
||||
String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
|
||||
return commitTimeline.getInstants()
|
||||
.filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
|
||||
.filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
return commitTimeline.getInstants()
|
||||
.filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
}
|
||||
@@ -357,14 +348,26 @@ public class StreamReadMonitoringFunction
|
||||
|
||||
private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) {
|
||||
return metadata.getFileIdAndFullPaths(path.toString()).values().stream()
|
||||
.map(path -> {
|
||||
.map(org.apache.hadoop.fs.Path::new)
|
||||
// filter out the file paths that does not exist, some files may be cleaned by
|
||||
// the cleaner.
|
||||
.filter(path -> {
|
||||
try {
|
||||
return fs.getFileStatus(new org.apache.hadoop.fs.Path(path));
|
||||
return fs.exists(path);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Checking exists of path: {} error", path);
|
||||
throw new HoodieException(e);
|
||||
}
|
||||
}).map(path -> {
|
||||
try {
|
||||
return fs.getFileStatus(path);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Get write status of path: {} error", path);
|
||||
throw new HoodieException(e);
|
||||
}
|
||||
})
|
||||
// filter out crushed files
|
||||
.filter(fileStatus -> fileStatus.getLen() > 0)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
|
||||
@@ -536,6 +536,9 @@ public class MergeOnReadInputFormat
|
||||
private final GenericRecordBuilder recordBuilder;
|
||||
|
||||
private final RowDataProjection projection;
|
||||
|
||||
private final InstantRange instantRange;
|
||||
|
||||
// add the flag because the flink ParquetColumnarRowSplitReader is buggy:
|
||||
// method #reachedEnd() returns false after it returns true.
|
||||
// refactor it out once FLINK-22370 is resolved.
|
||||
@@ -564,12 +567,20 @@ public class MergeOnReadInputFormat
|
||||
this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
|
||||
this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
|
||||
this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
|
||||
this.instantRange = split.getInstantRange().orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean reachedEnd() throws IOException {
|
||||
if (!readLogs && !this.reader.reachedEnd()) {
|
||||
currentRecord = this.reader.nextRecord();
|
||||
if (instantRange != null) {
|
||||
boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
|
||||
if (!isInRange) {
|
||||
// filter base file by instant range
|
||||
return reachedEnd();
|
||||
}
|
||||
}
|
||||
final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
|
||||
if (logRecords.containsKey(curKey)) {
|
||||
keyToSkip.add(curKey);
|
||||
|
||||
@@ -163,7 +163,6 @@ public class StreamWriteFunctionWrapper<I> {
|
||||
functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
|
||||
coordinator.notifyCheckpointComplete(checkpointId);
|
||||
this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
|
||||
this.writeFunction.notifyCheckpointComplete(checkpointId);
|
||||
if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
|
||||
try {
|
||||
compactFunctionWrapper.compact(checkpointId);
|
||||
|
||||
Reference in New Issue
Block a user