[HUDI-1587] Add latency and freshness support (#2541)
Save min and max of event time in each commit and compute the latency and freshness metrics.
This commit is contained in:
@@ -21,20 +21,28 @@ package org.apache.hudi.client;
|
||||
import org.apache.hudi.common.model.HoodieKey;
|
||||
import org.apache.hudi.common.model.HoodieRecord;
|
||||
import org.apache.hudi.common.model.HoodieWriteStat;
|
||||
import org.apache.hudi.common.util.DateTimeUtils;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.DateTimeException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.METADATA_EVENT_TIME_KEY;
|
||||
|
||||
/**
|
||||
* Status of a write operation.
|
||||
*/
|
||||
public class WriteStatus implements Serializable {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(WriteStatus.class);
|
||||
private static final long serialVersionUID = 1L;
|
||||
private static final long RANDOM_SEED = 9038412832L;
|
||||
|
||||
@@ -77,6 +85,18 @@ public class WriteStatus implements Serializable {
|
||||
writtenRecords.add(record);
|
||||
}
|
||||
totalRecords++;
|
||||
|
||||
// get the min and max event time for calculating latency and freshness
|
||||
if (optionalRecordMetadata.isPresent()) {
|
||||
String eventTimeVal = optionalRecordMetadata.get().getOrDefault(METADATA_EVENT_TIME_KEY, null);
|
||||
try {
|
||||
long eventTime = DateTimeUtils.parseDateTime(eventTimeVal).toEpochMilli();
|
||||
stat.setMinEventTime(eventTime);
|
||||
stat.setMaxEventTime(eventTime);
|
||||
} catch (DateTimeException | IllegalArgumentException e) {
|
||||
LOG.debug(String.format("Fail to parse event time value: %s", eventTimeVal), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -19,14 +19,15 @@
|
||||
package org.apache.hudi.config;
|
||||
|
||||
import org.apache.hudi.common.config.DefaultHoodieConfig;
|
||||
import org.apache.hudi.config.HoodieMemoryConfig.Builder;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.util.Properties;
|
||||
|
||||
import static org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL;
|
||||
import static org.apache.hudi.common.model.HoodiePayloadProps.DEFAULT_PAYLOAD_ORDERING_FIELD_VAL;
|
||||
import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_EVENT_TIME_FIELD_PROP;
|
||||
import static org.apache.hudi.common.model.HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP;
|
||||
|
||||
/**
|
||||
@@ -63,10 +64,17 @@ public class HoodiePayloadConfig extends DefaultHoodieConfig {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withPayloadEventTimeField(String payloadEventTimeField) {
|
||||
props.setProperty(PAYLOAD_EVENT_TIME_FIELD_PROP, String.valueOf(payloadEventTimeField));
|
||||
return this;
|
||||
}
|
||||
|
||||
public HoodiePayloadConfig build() {
|
||||
HoodiePayloadConfig config = new HoodiePayloadConfig(props);
|
||||
setDefaultOnCondition(props, !props.containsKey(PAYLOAD_ORDERING_FIELD_PROP), PAYLOAD_ORDERING_FIELD_PROP,
|
||||
String.valueOf(DEFAULT_PAYLOAD_ORDERING_FIELD_VAL));
|
||||
setDefaultOnCondition(props, !props.containsKey(PAYLOAD_EVENT_TIME_FIELD_PROP), PAYLOAD_EVENT_TIME_FIELD_PROP,
|
||||
String.valueOf(DEFAULT_PAYLOAD_EVENT_TIME_FIELD_VAL));
|
||||
return config;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,8 @@ package org.apache.hudi.metrics;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.table.timeline.HoodieTimeline;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
@@ -130,6 +132,7 @@ public class HoodieMetrics {
|
||||
|
||||
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
|
||||
String actionType) {
|
||||
updateCommitTimingMetrics(commitEpochTimeInMs, durationInMs, metadata, actionType);
|
||||
if (config.isMetricsOn()) {
|
||||
long totalPartitionsWritten = metadata.fetchTotalPartitionsWritten();
|
||||
long totalFilesInsert = metadata.fetchTotalFilesInsert();
|
||||
@@ -144,7 +147,6 @@ public class HoodieMetrics {
|
||||
long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
|
||||
long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
|
||||
long totalLogFilesSize = metadata.getTotalLogFilesSize();
|
||||
Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
|
||||
@@ -152,7 +154,6 @@ public class HoodieMetrics {
|
||||
Metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
|
||||
@@ -162,6 +163,23 @@ public class HoodieMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
private void updateCommitTimingMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata,
|
||||
String actionType) {
|
||||
if (config.isMetricsOn()) {
|
||||
Pair<Option<Long>, Option<Long>> eventTimePairMinMax = metadata.getMinAndMaxEventTime();
|
||||
if (eventTimePairMinMax.getLeft().isPresent()) {
|
||||
long commitLatencyInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getLeft().get();
|
||||
Metrics.registerGauge(getMetricsName(actionType, "commitLatencyInMs"), commitLatencyInMs);
|
||||
}
|
||||
if (eventTimePairMinMax.getRight().isPresent()) {
|
||||
long commitFreshnessInMs = commitEpochTimeInMs + durationInMs - eventTimePairMinMax.getRight().get();
|
||||
Metrics.registerGauge(getMetricsName(actionType, "commitFreshnessInMs"), commitFreshnessInMs);
|
||||
}
|
||||
Metrics.registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
|
||||
Metrics.registerGauge(getMetricsName(actionType, "duration"), durationInMs);
|
||||
}
|
||||
}
|
||||
|
||||
public void updateRollbackMetrics(long durationInMs, long numFilesDeleted) {
|
||||
if (config.isMetricsOn()) {
|
||||
LOG.info(
|
||||
|
||||
@@ -19,6 +19,8 @@
|
||||
package org.apache.hudi.metrics;
|
||||
|
||||
import org.apache.hudi.common.model.HoodieCommitMetadata;
|
||||
import org.apache.hudi.common.util.Option;
|
||||
import org.apache.hudi.common.util.collection.Pair;
|
||||
import org.apache.hudi.config.HoodieWriteConfig;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
@@ -123,6 +125,7 @@ public class TestHoodieMetrics {
|
||||
when(metadata.getTotalCompactedRecordsUpdated()).thenReturn(randomValue + 11);
|
||||
when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12);
|
||||
when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13);
|
||||
when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(), Option.empty()));
|
||||
metrics.updateCommitMetrics(randomValue + 14, commitTimer.stop(), metadata, action);
|
||||
|
||||
String metricname = metrics.getMetricsName(action, "duration");
|
||||
|
||||
Reference in New Issue
Block a user