1
0

[HUDI-62] Index Lookup Timer added to HoodieWriteClient

This commit is contained in:
Taher Koitawala
2019-09-03 20:35:42 +05:30
committed by vinoth chandar
parent 63cc455d9c
commit c0f42afa35
2 changed files with 34 additions and 3 deletions

View File

@@ -101,6 +101,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
private final transient HoodieIndex<T> index;
private transient Timer.Context writeContext = null;
private transient Timer.Context compactionTimer;
private transient Timer.Context indexTimer = null;
/**
* @param jsc
@@ -152,8 +153,11 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// Create a Hoodie table which encapsulated the commits and files visible
HoodieTable<T> table = HoodieTable.getHoodieTable(
createMetaClient(true), config, jsc);
indexTimer = metrics.getIndexCtx();
JavaRDD<HoodieRecord<T>> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table);
metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L :
indexTimer.stop()));
indexTimer = null;
return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
}
@@ -167,8 +171,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
JavaRDD<HoodieRecord<T>> dedupedRecords = combineOnCondition(
config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism());
indexTimer = metrics.getIndexCtx();
// perform index loop up to get existing location of records
JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L :
indexTimer.stop()));
indexTimer = null;
return upsertRecordsInternal(taggedRecords, commitTime, table, true);
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
@@ -468,8 +476,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
// cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future
// RDD actions that are performed after updating the index.
writeStatusRDD = writeStatusRDD.persist(config.getWriteStatusStorageLevel());
indexTimer = metrics.getIndexCtx();
// Update the index back
JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, table);
metrics.updateIndexMetrics("update", metrics.getDurationInMs(indexTimer == null ? 0L :
indexTimer.stop()));
indexTimer = null;
// Trigger the insert and collect statuses
commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType());
return statuses;
@@ -1410,4 +1422,4 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHo
}
}
}
}

View File

@@ -39,6 +39,7 @@ public class HoodieMetrics {
public String deltaCommitTimerName = null;
public String finalizeTimerName = null;
public String compactionTimerName = null;
public String indexTimerName = null;
private HoodieWriteConfig config = null;
private String tableName = null;
private Timer rollbackTimer = null;
@@ -47,6 +48,7 @@ public class HoodieMetrics {
private Timer deltaCommitTimer = null;
private Timer finalizeTimer = null;
private Timer compactionTimer = null;
private Timer indexTimer = null;
public HoodieMetrics(HoodieWriteConfig config, String tableName) {
this.config = config;
@@ -59,6 +61,7 @@ public class HoodieMetrics {
this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION);
this.finalizeTimerName = getMetricsName("timer", "finalize");
this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION);
this.indexTimerName = getMetricsName("timer", "index");
}
}
@@ -108,6 +111,13 @@ public class HoodieMetrics {
return deltaCommitTimer == null ? null : deltaCommitTimer.time();
}
public Timer.Context getIndexCtx() {
if (config.isMetricsOn() && indexTimer == null) {
indexTimer = createTimer(indexTimerName);
}
return indexTimer == null ? null : indexTimer.time();
}
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs,
HoodieCommitMetadata metadata, String actionType) {
if (config.isMetricsOn()) {
@@ -172,6 +182,15 @@ public class HoodieMetrics {
}
}
public void updateIndexMetrics(final String action,final long durationInMs) {
if (config.isMetricsOn()) {
logger.info(String
.format("Sending index metrics (%s.duration, %d)",action, durationInMs));
Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)),
durationInMs);
}
}
@VisibleForTesting
String getMetricsName(String action, String metric) {
return config == null ? null : String.format("%s.%s.%s", tableName, action, metric);
@@ -183,4 +202,4 @@ public class HoodieMetrics {
public long getDurationInMs(long ctxDuration) {
return ctxDuration / 1000000;
}
}
}