[HUDI-2415] Add more info log for flink streaming reader (#3642)
This commit is contained in:
@@ -248,6 +248,13 @@ public class StreamReadMonitoringFunction
|
|||||||
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
|
List<HoodieCommitMetadata> activeMetadataList = instants.stream()
|
||||||
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
|
.map(instant -> WriteProfiles.getCommitMetadata(tableName, path, instant, commitTimeline)).collect(Collectors.toList());
|
||||||
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName);
|
List<HoodieCommitMetadata> archivedMetadataList = getArchivedMetadata(instantRange, commitTimeline, tableName);
|
||||||
|
if (archivedMetadataList.size() > 0) {
|
||||||
|
LOG.warn(""
|
||||||
|
+ "--------------------------------------------------------------------------------\n"
|
||||||
|
+ "---------- caution: the reader has fall behind too much from the writer,\n"
|
||||||
|
+ "---------- tweak 'read.tasks' option to add parallelism of read tasks.\n"
|
||||||
|
+ "--------------------------------------------------------------------------------");
|
||||||
|
}
|
||||||
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
|
List<HoodieCommitMetadata> metadataList = archivedMetadataList.size() > 0
|
||||||
? mergeList(activeMetadataList, archivedMetadataList)
|
? mergeList(activeMetadataList, archivedMetadataList)
|
||||||
: activeMetadataList;
|
: activeMetadataList;
|
||||||
@@ -288,6 +295,11 @@ public class StreamReadMonitoringFunction
|
|||||||
}
|
}
|
||||||
// update the issues instant time
|
// update the issues instant time
|
||||||
this.issuedInstant = commitToIssue;
|
this.issuedInstant = commitToIssue;
|
||||||
|
LOG.info(""
|
||||||
|
+ "------------------------------------------------------------\n"
|
||||||
|
+ "---------- consumed to instant: {}\n"
|
||||||
|
+ "------------------------------------------------------------",
|
||||||
|
commitToIssue);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
Reference in New Issue
Block a user