diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java index ec5690341..c5610d2f5 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/source/StreamReadMonitoringFunction.java @@ -248,6 +248,13 @@ public class StreamReadMonitoringFunction List activeMetadataList = instants.stream() .map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList()); List archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName); + if (archivedMetadataList.size() > 0) { + LOG.warn("" + + "--------------------------------------------------------------------------------\n" + + "---------- caution: the reader has fall behind too much from the writer,\n" + + "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n" + + "--------------------------------------------------------------------------------"); + } List metadataList = archivedMetadataList.size() > 0 ? mergeList(activeMetadataList, archivedMetadataList) : activeMetadataList; @@ -288,6 +295,11 @@ public class StreamReadMonitoringFunction } // update the issues instant time this.issuedInstant = commitToIssue; + LOG.info("" + + "------------------------------------------------------------\n" + + "---------- consumed to instant: {}\n" + + "------------------------------------------------------------", + commitToIssue); } @Override