diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index e874047b8..d13110fee 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -130,6 +130,26 @@ public class HoodieMetrics { return indexTimer == null ? null : indexTimer.time(); } + public void updateMetricsForEmptyData(String actionType) { + if (!config.isMetricsOn() || !config.getMetricsReporterType().equals(MetricsReporterType.PROMETHEUS_PUSHGATEWAY)) { + // No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY. + return; + } + Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0); + Metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0); + } + public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata, String actionType) { updateCommitTimingMetrics(commitEpochTimeInMs, durationInMs, metadata, actionType); diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java index acfdc1bbc..91f09da1d 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java @@ -58,6 +58,7 @@ import org.apache.hudi.keygen.KeyGenerator; import org.apache.hudi.keygen.SimpleKeyGenerator; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory; +import org.apache.hudi.metrics.HoodieMetrics; import org.apache.hudi.sync.common.AbstractSyncTool; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback; @@ -213,6 +214,8 @@ public class DeltaSync implements Serializable { private transient HoodieDeltaStreamerMetrics metrics; + private transient HoodieMetrics hoodieMetrics; + public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf, Function onInitializingHoodieWriteClient) throws IOException { @@ -233,6 +236,7 @@ public class DeltaSync implements Serializable { this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames); this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider)); + this.hoodieMetrics = new HoodieMetrics(getHoodieClientConfig(this.schemaProvider)); this.formatAdapter = new SourceFormatAdapter( UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics)); @@ -459,6 +463,8 @@ public class DeltaSync implements Serializable { if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) { LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=(" + resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")"); + String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType)); + hoodieMetrics.updateMetricsForEmptyData(commitActionType); return null; }