From 04655e9e85e8e4aacc8c66209ccd0f4fb0b28671 Mon Sep 17 00:00:00 2001 From: Nishith Agarwal Date: Sun, 25 Mar 2018 11:12:41 -0700 Subject: [PATCH] Adding metrics for MOR and COW --- .../com/uber/hoodie/HoodieWriteClient.java | 31 +++- .../uber/hoodie/io/HoodieAppendHandle.java | 8 + .../uber/hoodie/io/HoodieCreateHandle.java | 4 + .../com/uber/hoodie/io/HoodieIOHandle.java | 3 + .../com/uber/hoodie/io/HoodieMergeHandle.java | 4 + .../compact/HoodieRealtimeTableCompactor.java | 48 ++++- .../strategy/BoundedIOCompactionStrategy.java | 34 +--- .../compact/strategy/CompactionStrategy.java | 53 ++++-- .../strategy/UnBoundedCompactionStrategy.java | 12 +- .../uber/hoodie/metrics/HoodieMetrics.java | 51 +++-- .../hoodie/table/TestMergeOnReadTable.java | 43 +++++ .../src/main/avro/HoodieCommitMetadata.avsc | 2 +- .../main/avro/HoodieCompactionMetadata.avsc | 2 +- .../common/model/HoodieCommitMetadata.java | 88 ++++++++- .../hoodie/common/model/HoodieWriteStat.java | 174 +++++++++++++++--- .../log/HoodieCompactedLogRecordScanner.java | 46 ++++- .../uber/hoodie/common/util/HoodieTimer.java | 70 +++++++ .../hoodie/common/model/HoodieTestUtils.java | 22 +++ .../model/TestHoodieCommitMetadata.java | 42 +++++ 19 files changed, 616 insertions(+), 121 deletions(-) create mode 100644 hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieTimer.java create mode 100644 hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieCommitMetadata.java diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 93dd4c292..16aba5451 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -543,7 +543,7 @@ public class HoodieWriteClient implements Seriali long durationInMs = metrics.getDurationInMs(writeContext.stop()); metrics 
.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime(), - durationInMs, metadata); + durationInMs, metadata, actionType); writeContext = null; } logger.info("Committed " + commitTime); @@ -926,6 +926,8 @@ public class HoodieWriteClient implements Seriali // Create a Hoodie table which encapsulated the commits and files visible HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config); + // TODO : Fix table.getActionType for MOR table type to return different actions based on delta or compaction + writeContext = metrics.getCommitCtx(); JavaRDD statuses = table.compact(jsc, commitTime); // Trigger the insert and collect statuses statuses = statuses.persist(config.getWriteStatusStorageLevel()); @@ -960,9 +962,22 @@ public class HoodieWriteClient implements Seriali HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true); HoodieTable table = HoodieTable.getHoodieTable(metaClient, config); + // TODO : Fix table.getActionType for MOR table type to return different actions based on delta or compaction and + // then use getTableAndInitCtx + Timer.Context writeContext = metrics.getCommitCtx(); JavaRDD compactedStatuses = table.compact(jsc, compactionCommitTime); if (!compactedStatuses.isEmpty()) { - commitForceCompaction(compactedStatuses, metaClient, compactionCommitTime); + HoodieCommitMetadata metadata = commitForceCompaction(compactedStatuses, metaClient, compactionCommitTime); + long durationInMs = metrics.getDurationInMs(writeContext.stop()); + try { + metrics + .updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(), + durationInMs, metadata, HoodieActiveTimeline.COMMIT_ACTION); + } catch (ParseException e) { + throw new HoodieCommitException( + "Commit time is not of valid format.Failed to commit " + config.getBasePath() + + " at time " + 
compactionCommitTime, e); + } logger.info("Compacted successfully on commit " + compactionCommitTime); } else { logger.info("Compaction did not run for commit " + compactionCommitTime); @@ -979,7 +994,7 @@ public class HoodieWriteClient implements Seriali return compactionCommitTime; } - private void commitForceCompaction(JavaRDD writeStatuses, + private HoodieCommitMetadata commitForceCompaction(JavaRDD writeStatuses, HoodieTableMetaClient metaClient, String compactionCommitTime) { List updateStatusMap = writeStatuses.map(writeStatus -> writeStatus.getStat()) .collect(); @@ -1002,6 +1017,7 @@ public class HoodieWriteClient implements Seriali throw new HoodieCompactionException( "Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e); } + return metadata; } /** @@ -1043,9 +1059,14 @@ public class HoodieWriteClient implements Seriali } private HoodieTable getTableAndInitCtx() { - writeContext = metrics.getCommitCtx(); // Create a Hoodie table which encapsulated the commits and files visible - return HoodieTable.getHoodieTable( + HoodieTable table = HoodieTable.getHoodieTable( new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config); + if (table.getCommitActionType() == HoodieTimeline.COMMIT_ACTION) { + writeContext = metrics.getCommitCtx(); + } else { + writeContext = metrics.getDeltaCommitCtx(); + } + return table; } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index 82d5a9d0b..7ead846de 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -24,6 +24,7 @@ import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; +import 
com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.table.log.HoodieLogFormat; import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer; @@ -73,6 +74,7 @@ public class HoodieAppendHandle extends HoodieIOH private HoodieLogFile currentLogFile; private Writer writer; private boolean doInit = true; + private long estimatedNumberOfBytesWritten; public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable, String fileId, Iterator> recordItr) { @@ -180,6 +182,7 @@ public class HoodieAppendHandle extends HoodieIOH logger.info("AvgRecordSize => " + averageRecordSize); averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2; doAppend(header); + estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords; numberOfRecords = 0; } Optional indexedRecord = getIndexedRecord(record); @@ -191,6 +194,7 @@ public class HoodieAppendHandle extends HoodieIOH numberOfRecords++; } doAppend(header); + estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords; } private void doAppend(Map header) { @@ -217,7 +221,11 @@ public class HoodieAppendHandle extends HoodieIOH } writeStatus.getStat().setNumWrites(recordsWritten); writeStatus.getStat().setNumDeletes(recordsDeleted); + writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten); writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalUpsertTime(timer.endTimer()); + writeStatus.getStat().setRuntimeStats(runtimeStats); } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index f16b2d211..c2ee5f8c0 100644 --- 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -22,6 +22,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieWriteStat; +import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ReflectionUtils; import com.uber.hoodie.config.HoodieWriteConfig; @@ -136,6 +137,9 @@ public class HoodieCreateHandle extends HoodieIOH stat.setPaths(new Path(config.getBasePath()), path, tempPath); stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath())); stat.setTotalWriteErrors(status.getFailedRecords().size()); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalCreateTime(timer.endTimer()); + stat.setRuntimeStats(runtimeStats); status.setStat(stat); return status; diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java index a0032de2b..e4a43af19 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java @@ -21,6 +21,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; +import com.uber.hoodie.common.util.HoodieTimer; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieIOException; import com.uber.hoodie.table.HoodieTable; @@ -41,6 +42,7 @@ public abstract class HoodieIOHandle { protected final HoodieTable hoodieTable; protected final Schema schema; protected HoodieTimeline hoodieTimeline; + protected HoodieTimer timer; public 
HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable hoodieTable) { this.commitTime = commitTime; @@ -49,6 +51,7 @@ public abstract class HoodieIOHandle { this.hoodieTable = hoodieTable; this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline(); this.schema = createHoodieWriteSchema(config); + this.timer = new HoodieTimer().startTimer(); } /** diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index 1d2133d51..d5c6a90de 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -22,6 +22,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieWriteStat; +import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.table.TableFileSystemView; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.ReflectionUtils; @@ -261,6 +262,9 @@ public class HoodieMergeHandle extends HoodieIOHa writeStatus.getStat().setNumDeletes(recordsDeleted); writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten); writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size()); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalUpsertTime(timer.endTimer()); + writeStatus.getStat().setRuntimeStats(runtimeStats); } catch (IOException e) { throw new HoodieUpsertException("Failed to close UpdateHandle", e); } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java index 9d68fe1ba..8983605c5 100644 --- 
a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/HoodieRealtimeTableCompactor.java @@ -24,6 +24,7 @@ import com.google.common.collect.Sets; import com.uber.hoodie.WriteStatus; import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.common.model.HoodieTableType; +import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; @@ -31,6 +32,7 @@ import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner; import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.common.util.HoodieAvroUtils; import com.uber.hoodie.config.HoodieWriteConfig; +import com.uber.hoodie.io.compact.strategy.CompactionStrategy; import com.uber.hoodie.table.HoodieCopyOnWriteTable; import com.uber.hoodie.table.HoodieTable; import java.io.IOException; @@ -46,6 +48,8 @@ import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.util.AccumulatorV2; +import org.apache.spark.util.LongAccumulator; /** * HoodieRealtimeTableCompactor compacts a hoodie table with merge on read storage. 
Computes all @@ -57,11 +61,20 @@ import org.apache.spark.api.java.function.FlatMapFunction; public class HoodieRealtimeTableCompactor implements HoodieCompactor { private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class); + // Accumulator to keep track of total log files for a dataset + private AccumulatorV2 totalLogFiles; + // Accumulator to keep track of total log file slices for a dataset + private AccumulatorV2 totalFileSlices; @Override public JavaRDD compact(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable hoodieTable, String compactionCommitTime) throws IOException { + totalLogFiles = new LongAccumulator(); + totalFileSlices = new LongAccumulator(); + jsc.sc().register(totalLogFiles); + jsc.sc().register(totalFileSlices); + List operations = getCompactionWorkload(jsc, hoodieTable, config, compactionCommitTime); if (operations == null) { @@ -117,10 +130,18 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { Iterable> resultIterable = () -> result; return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream) .map(s -> { - s.getStat().setTotalRecordsToBeUpdate(scanner.getTotalRecordsToUpdate()); - s.getStat().setTotalLogFiles(scanner.getTotalLogFiles()); + s.getStat().setTotalUpdatedRecordsCompacted(scanner.getTotalRecordsToUpdate()); + s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles()); s.getStat().setTotalLogRecords(scanner.getTotalLogRecords()); s.getStat().setPartitionPath(operation.getPartitionPath()); + s.getStat().setTotalLogSizeCompacted((long) operation.getMetrics().get( + CompactionStrategy.TOTAL_LOG_FILE_SIZE)); + s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks()); + s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks()); + s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks()); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks()); + 
s.getStat().setRuntimeStats(runtimeStats); return s; }).collect(toList()); } @@ -145,14 +166,23 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor { TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView(); log.info("Compaction looking for files to compact in " + partitionPaths + " partitions"); - List operations = jsc.parallelize(partitionPaths, partitionPaths.size()) - .flatMap((FlatMapFunction) partitionPath -> fileSystemView - .getLatestFileSlices(partitionPath).map( - s -> new CompactionOperation(s.getDataFile().get(), partitionPath, - s.getLogFiles().sorted(HoodieLogFile.getLogVersionComparator().reversed()) - .collect(Collectors.toList()), config)) - .filter(c -> !c.getDeltaFilePaths().isEmpty()).collect(toList()).iterator()).collect(); + List operations = + jsc.parallelize(partitionPaths, partitionPaths.size()) + .flatMap((FlatMapFunction) partitionPath -> fileSystemView + .getLatestFileSlices(partitionPath).map( + s -> { + List logFiles = s.getLogFiles().sorted(HoodieLogFile + .getLogVersionComparator().reversed()).collect(Collectors.toList()); + totalLogFiles.add((long) logFiles.size()); + totalFileSlices.add(1L); + return new CompactionOperation(s.getDataFile().get(), partitionPath, logFiles, config); + }) + .filter(c -> !c.getDeltaFilePaths().isEmpty()) + .collect(toList()).iterator()).collect(); log.info("Total of " + operations.size() + " compactions are retrieved"); + log.info("Total number of latest files slices " + totalFileSlices.value()); + log.info("Total number of log files " + totalLogFiles.value()); + log.info("Total number of file slices " + totalFileSlices.value()); // Filter the compactions with the passed in filter. 
This lets us choose most effective // compactions only diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java index 9393a0479..4f4cdf128 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/BoundedIOCompactionStrategy.java @@ -17,15 +17,9 @@ package com.uber.hoodie.io.compact.strategy; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.uber.hoodie.common.model.HoodieDataFile; -import com.uber.hoodie.common.model.HoodieLogFile; -import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.io.compact.CompactionOperation; import java.util.List; -import java.util.Map; -import java.util.Optional; /** * CompactionStrategy which looks at total IO to be done for the compaction (read + write) and @@ -33,33 +27,7 @@ import java.util.Optional; * * @see CompactionStrategy */ -public class BoundedIOCompactionStrategy implements CompactionStrategy { - - public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB"; - public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB"; - public static final String TOTAL_IO_MB = "TOTAL_IO_MB"; - - @Override - public Map captureMetrics(HoodieDataFile dataFile, String partitionPath, - List logFiles) { - Map metrics = Maps.newHashMap(); - // Total size of all the log files - Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize) - .filter(Optional::isPresent).map(Optional::get).reduce((size1, size2) -> size1 + size2) - .orElse(0L); - // Total read will be the base file + all the log files - Long totalIORead = FSUtils.getSizeInMB(dataFile.getFileSize() + totalLogFileSize); - // Total write will be similar to the size of the base file - Long 
totalIOWrite = FSUtils.getSizeInMB(dataFile.getFileSize()); - // Total IO will the the IO for read + write - Long totalIO = totalIORead + totalIOWrite; - // Save these metrics and we will use during the filter - metrics.put(TOTAL_IO_READ_MB, totalIORead); - metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite); - metrics.put(TOTAL_IO_MB, totalIO); - return metrics; - - } +public class BoundedIOCompactionStrategy extends CompactionStrategy { @Override public List orderAndFilter(HoodieWriteConfig writeConfig, diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java index c5a666cbd..75d62e803 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/CompactionStrategy.java @@ -16,47 +16,76 @@ package com.uber.hoodie.io.compact.strategy; +import com.google.common.collect.Maps; import com.uber.hoodie.common.model.HoodieDataFile; import com.uber.hoodie.common.model.HoodieLogFile; +import com.uber.hoodie.common.util.FSUtils; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.io.compact.CompactionOperation; import java.io.Serializable; import java.util.List; import java.util.Map; +import java.util.Optional; /** - * Strategy for compaction. Pluggable implementation of define how compaction should be done. The - * implementations of this interface can capture the relevant metrics to order and filter the final - * list of compaction operation to run in a single compaction. - *

+ * Strategy for compaction. Pluggable implementation to define how compaction should be done. The + * over-ridden implementations of this abstract class can capture the relevant metrics to order + * and filter the final list of compaction operation to run in a single compaction. * Implementation of CompactionStrategy cannot hold any state. Difference instantiations can be * passed in every time * * @see com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor * @see CompactionOperation */ -public interface CompactionStrategy extends Serializable { +public abstract class CompactionStrategy implements Serializable { + + public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB"; + public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB"; + public static final String TOTAL_IO_MB = "TOTAL_IO_MB"; + public static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILES_SIZE"; + public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES"; /** * Callback hook when a CompactionOperation is created. Individual strategies can capture the * metrics they need to decide on the priority. 
* - * @param dataFile - Base file to compact + * @param dataFile - Base file to compact * @param partitionPath - Partition path - * @param logFiles - List of log files to compact with the base file + * @param logFiles - List of log files to compact with the base file * @return Map[String, Object] - metrics captured */ - Map captureMetrics(HoodieDataFile dataFile, String partitionPath, - List logFiles); + public Map captureMetrics(HoodieDataFile dataFile, String partitionPath, + List logFiles) { + Map metrics = Maps.newHashMap(); + // Total size of all the log files + Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(Optional::isPresent) + .map(Optional::get).reduce((size1, size2) -> size1 + size2).orElse(0L); + // Total read will be the base file + all the log files + Long totalIORead = FSUtils.getSizeInMB(dataFile.getFileSize() + totalLogFileSize); + // Total write will be similar to the size of the base file + Long totalIOWrite = FSUtils.getSizeInMB(dataFile.getFileSize()); + // Total IO will the the IO for read + write + Long totalIO = totalIORead + totalIOWrite; + // Save these metrics and we will use during the filter + metrics.put(TOTAL_IO_READ_MB, totalIORead); + metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite); + metrics.put(TOTAL_IO_MB, totalIO); + metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize); + metrics.put(TOTAL_LOG_FILES, logFiles.size()); + return metrics; + + } /** * Order and Filter the list of compactions. 
Use the metrics captured with the captureMetrics to * order and filter out compactions * * @param writeConfig - HoodieWriteConfig - config for this compaction is passed in - * @param operations - list of compactions collected + * @param operations - list of compactions collected * @return list of compactions to perform in this run */ - List orderAndFilter(HoodieWriteConfig writeConfig, - List operations); + public List orderAndFilter(HoodieWriteConfig writeConfig, + List operations) { + return operations; + } } diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java index 08f46019f..3f8297f28 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/compact/strategy/UnBoundedCompactionStrategy.java @@ -16,13 +16,9 @@ package com.uber.hoodie.io.compact.strategy; -import com.google.common.collect.Maps; -import com.uber.hoodie.common.model.HoodieDataFile; -import com.uber.hoodie.common.model.HoodieLogFile; import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.io.compact.CompactionOperation; import java.util.List; -import java.util.Map; /** * UnBoundedCompactionStrategy will not change ordering or filter any compaction. 
It is a @@ -31,13 +27,7 @@ import java.util.Map; * * @see CompactionStrategy */ -public class UnBoundedCompactionStrategy implements CompactionStrategy { - - @Override - public Map captureMetrics(HoodieDataFile dataFile, String partitionPath, - List logFiles) { - return Maps.newHashMap(); - } +public class UnBoundedCompactionStrategy extends CompactionStrategy { @Override public List orderAndFilter(HoodieWriteConfig config, diff --git a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java index 78ef4960c..901bd236e 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java @@ -21,6 +21,7 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.uber.hoodie.common.model.HoodieCommitMetadata; +import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.config.HoodieWriteConfig; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -35,12 +36,14 @@ public class HoodieMetrics { public String rollbackTimerName = null; public String cleanTimerName = null; public String commitTimerName = null; + public String deltaCommitTimerName = null; public String finalizeTimerName = null; private HoodieWriteConfig config = null; private String tableName = null; private Timer rollbackTimer = null; private Timer cleanTimer = null; private Timer commitTimer = null; + private Timer deltaCommitTimer = null; private Timer finalizeTimer = null; public HoodieMetrics(HoodieWriteConfig config, String tableName) { @@ -48,9 +51,10 @@ public class HoodieMetrics { this.tableName = tableName; if (config.isMetricsOn()) { Metrics.init(config); - this.rollbackTimerName = getMetricsName("timer", "rollback"); - this.cleanTimerName = getMetricsName("timer", "clean"); - this.commitTimerName = 
getMetricsName("timer", "commit"); + this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION); + this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION); + this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION); + this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION); this.finalizeTimerName = getMetricsName("timer", "finalize"); } } @@ -87,8 +91,15 @@ public class HoodieMetrics { return finalizeTimer == null ? null : finalizeTimer.time(); } + public Timer.Context getDeltaCommitCtx() { + if (config.isMetricsOn() && deltaCommitTimer == null) { + deltaCommitTimer = createTimer(deltaCommitTimerName); + } + return deltaCommitTimer == null ? null : deltaCommitTimer.time(); + } + public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, - HoodieCommitMetadata metadata) { + HoodieCommitMetadata metadata, String actionType) { if (config.isMetricsOn()) { long totalPartitionsWritten = metadata.fetchTotalPartitionsWritten(); long totalFilesInsert = metadata.fetchTotalFilesInsert(); @@ -97,17 +108,27 @@ public class HoodieMetrics { long totalUpdateRecordsWritten = metadata.fetchTotalUpdateRecordsWritten(); long totalInsertRecordsWritten = metadata.fetchTotalInsertRecordsWritten(); long totalBytesWritten = metadata.fetchTotalBytesWritten(); - registerGauge(getMetricsName("commit", "duration"), durationInMs); - registerGauge(getMetricsName("commit", "totalPartitionsWritten"), totalPartitionsWritten); - registerGauge(getMetricsName("commit", "totalFilesInsert"), totalFilesInsert); - registerGauge(getMetricsName("commit", "totalFilesUpdate"), totalFilesUpdate); - registerGauge(getMetricsName("commit", "totalRecordsWritten"), totalRecordsWritten); - registerGauge(getMetricsName("commit", "totalUpdateRecordsWritten"), - totalUpdateRecordsWritten); - registerGauge(getMetricsName("commit", "totalInsertRecordsWritten"), - totalInsertRecordsWritten); - 
registerGauge(getMetricsName("commit", "totalBytesWritten"), totalBytesWritten); - registerGauge(getMetricsName("commit", "commitTime"), commitEpochTimeInMs); + long totalTimeTakenByScanner = metadata.getTotalScanTime(); + long totalTimeTakenForInsert = metadata.getTotalCreateTime(); + long totalTimeTakenForUpsert = metadata.getTotalUpsertTime(); + long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated(); + long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted(); + long totalLogFilesSize = metadata.getTotalLogFilesSize(); + registerGauge(getMetricsName(actionType, "duration"), durationInMs); + registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten); + registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert); + registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate); + registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten); + registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten); + registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten); + registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten); + registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs); + registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner); + registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert); + registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert); + registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated); + registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted); + registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize); } } diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java 
b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java index 67f8d6d6e..a57069d1d 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java @@ -67,6 +67,7 @@ import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; @@ -660,6 +661,48 @@ public class TestMergeOnReadTable { } } + @Test + public void testMetadataValuesAfterInsertUpsertAndCompaction() throws Exception { + // insert 100 records + HoodieWriteConfig config = getConfig(false); + HoodieWriteClient writeClient = new HoodieWriteClient(jsc, config); + HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); + String newCommitTime = "100"; + writeClient.startCommitWithTime(newCommitTime); + + List records = dataGen.generateInserts(newCommitTime, 100); + JavaRDD recordsRDD = jsc.parallelize(records, 1); + JavaRDD statuses = writeClient.insert(recordsRDD, newCommitTime); + writeClient.commit(newCommitTime, statuses); + + // total time taken for creating files should be greater than 0 + long totalCreateTime = statuses.map(writeStatus -> writeStatus.getStat().getRuntimeStats().getTotalCreateTime()) + .reduce((a,b) -> a + b).intValue(); + Assert.assertTrue(totalCreateTime > 0); + + // Update all the 100 records + newCommitTime = "101"; + writeClient.startCommitWithTime(newCommitTime); + + List updatedRecords = dataGen.generateUpdates(newCommitTime, records); + JavaRDD updatedRecordsRDD = jsc.parallelize(updatedRecords, 1); + statuses = writeClient.upsert(updatedRecordsRDD, newCommitTime); + writeClient.commit(newCommitTime, statuses); + // total time taken for upsert all records should be greater than 0 + long totalUpsertTime = statuses.map(writeStatus -> 
writeStatus.getStat().getRuntimeStats().getTotalUpsertTime()) + .reduce((a,b) -> a + b).longValue(); + Assert.assertTrue(totalUpsertTime > 0); + + // Do a compaction + String commitTime = writeClient.startCompaction(); + statuses = writeClient.compact(commitTime); + writeClient.commitCompaction(commitTime, statuses); + // total time taken for scanning log files should be greater than 0 + long timeTakenForScanner = statuses.map(writeStatus -> writeStatus.getStat().getRuntimeStats().getTotalScanTime()) + .reduce((a,b) -> a + b).longValue(); + Assert.assertTrue(timeTakenForScanner > 0); + } + private HoodieWriteConfig getConfig(Boolean autoCommit) { return getConfigBuilder(autoCommit).build(); } diff --git a/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc index ae3df4ffc..832b5fc9c 100644 --- a/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc +++ b/hoodie-common/src/main/avro/HoodieCommitMetadata.avsc @@ -58,7 +58,7 @@ "type":["null","long"] }, { - "name":"totalRecordsToBeUpdate", + "name":"totalUpdatedRecordsCompacted", "type":["null","long"] } ] diff --git a/hoodie-common/src/main/avro/HoodieCompactionMetadata.avsc b/hoodie-common/src/main/avro/HoodieCompactionMetadata.avsc index 92e774d78..871b1b513 100644 --- a/hoodie-common/src/main/avro/HoodieCompactionMetadata.avsc +++ b/hoodie-common/src/main/avro/HoodieCompactionMetadata.avsc @@ -26,7 +26,7 @@ "type":["null","long"] }, { - "name":"totalRecordsToBeUpdate", + "name":"totalUpdatedRecordsCompacted", "type":["null","long"] }, { diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java index 3f1b69080..fe97a4e34 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieCommitMetadata.java @@ -208,6 +208,92 @@ public class
HoodieCommitMetadata implements Serializable { return totalWriteErrors; } + public long getTotalRecordsDeleted() { + long totalDeletes = 0; + for (List stats : partitionToWriteStats.values()) { + for (HoodieWriteStat stat : stats) { + totalDeletes += stat.getNumDeletes(); + } + } + return totalDeletes; + } + + public Long getTotalLogRecordsCompacted() { + Long totalLogRecords = 0L; + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : entry.getValue()) { + totalLogRecords += writeStat.getTotalLogRecords(); + } + } + return totalLogRecords; + } + + public Long getTotalLogFilesCompacted() { + Long totalLogFiles = 0L; + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : entry.getValue()) { + totalLogFiles += writeStat.getTotalLogFilesCompacted(); + } + } + return totalLogFiles; + } + + public Long getTotalCompactedRecordsUpdated() { + Long totalUpdateRecords = 0L; + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : entry.getValue()) { + totalUpdateRecords += writeStat.getTotalUpdatedRecordsCompacted(); + } + } + return totalUpdateRecords; + } + + public Long getTotalLogFilesSize() { + Long totalLogFilesSize = 0L; + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : entry.getValue()) { + totalLogFilesSize += writeStat.getTotalLogSizeCompacted(); + } + } + return totalLogFilesSize; + } + + public Long getTotalScanTime() { + Long totalScanTime = 0L; + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : entry.getValue()) { + if (writeStat.getRuntimeStats() != null) { + totalScanTime += writeStat.getRuntimeStats().getTotalScanTime(); + } + } + } + return totalScanTime; + } + + public Long getTotalCreateTime() { + Long totalCreateTime = 0L; + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : entry.getValue()) { + 
if (writeStat.getRuntimeStats() != null) { + totalCreateTime += writeStat.getRuntimeStats().getTotalCreateTime(); + } + } + } + return totalCreateTime; + } + + public Long getTotalUpsertTime() { + Long totalUpsertTime = 0L; + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : entry.getValue()) { + if (writeStat.getRuntimeStats() != null) { + totalUpsertTime += writeStat.getRuntimeStats().getTotalUpsertTime(); + } + } + } + return totalUpsertTime; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -236,7 +322,7 @@ public class HoodieCommitMetadata implements Serializable { public static HoodieCommitMetadata fromBytes(byte[] bytes) throws IOException { return fromJsonString(new String(bytes, Charset.forName("utf-8"))); } - + private static ObjectMapper getObjectMapper() { ObjectMapper mapper = new ObjectMapper(); mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java index 4706e25a3..e03e84e3a 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieWriteStat.java @@ -16,6 +16,7 @@ package com.uber.hoodie.common.model; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import java.io.Serializable; import javax.annotation.Nullable; @@ -90,19 +91,46 @@ public class HoodieWriteStat implements Serializable { * Total number of log records that were compacted by a compaction operation */ @Nullable - private Long totalLogRecords; + private long totalLogRecords; /** - * Total number of log files that were compacted by a compaction operation + * Total number of log files compacted for a file slice with this base fileid */ @Nullable - private Long totalLogFiles; + private 
long totalLogFilesCompacted; + + /** + * Total size of all log files for a file slice with this base fileid + */ + @Nullable + private long totalLogSizeCompacted; /** * Total number of records updated by a compaction operation */ @Nullable - private Long totalRecordsToBeUpdate; + private long totalUpdatedRecordsCompacted; + + /** + * Total number of log blocks seen in a compaction operation + */ + @Nullable + private long totalLogBlocks; + + /** + * Total number of corrupt blocks seen in a compaction operation + */ + @Nullable + private long totalCorruptLogBlock; + + /** + * Total number of rollback blocks seen in a compaction operation + */ + private long totalRollbackBlocks; + + @Nullable + @JsonIgnore + private RuntimeStats runtimeStats; public HoodieWriteStat() { // called by jackson json lib @@ -180,28 +208,28 @@ public class HoodieWriteStat implements Serializable { this.partitionPath = partitionPath; } - public Long getTotalLogRecords() { + public long getTotalLogRecords() { return totalLogRecords; } - public void setTotalLogRecords(Long totalLogRecords) { + public void setTotalLogRecords(long totalLogRecords) { this.totalLogRecords = totalLogRecords; } - public Long getTotalLogFiles() { - return totalLogFiles; + public long getTotalLogFilesCompacted() { + return totalLogFilesCompacted; } - public void setTotalLogFiles(Long totalLogFiles) { - this.totalLogFiles = totalLogFiles; + public void setTotalLogFilesCompacted(long totalLogFilesCompacted) { + this.totalLogFilesCompacted = totalLogFilesCompacted; } - public Long getTotalRecordsToBeUpdate() { - return totalRecordsToBeUpdate; + public long getTotalUpdatedRecordsCompacted() { + return totalUpdatedRecordsCompacted; } - public void setTotalRecordsToBeUpdate(Long totalRecordsToBeUpdate) { - this.totalRecordsToBeUpdate = totalRecordsToBeUpdate; + public void setTotalUpdatedRecordsCompacted(long totalUpdatedRecordsCompacted) { + this.totalUpdatedRecordsCompacted = totalUpdatedRecordsCompacted; } public void 
setTempPath(String tempPath) { @@ -212,6 +240,47 @@ public class HoodieWriteStat implements Serializable { return this.tempPath; } + public long getTotalLogSizeCompacted() { + return totalLogSizeCompacted; + } + + public void setTotalLogSizeCompacted(long totalLogSizeCompacted) { + this.totalLogSizeCompacted = totalLogSizeCompacted; + } + + public long getTotalLogBlocks() { + return totalLogBlocks; + } + + public void setTotalLogBlocks(long totalLogBlocks) { + this.totalLogBlocks = totalLogBlocks; + } + + public long getTotalCorruptLogBlock() { + return totalCorruptLogBlock; + } + + public void setTotalCorruptLogBlock(long totalCorruptLogBlock) { + this.totalCorruptLogBlock = totalCorruptLogBlock; + } + + public long getTotalRollbackBlocks() { + return totalRollbackBlocks; + } + + public void setTotalRollbackBlocks(long totalRollbackBlocks) { + this.totalRollbackBlocks = totalRollbackBlocks; + } + + @Nullable + public RuntimeStats getRuntimeStats() { + return runtimeStats; + } + + public void setRuntimeStats(@Nullable RuntimeStats runtimeStats) { + this.runtimeStats = runtimeStats; + } + + /** + * Set path and tempPath relative to the given basePath.
*/ @@ -224,17 +293,25 @@ public class HoodieWriteStat implements Serializable { @Override public String toString() { - return new StringBuilder() - .append("HoodieWriteStat {") - .append("path=" + path) - .append(", tempPath=" + tempPath) - .append(", prevCommit='" + prevCommit + '\'') - .append(", numWrites=" + numWrites) - .append(", numDeletes=" + numDeletes) - .append(", numUpdateWrites=" + numUpdateWrites) - .append(", numWriteBytes=" + totalWriteBytes) - .append('}') - .toString(); + return "HoodieWriteStat{" + + "fileId='" + fileId + '\'' + + ", path='" + path + '\'' + + ", prevCommit='" + prevCommit + '\'' + + ", numWrites=" + numWrites + + ", numDeletes=" + numDeletes + + ", numUpdateWrites=" + numUpdateWrites + + ", totalWriteBytes=" + totalWriteBytes + + ", totalWriteErrors=" + totalWriteErrors + + ", tempPath='" + tempPath + '\'' + + ", partitionPath='" + partitionPath + + '\'' + ", totalLogRecords=" + totalLogRecords + + ", totalLogFilesCompacted=" + totalLogFilesCompacted + + ", totalLogSizeCompacted=" + totalLogSizeCompacted + + ", totalUpdatedRecordsCompacted=" + totalUpdatedRecordsCompacted + + ", totalLogBlocks=" + totalLogBlocks + + ", totalCorruptLogBlock=" + totalCorruptLogBlock + + ", totalRollbackBlocks=" + totalRollbackBlocks + + '}'; } @Override @@ -260,4 +337,51 @@ public class HoodieWriteStat implements Serializable { result = 31 * result + prevCommit.hashCode(); return result; } + + public static class RuntimeStats implements Serializable { + /** + * Total time taken to read and merge logblocks in a log file + */ + @Nullable + private long totalScanTime; + + /** + * Total time taken by a Hoodie Merge for an existing file + */ + @Nullable + private long totalUpsertTime; + + /** + * Total time taken by a Hoodie Insert to a file + */ + @Nullable + private long totalCreateTime; + + @Nullable + public long getTotalScanTime() { + return totalScanTime; + } + + public void setTotalScanTime(@Nullable long totalScanTime) { + this.totalScanTime = 
totalScanTime; + } + + @Nullable + public long getTotalUpsertTime() { + return totalUpsertTime; + } + + public void setTotalUpsertTime(@Nullable long totalUpsertTime) { + this.totalUpsertTime = totalUpsertTime; + } + + @Nullable + public long getTotalCreateTime() { + return totalCreateTime; + } + + public void setTotalCreateTime(@Nullable long totalCreateTime) { + this.totalCreateTime = totalCreateTime; + } + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java index d23792fc7..6e9699442 100644 --- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/HoodieCompactedLogRecordScanner.java @@ -29,6 +29,7 @@ import com.uber.hoodie.common.table.log.block.HoodieAvroDataBlock; import com.uber.hoodie.common.table.log.block.HoodieCommandBlock; import com.uber.hoodie.common.table.log.block.HoodieDeleteBlock; import com.uber.hoodie.common.table.log.block.HoodieLogBlock; +import com.uber.hoodie.common.util.HoodieTimer; import com.uber.hoodie.common.util.SpillableMapUtils; import com.uber.hoodie.common.util.collection.ExternalSpillableMap; import com.uber.hoodie.common.util.collection.converter.HoodieRecordConverter; @@ -73,8 +74,14 @@ public class HoodieCompactedLogRecordScanner implements private final Schema readerSchema; // Total log files read - for metrics private AtomicLong totalLogFiles = new AtomicLong(0); + // Total log blocks read - for metrics + private AtomicLong totalLogBlocks = new AtomicLong(0); // Total log records read - for metrics private AtomicLong totalLogRecords = new AtomicLong(0); + // Total number of rollbacks written across all log files + private AtomicLong totalRollbacks = new AtomicLong(0); + // Total number of corrupt blocks written across all log files + private AtomicLong 
totalCorruptBlocks = new AtomicLong(0); // Total final list of compacted/merged records private long totalRecordsToUpdate; // Latest valid instant time @@ -84,6 +91,10 @@ public class HoodieCompactedLogRecordScanner implements private String payloadClassFQN; // Store the last instant log blocks (needed to implement rollback) private Deque currentInstantLogBlocks = new ArrayDeque<>(); + // Stores the total time taken to perform reading and merging of log blocks + private long totalTimeTakenToReadAndMergeBlocks = 0L; + // A timer for calculating elapsed time in millis + public HoodieTimer timer = new HoodieTimer(); public HoodieCompactedLogRecordScanner(FileSystem fs, String basePath, List logFilePaths, Schema readerSchema, String latestInstantTime, Long maxMemorySizeInBytes, @@ -93,6 +104,8 @@ public class HoodieCompactedLogRecordScanner implements this.hoodieTableMetaClient = new HoodieTableMetaClient(fs.getConf(), basePath); // load class from the payload fully qualified class name this.payloadClassFQN = this.hoodieTableMetaClient.getTableConfig().getPayloadClass(); + this.totalLogFiles.addAndGet(logFilePaths.size()); + timer.startTimer(); try { // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize @@ -103,10 +116,10 @@ public class HoodieCompactedLogRecordScanner implements new HoodieLogFormatReader(fs, logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))) .collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader, bufferSize); + HoodieLogFile logFile; while (logFormatReaderWrapper.hasNext()) { - HoodieLogFile logFile = logFormatReaderWrapper.getLogFile(); + logFile = logFormatReaderWrapper.getLogFile(); log.info("Scanning log file " + logFile); - totalLogFiles.incrementAndGet(); // Use the HoodieLogFileReader to iterate through the blocks in the log file HoodieLogBlock r = logFormatReaderWrapper.next(); if (r.getBlockType() != CORRUPT_BLOCK @@ -164,6 +177,7 @@ public 
class HoodieCompactedLogRecordScanner implements // and ensures the same rollback block (R1) is used to rollback both B1 & B2 with // same instant_time int numBlocksRolledBack = 0; + totalRollbacks.incrementAndGet(); while (!currentInstantLogBlocks.isEmpty()) { HoodieLogBlock lastBlock = currentInstantLogBlocks.peek(); // handle corrupt blocks separately since they may not have metadata @@ -200,6 +214,7 @@ public class HoodieCompactedLogRecordScanner implements break; case CORRUPT_BLOCK: log.info("Found a corrupt block in " + logFile.getPath()); + totalCorruptBlocks.incrementAndGet(); // If there is a corrupt block - we will assume that this was the next data block currentInstantLogBlocks.push(r); break; @@ -216,14 +231,13 @@ public class HoodieCompactedLogRecordScanner implements throw new HoodieIOException("IOException when reading log file "); } this.totalRecordsToUpdate = records.size(); + this.totalTimeTakenToReadAndMergeBlocks = timer.endTimer(); log.info("MaxMemoryInBytes allowed for compaction => " + maxMemorySizeInBytes); - log.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records - .getInMemoryMapNumEntries()); - log.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records - .getCurrentInMemoryMapSize()); - log.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records - .getDiskBasedMapNumEntries()); + log.info("Number of entries in MemoryBasedMap in ExternalSpillableMap => " + records.getInMemoryMapNumEntries()); + log.info("Total size in bytes of MemoryBasedMap in ExternalSpillableMap => " + records.getCurrentInMemoryMapSize()); + log.info("Number of entries in DiskBasedMap in ExternalSpillableMap => " + records.getDiskBasedMapNumEntries()); log.info("Size of file spilled to disk => " + records.getSizeOfFileOnDiskInBytes()); + log.debug("Total time taken for scanning and compacting log files => " + totalTimeTakenToReadAndMergeBlocks); } /** @@ -307,6 +321,10 @@ public class 
HoodieCompactedLogRecordScanner implements return totalLogRecords.get(); } + public long getTotalLogBlocks() { + return totalLogBlocks.get(); + } + public Map> getRecords() { return records; } @@ -314,5 +332,17 @@ public class HoodieCompactedLogRecordScanner implements public long getTotalRecordsToUpdate() { return totalRecordsToUpdate; } + + public long getTotalRollbacks() { + return totalRollbacks.get(); + } + + public long getTotalCorruptBlocks() { + return totalCorruptBlocks.get(); + } + + public long getTotalTimeTakenToReadAndMergeBlocks() { + return totalTimeTakenToReadAndMergeBlocks; + } } diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieTimer.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieTimer.java new file mode 100644 index 000000000..6e87387a0 --- /dev/null +++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/HoodieTimer.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.uber.hoodie.common.util; + +import com.uber.hoodie.exception.HoodieException; +import java.util.ArrayDeque; +import java.util.Deque; + +/** + * Timing utility to help keep track of execution times of code blocks. This class helps to allow multiple + * timers started at the same time and automatically returns the execution time in the order in which the + * timers are stopped. 
+ */ +public class HoodieTimer { + + // Ordered stack of TimeInfo's to make sure stopping the timer returns the correct elapsed time + Deque timeInfoDeque = new ArrayDeque<>(); + + class TimeInfo { + + // captures the startTime of the code block + long startTime; + // is the timing still running for the last started timer + boolean isRunning; + + public TimeInfo(long startTime) { + this.startTime = startTime; + this.isRunning = true; + } + + public long getStartTime() { + return startTime; + } + + public boolean isRunning() { + return isRunning; + } + + public long stop() { + this.isRunning = false; + return System.currentTimeMillis() - startTime; + } + } + + public HoodieTimer startTimer() { + timeInfoDeque.push(new TimeInfo(System.currentTimeMillis())); + return this; + } + + public long endTimer() { + if (timeInfoDeque.isEmpty()) { + throw new HoodieException("Timer was not started"); + } + return timeInfoDeque.pop().stop(); + } +} diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java index 6aab1e1cd..6a956f27c 100644 --- a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.uber.hoodie.avro.model.HoodieCleanMetadata; import com.uber.hoodie.common.HoodieCleanStat; +import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats; import com.uber.hoodie.common.table.HoodieTableConfig; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; @@ -335,4 +336,25 @@ public class HoodieTestUtils { } return commits; } + + public static List generateFakeHoodieWriteStat(int limit) { + List writeStatList = new ArrayList<>(); + for (int i = 0; i < limit; i++) { + HoodieWriteStat writeStat = 
new HoodieWriteStat(); + writeStat.setFileId(UUID.randomUUID().toString()); + writeStat.setNumDeletes(0); + writeStat.setNumUpdateWrites(100); + writeStat.setNumWrites(100); + writeStat.setPath("/some/fake/path" + i); + writeStat.setPartitionPath("/some/fake/partition/path" + i); + writeStat.setTotalLogFilesCompacted(100L); + RuntimeStats runtimeStats = new RuntimeStats(); + runtimeStats.setTotalScanTime(100); + runtimeStats.setTotalCreateTime(100); + runtimeStats.setTotalUpsertTime(100); + writeStat.setRuntimeStats(runtimeStats); + writeStatList.add(writeStat); + } + return writeStatList; + } } diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieCommitMetadata.java b/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieCommitMetadata.java new file mode 100644 index 000000000..5f92e83bf --- /dev/null +++ b/hoodie-common/src/test/java/com/uber/hoodie/common/model/TestHoodieCommitMetadata.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2018 Uber Technologies, Inc. (hoodie-dev-group@uber.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.uber.hoodie.common.model; + +import java.util.List; +import org.junit.Assert; +import org.junit.Test; + +public class TestHoodieCommitMetadata { + + @Test + public void testPerfStatPresenceInHoodieMetadata() throws Exception { + + List fakeHoodieWriteStats = HoodieTestUtils.generateFakeHoodieWriteStat(100); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + fakeHoodieWriteStats.stream().forEach(stat -> commitMetadata.addWriteStat(stat.getPartitionPath(), stat)); + Assert.assertTrue(commitMetadata.getTotalCreateTime() > 0); + Assert.assertTrue(commitMetadata.getTotalUpsertTime() > 0); + Assert.assertTrue(commitMetadata.getTotalScanTime() > 0); + Assert.assertTrue(commitMetadata.getTotalLogFilesCompacted() > 0); + + String serializedCommitMetadata = commitMetadata.toJsonString(); + HoodieCommitMetadata metadata = HoodieCommitMetadata.fromJsonString(serializedCommitMetadata); + // Make sure timing metrics are not written to instant file + Assert.assertTrue(metadata.getTotalScanTime() == 0); + Assert.assertTrue(metadata.getTotalLogFilesCompacted() > 0); + } +}