diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index c36a90f5a..2aad344de 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -290,6 +290,8 @@ public class DeltaSync implements Serializable { srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext); } + metrics.updateDeltaStreamerSyncMetrics(System.currentTimeMillis()); + // Clear persistent RDDs jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist); return result; diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java index 056713771..0c2f18fa4 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerMetrics.java @@ -95,6 +95,12 @@ public class HoodieDeltaStreamerMetrics implements Serializable { } } + public void updateDeltaStreamerSyncMetrics(long syncEpochTimeInMs) { + if (config.isMetricsOn()) { + Metrics.registerGauge(getMetricsName("deltastreamer", "lastSync"), syncEpochTimeInMs); + } + } + public long getDurationInMs(long ctxDuration) { return ctxDuration / 1000000; }