From c0f42afa35768cf191dcc061feac2e7396457058 Mon Sep 17 00:00:00 2001 From: Taher Koitawala Date: Tue, 3 Sep 2019 20:35:42 +0530 Subject: [PATCH] [HUDI-62] Index Lookup Timer added to HoodieWriteClient --- .../org/apache/hudi/HoodieWriteClient.java | 16 ++++++++++++-- .../apache/hudi/metrics/HoodieMetrics.java | 21 ++++++++++++++++++- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java index e6b3152be..4a8482194 100644 --- a/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java +++ b/hudi-client/src/main/java/org/apache/hudi/HoodieWriteClient.java @@ -101,6 +101,7 @@ public class HoodieWriteClient extends AbstractHo private final transient HoodieIndex index; private transient Timer.Context writeContext = null; private transient Timer.Context compactionTimer; + private transient Timer.Context indexTimer = null; /** * @param jsc @@ -152,8 +153,11 @@ public class HoodieWriteClient extends AbstractHo // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( createMetaClient(true), config, jsc); - + indexTimer = metrics.getIndexCtx(); JavaRDD> recordsWithLocation = index.tagLocation(hoodieRecords, jsc, table); + metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : + indexTimer.stop())); + indexTimer = null; return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); } @@ -167,8 +171,12 @@ public class HoodieWriteClient extends AbstractHo JavaRDD> dedupedRecords = combineOnCondition( config.shouldCombineBeforeUpsert(), records, config.getUpsertShuffleParallelism()); + indexTimer = metrics.getIndexCtx(); // perform index loop up to get existing location of records JavaRDD> taggedRecords = index.tagLocation(dedupedRecords, jsc, table); + metrics.updateIndexMetrics("lookup", metrics.getDurationInMs(indexTimer == null ? 0L : + indexTimer.stop())); + indexTimer = null; return upsertRecordsInternal(taggedRecords, commitTime, table, true); } catch (Throwable e) { if (e instanceof HoodieUpsertException) { @@ -468,8 +476,12 @@ public class HoodieWriteClient extends AbstractHo // cache writeStatusRDD before updating index, so that all actions before this are not triggered again for future // RDD actions that are performed after updating the index. writeStatusRDD = writeStatusRDD.persist(config.getWriteStatusStorageLevel()); + indexTimer = metrics.getIndexCtx(); // Update the index back JavaRDD statuses = index.updateLocation(writeStatusRDD, jsc, table); + metrics.updateIndexMetrics("update", metrics.getDurationInMs(indexTimer == null ? 0L : + indexTimer.stop())); + indexTimer = null; // Trigger the insert and collect statuses commitOnAutoCommit(commitTime, statuses, table.getMetaClient().getCommitActionType()); return statuses; @@ -1410,4 +1422,4 @@ public class HoodieWriteClient extends AbstractHo } } -} +} \ No newline at end of file diff --git a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index a5646349c..c462e2b04 100644 --- a/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -39,6 +39,7 @@ public class HoodieMetrics { public String deltaCommitTimerName = null; public String finalizeTimerName = null; public String compactionTimerName = null; + public String indexTimerName = null; private HoodieWriteConfig config = null; private String tableName = null; private Timer rollbackTimer = null; @@ -47,6 +48,7 @@ public class HoodieMetrics { private Timer deltaCommitTimer = null; private Timer finalizeTimer = null; private Timer compactionTimer = null; + private Timer indexTimer = null; public HoodieMetrics(HoodieWriteConfig config, String tableName) { this.config = config; @@ -59,6 +61,7 @@ public class HoodieMetrics { this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION); this.finalizeTimerName = getMetricsName("timer", "finalize"); this.compactionTimerName = getMetricsName("timer", HoodieTimeline.COMPACTION_ACTION); + this.indexTimerName = getMetricsName("timer", "index"); } } @@ -108,6 +111,13 @@ public class HoodieMetrics { return deltaCommitTimer == null ? null : deltaCommitTimer.time(); } + public Timer.Context getIndexCtx() { + if (config.isMetricsOn() && indexTimer == null) { + indexTimer = createTimer(indexTimerName); + } + return indexTimer == null ? null : indexTimer.time(); + } + public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata, String actionType) { if (config.isMetricsOn()) { @@ -172,6 +182,15 @@ public class HoodieMetrics { } } + public void updateIndexMetrics(final String action,final long durationInMs) { + if (config.isMetricsOn()) { + logger.info(String + .format("Sending index metrics (%s.duration, %d)",action, durationInMs)); + Metrics.registerGauge(getMetricsName("index", String.format("%s.duration", action)), + durationInMs); + } + } + @VisibleForTesting String getMetricsName(String action, String metric) { return config == null ? null : String.format("%s.%s.%s", tableName, action, metric); @@ -183,4 +202,4 @@ public class HoodieMetrics { public long getDurationInMs(long ctxDuration) { return ctxDuration / 1000000; } -} +} \ No newline at end of file