1
0

[HUDI-3373] Add zero value metrics for empty data source and PROMETHEUS_PUSHGATEWAY reporter (#4760)

This commit is contained in:
Vinish Reddy
2022-02-08 01:47:46 +05:30
committed by GitHub
parent 3bd8fc1c3e
commit 8ab6f17149
2 changed files with 26 additions and 0 deletions

View File

@@ -130,6 +130,26 @@ public class HoodieMetrics {
return indexTimer == null ? null : indexTimer.time();
}
/**
 * Registers zero-valued gauges for all commit metrics when a sync produced no data,
 * so the Pushgateway reports explicit zeros rather than stale or missing values.
 *
 * <p>Only applies when metrics are enabled and the reporter is
 * {@code PROMETHEUS_PUSHGATEWAY}; for every other reporter this is a no-op.
 *
 * @param actionType the commit action type used to namespace the metric names
 */
public void updateMetricsForEmptyData(String actionType) {
  if (!config.isMetricsOn() || !config.getMetricsReporterType().equals(MetricsReporterType.PROMETHEUS_PUSHGATEWAY)) {
    // No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
    return;
  }
  // Zero out every commit metric in the same order the individual registrations used.
  String[] emptyDataMetrics = {
      "totalPartitionsWritten",
      "totalFilesInsert",
      "totalFilesUpdate",
      "totalRecordsWritten",
      "totalUpdateRecordsWritten",
      "totalInsertRecordsWritten",
      "totalBytesWritten",
      "totalScanTime",
      "totalCreateTime",
      "totalUpsertTime",
      "totalCompactedRecordsUpdated",
      "totalLogFilesCompacted",
      "totalLogFilesSize"
  };
  for (String metricName : emptyDataMetrics) {
    Metrics.registerGauge(getMetricsName(actionType, metricName), 0);
  }
}
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
String actionType) {
updateCommitTimingMetrics(commitEpochTimeInMs, durationInMs, metadata, actionType);

View File

@@ -58,6 +58,7 @@ import org.apache.hudi.keygen.KeyGenerator;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
import org.apache.hudi.metrics.HoodieMetrics;
import org.apache.hudi.sync.common.AbstractSyncTool;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.callback.kafka.HoodieWriteCommitKafkaCallback;
@@ -213,6 +214,8 @@ public class DeltaSync implements Serializable {
private transient HoodieDeltaStreamerMetrics metrics;
private transient HoodieMetrics hoodieMetrics;
public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider,
TypedProperties props, JavaSparkContext jssc, FileSystem fs, Configuration conf,
Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
@@ -233,6 +236,7 @@ public class DeltaSync implements Serializable {
this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
this.metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(this.schemaProvider));
this.hoodieMetrics = new HoodieMetrics(getHoodieClientConfig(this.schemaProvider));
this.formatAdapter = new SourceFormatAdapter(
UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider, metrics));
@@ -459,6 +463,8 @@ public class DeltaSync implements Serializable {
if (Objects.equals(checkpointStr, resumeCheckpointStr.orElse(null))) {
LOG.info("No new data, source checkpoint has not changed. Nothing to commit. Old checkpoint=("
+ resumeCheckpointStr + "). New Checkpoint=(" + checkpointStr + ")");
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
hoodieMetrics.updateMetricsForEmptyData(commitActionType);
return null;
}