diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
index 4da64049b..7fffd6baf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandle.java
@@ -87,8 +87,9 @@ public class FlinkMergeHandle
-   * Use the fileId + "-" + rollNumber as the new fileId of a mini-batch write.
+   * Use the writeToken + "-" + rollNumber as the write token of a mini-batch write.
    */
   protected String generatesDataFileNameWithRollover() {
-    final String fileID = this.fileId + "-" + rollNumber;
-    return FSUtils.makeDataFileName(instantTime, writeToken, fileID, hoodieTable.getBaseFileExtension());
+    // make the intermediate file hidden
+    return FSUtils.makeDataFileName("." + instantTime,
+        writeToken + "-" + rollNumber, this.fileId, hoodieTable.getBaseFileExtension());
   }
 
   public boolean shouldRollover() {
@@ -193,13 +194,8 @@ public class FlinkMergeHandle
         throw new HoodieIOException("Error when clean the temporary roll file: " + path, e);
       }
     }
-    Path lastPath = rolloverPaths.size() > 0
-        ? rolloverPaths.get(rolloverPaths.size() - 1)
-        : newFilePath;
-    String newFileName = generatesDataFileName();
-    String relativePath = new Path((partitionPath.isEmpty() ? "" : partitionPath + "/")
-        + newFileName).toString();
-    final Path desiredPath = new Path(config.getBasePath(), relativePath);
+    final Path lastPath = rolloverPaths.get(rolloverPaths.size() - 1);
+    final Path desiredPath = rolloverPaths.get(0);
     try {
       fs.rename(lastPath, desiredPath);
     } catch (IOException e) {
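Note on the "." prefix: Hadoop and the query engines built on it treat files whose names start with "." or "_" as hidden and skip them when listing input, which is what keeps readers from picking up the intermediate roll files before the final rename. A minimal sketch of that convention using the stock org.apache.hadoop.fs API; the filter below is illustrative, not Hudi code:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class HiddenFileFilterSketch {

  // Mirrors the filter Hadoop's FileInputFormat applies to input listings:
  // names starting with "." or "_" are treated as hidden and excluded.
  static final PathFilter HIDDEN_FILE_FILTER = p -> {
    String name = p.getName();
    return !name.startsWith(".") && !name.startsWith("_");
  };

  public static void main(String[] args) {
    // an intermediate mini-batch file, hidden by the "." prefix
    System.out.println(HIDDEN_FILE_FILTER.accept(new Path("/tbl/par1/.intermediate.parquet"))); // false
    // a finalized base file, visible to readers
    System.out.println(HIDDEN_FILE_FILTER.accept(new Path("/tbl/par1/base.parquet")));          // true
  }
}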
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 8244226b3..5c3cbb7b0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -33,7 +33,6 @@ import org.apache.hudi.table.action.commit.FlinkWriteHelper;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -92,7 +91,7 @@ import java.util.function.BiFunction;
  */
 public class StreamWriteFunction<K, I, O>
     extends KeyedProcessFunction<K, I, O>
-    implements CheckpointedFunction, CheckpointListener {
+    implements CheckpointedFunction {
 
   private static final long serialVersionUID = 1L;
 
@@ -181,11 +180,6 @@ public class StreamWriteFunction
     }
   }
 
-  @Override
-  public void notifyCheckpointComplete(long checkpointId) {
-    this.writeClient.cleanHandles();
-  }
-
   /**
    * End input action for batch source.
    */
@@ -390,6 +384,7 @@ public class StreamWriteFunction
           .build();
       this.eventGateway.sendEventToCoordinator(event);
       this.buckets.clear();
+      this.writeClient.cleanHandles();
       this.currentInstant = "";
     }
   }
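With CheckpointListener gone, the write handles are released in the same method that flushes the buffered data and emits the commit event, rather than waiting for the checkpoint-complete notification, which arrives only after the whole checkpoint succeeds and never arrives if the job fails first. A condensed sketch of the pattern; the class and helper names below are hypothetical, not the actual Hudi function:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class BufferingSinkSketch implements CheckpointedFunction {

  private final List<String> buffer = new ArrayList<>();

  @Override
  public void initializeState(FunctionInitializationContext context) {
    // restore buffered state here if needed
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) {
    flush();
  }

  private void flush() {
    // ... write the buffered records and send the commit event ...
    buffer.clear();    // corresponds to this.buckets.clear() in the diff
    releaseHandles();  // corresponds to this.writeClient.cleanHandles() in the diff
  }

  private void releaseHandles() {
    // close the per-file write handles opened for this batch
  }
}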
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
index f50872672..b6eb39754 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java
@@ -112,8 +112,6 @@
 
   private final long maxCompactionMemoryInBytes;
 
-  private final boolean isDelta;
-
   public StreamReadMonitoringFunction(
       Configuration conf,
       Path path,
@@ -124,7 +122,6 @@
     this.metaClient = metaClient;
     this.interval = conf.getInteger(FlinkOptions.READ_STREAMING_CHECK_INTERVAL);
     this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
-    this.isDelta = conf.getString(FlinkOptions.TABLE_TYPE).equalsIgnoreCase(FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
   }
 
   @Override
@@ -189,15 +186,12 @@
   @VisibleForTesting
   public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
     metaClient.reloadActiveTimeline();
-    HoodieTimeline commitTimeline = isDelta
-        // if is delta, exclude the parquet files from compaction
-        ? metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants()
-        : metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = metaClient.getCommitsAndCompactionTimeline().filterCompletedInstants();
     if (commitTimeline.empty()) {
       LOG.warn("No splits found for the table under path " + path);
       return;
     }
-    List<HoodieInstant> instants = getUncompactedInstants(commitTimeline, this.issuedInstant);
+    List<HoodieInstant> instants = filterInstantsWithStart(commitTimeline, this.issuedInstant);
     // get the latest instant that satisfies condition
     final HoodieInstant instantToIssue = instants.size() == 0 ? null : instants.get(instants.size() - 1);
     final InstantRange instantRange;
@@ -303,29 +297,26 @@
   }
 
   /**
-   * Returns the uncompacted instants with a given issuedInstant to start from.
+   * Returns the instants with a given issuedInstant to start from.
    *
    * @param commitTimeline The completed commits timeline
    * @param issuedInstant  The last issued instant that has already been delivered to downstream
    * @return the filtered hoodie instants
    */
-  private List<HoodieInstant> getUncompactedInstants(
+  private List<HoodieInstant> filterInstantsWithStart(
       HoodieTimeline commitTimeline,
       final String issuedInstant) {
     if (issuedInstant != null) {
       return commitTimeline.getInstants()
-          .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
           .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN, issuedInstant))
           .collect(Collectors.toList());
     } else if (this.conf.getOptional(FlinkOptions.READ_STREAMING_START_COMMIT).isPresent()) {
       String definedStartCommit = this.conf.get(FlinkOptions.READ_STREAMING_START_COMMIT);
       return commitTimeline.getInstants()
-          .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
          .filter(s -> HoodieTimeline.compareTimestamps(s.getTimestamp(), GREATER_THAN_OR_EQUALS, definedStartCommit))
          .collect(Collectors.toList());
     } else {
       return commitTimeline.getInstants()
-          .filter(s -> !s.getAction().equals(HoodieTimeline.COMPACTION_ACTION))
           .collect(Collectors.toList());
     }
   }
@@ -357,14 +348,26 @@
 
   private List<FileStatus> getWritePathsOfInstant(HoodieCommitMetadata metadata, FileSystem fs) {
     return metadata.getFileIdAndFullPaths(path.toString()).values().stream()
-        .map(path -> {
+        .map(org.apache.hadoop.fs.Path::new)
+        // filter out the file paths that do not exist; some files may have been
+        // cleaned by the cleaner.
+        .filter(path -> {
           try {
-            return fs.getFileStatus(new org.apache.hadoop.fs.Path(path));
+            return fs.exists(path);
+          } catch (IOException e) {
+            LOG.error("Checking existence of path: {} error", path);
+            throw new HoodieException(e);
+          }
+        }).map(path -> {
+          try {
+            return fs.getFileStatus(path);
           } catch (IOException e) {
             LOG.error("Get write status of path: {} error", path);
             throw new HoodieException(e);
           }
         })
+        // filter out corrupted (zero-length) files
+        .filter(fileStatus -> fileStatus.getLen() > 0)
         .collect(Collectors.toList());
   }
 
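The renamed filterInstantsWithStart no longer excludes compaction instants; it only bounds the completed timeline by a start point, with three cases. A self-contained rendering of those branches over plain timestamp strings (Hudi instant times are fixed-format and compare lexicographically); the parameter names here are illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class FilterInstantsSketch {

  // issuedInstant: the last instant already delivered downstream (nullable);
  // startCommit: the configured READ_STREAMING_START_COMMIT (nullable).
  static List<String> filterInstantsWithStart(
      List<String> completedInstants, String issuedInstant, String startCommit) {
    if (issuedInstant != null) {
      // strictly greater: never re-deliver the instant that was already issued
      return completedInstants.stream()
          .filter(ts -> ts.compareTo(issuedInstant) > 0)
          .collect(Collectors.toList());
    } else if (startCommit != null) {
      // greater than or equal: the configured start commit itself is included
      return completedInstants.stream()
          .filter(ts -> ts.compareTo(startCommit) >= 0)
          .collect(Collectors.toList());
    } else {
      // no start point: consume the whole completed timeline
      return new ArrayList<>(completedInstants);
    }
  }
}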
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 595bf5a0f..f0f4f41b0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -536,6 +536,9 @@
     private final GenericRecordBuilder recordBuilder;
 
     private final RowDataProjection projection;
+
+    private final InstantRange instantRange;
+
     // add the flag because the flink ParquetColumnarRowSplitReader is buggy:
     // method #reachedEnd() returns false after it returns true.
    // refactor it out once FLINK-22370 is resolved.
@@ -564,12 +567,20 @@
       this.rowDataToAvroConverter = RowDataToAvroConverters.createConverter(tableRowType);
       this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(requiredRowType);
       this.projection = RowDataProjection.instance(requiredRowType, requiredPos);
+      this.instantRange = split.getInstantRange().orElse(null);
     }
 
     @Override
     public boolean reachedEnd() throws IOException {
       if (!readLogs && !this.reader.reachedEnd()) {
         currentRecord = this.reader.nextRecord();
+        if (instantRange != null) {
+          boolean isInRange = instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
+          if (!isInRange) {
+            // filter base file records by instant range
+            return reachedEnd();
+          }
+        }
         final String curKey = currentRecord.getString(HOODIE_RECORD_KEY_COL_POS).toString();
         if (logRecords.containsKey(curKey)) {
           keyToSkip.add(curKey);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index a4b6c16a3..5050109a8 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -163,7 +163,6 @@
     functionInitializationContext.getOperatorStateStore().checkpointSuccess(checkpointId);
     coordinator.notifyCheckpointComplete(checkpointId);
     this.bucketAssignerFunction.notifyCheckpointComplete(checkpointId);
-    this.writeFunction.notifyCheckpointComplete(checkpointId);
     if (conf.getBoolean(FlinkOptions.COMPACTION_ASYNC_ENABLED)) {
       try {
         compactFunctionWrapper.compact(checkpointId);
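In the MergeOnReadInputFormat change above, reachedEnd() skips base-file records whose _hoodie_commit_time falls outside the split's InstantRange by calling itself recursively. Java does not eliminate tail calls, so a long run of out-of-range records deepens the stack by one frame each; the same skip can be expressed as a loop. A self-contained sketch of the iterative form, with a plain iterator standing in for the parquet reader; all names here are illustrative:

import java.util.Iterator;
import java.util.function.Predicate;

public class InstantRangeSkipSketch {

  // Returns the next commit time accepted by the range predicate, or null when
  // the reader is exhausted; the iterative equivalent of the recursive
  // "return reachedEnd();" in the diff above.
  static String nextInRange(Iterator<String> commitTimes, Predicate<String> isInRange) {
    while (commitTimes.hasNext()) {
      String ts = commitTimes.next();
      if (isInRange.test(ts)) {
        return ts;
      }
      // outside the incremental range: skip it and keep scanning
    }
    return null;
  }
}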