Adding metrics for MOR and COW
This commit is contained in:
committed by
vinoth chandar
parent
c66004d79a
commit
04655e9e85
@@ -543,7 +543,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
long durationInMs = metrics.getDurationInMs(writeContext.stop());
|
||||
metrics
|
||||
.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(commitTime).getTime(),
|
||||
durationInMs, metadata);
|
||||
durationInMs, metadata, actionType);
|
||||
writeContext = null;
|
||||
}
|
||||
logger.info("Committed " + commitTime);
|
||||
@@ -926,6 +926,8 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config);
|
||||
// TODO : Fix table.getActionType for MOR table type to return different actions based on delta or compaction
|
||||
writeContext = metrics.getCommitCtx();
|
||||
JavaRDD<WriteStatus> statuses = table.compact(jsc, commitTime);
|
||||
// Trigger the insert and collect statuses
|
||||
statuses = statuses.persist(config.getWriteStatusStorageLevel());
|
||||
@@ -960,9 +962,22 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(),
|
||||
config.getBasePath(), true);
|
||||
HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config);
|
||||
// TODO : Fix table.getActionType for MOR table type to return different actions based on delta or compaction and
|
||||
// then use getTableAndInitCtx
|
||||
Timer.Context writeContext = metrics.getCommitCtx();
|
||||
JavaRDD<WriteStatus> compactedStatuses = table.compact(jsc, compactionCommitTime);
|
||||
if (!compactedStatuses.isEmpty()) {
|
||||
commitForceCompaction(compactedStatuses, metaClient, compactionCommitTime);
|
||||
HoodieCommitMetadata metadata = commitForceCompaction(compactedStatuses, metaClient, compactionCommitTime);
|
||||
long durationInMs = metrics.getDurationInMs(writeContext.stop());
|
||||
try {
|
||||
metrics
|
||||
.updateCommitMetrics(HoodieActiveTimeline.COMMIT_FORMATTER.parse(compactionCommitTime).getTime(),
|
||||
durationInMs, metadata, HoodieActiveTimeline.COMMIT_ACTION);
|
||||
} catch (ParseException e) {
|
||||
throw new HoodieCommitException(
|
||||
"Commit time is not of valid format.Failed to commit " + config.getBasePath()
|
||||
+ " at time " + compactionCommitTime, e);
|
||||
}
|
||||
logger.info("Compacted successfully on commit " + compactionCommitTime);
|
||||
} else {
|
||||
logger.info("Compaction did not run for commit " + compactionCommitTime);
|
||||
@@ -979,7 +994,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
return compactionCommitTime;
|
||||
}
|
||||
|
||||
private void commitForceCompaction(JavaRDD<WriteStatus> writeStatuses,
|
||||
private HoodieCommitMetadata commitForceCompaction(JavaRDD<WriteStatus> writeStatuses,
|
||||
HoodieTableMetaClient metaClient, String compactionCommitTime) {
|
||||
List<HoodieWriteStat> updateStatusMap = writeStatuses.map(writeStatus -> writeStatus.getStat())
|
||||
.collect();
|
||||
@@ -1002,6 +1017,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
throw new HoodieCompactionException(
|
||||
"Failed to commit " + metaClient.getBasePath() + " at time " + compactionCommitTime, e);
|
||||
}
|
||||
return metadata;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -1043,9 +1059,14 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
|
||||
}
|
||||
|
||||
private HoodieTable getTableAndInitCtx() {
|
||||
writeContext = metrics.getCommitCtx();
|
||||
// Create a Hoodie table which encapsulated the commits and files visible
|
||||
return HoodieTable.getHoodieTable(
|
||||
HoodieTable table = HoodieTable.getHoodieTable(
|
||||
new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config);
|
||||
if (table.getCommitActionType() == HoodieTimeline.COMMIT_ACTION) {
|
||||
writeContext = metrics.getCommitCtx();
|
||||
} else {
|
||||
writeContext = metrics.getDeltaCommitCtx();
|
||||
}
|
||||
return table;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieRecordLocation;
|
||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
|
||||
import com.uber.hoodie.common.table.TableFileSystemView;
|
||||
import com.uber.hoodie.common.table.log.HoodieLogFormat;
|
||||
import com.uber.hoodie.common.table.log.HoodieLogFormat.Writer;
|
||||
@@ -73,6 +74,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
||||
private HoodieLogFile currentLogFile;
|
||||
private Writer writer;
|
||||
private boolean doInit = true;
|
||||
private long estimatedNumberOfBytesWritten;
|
||||
|
||||
public HoodieAppendHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable,
|
||||
String fileId, Iterator<HoodieRecord<T>> recordItr) {
|
||||
@@ -180,6 +182,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
||||
logger.info("AvgRecordSize => " + averageRecordSize);
|
||||
averageRecordSize = (averageRecordSize + SizeEstimator.estimate(record)) / 2;
|
||||
doAppend(header);
|
||||
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
|
||||
numberOfRecords = 0;
|
||||
}
|
||||
Optional<IndexedRecord> indexedRecord = getIndexedRecord(record);
|
||||
@@ -191,6 +194,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
||||
numberOfRecords++;
|
||||
}
|
||||
doAppend(header);
|
||||
estimatedNumberOfBytesWritten += averageRecordSize * numberOfRecords;
|
||||
}
|
||||
|
||||
private void doAppend(Map<HoodieLogBlock.HeaderMetadataType, String> header) {
|
||||
@@ -217,7 +221,11 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
||||
}
|
||||
writeStatus.getStat().setNumWrites(recordsWritten);
|
||||
writeStatus.getStat().setNumDeletes(recordsDeleted);
|
||||
writeStatus.getStat().setTotalWriteBytes(estimatedNumberOfBytesWritten);
|
||||
writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
|
||||
RuntimeStats runtimeStats = new RuntimeStats();
|
||||
runtimeStats.setTotalUpsertTime(timer.endTimer());
|
||||
writeStatus.getStat().setRuntimeStats(runtimeStats);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieRecordLocation;
|
||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
import com.uber.hoodie.common.model.HoodieWriteStat;
|
||||
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.ReflectionUtils;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
@@ -136,6 +137,9 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
|
||||
stat.setPaths(new Path(config.getBasePath()), path, tempPath);
|
||||
stat.setTotalWriteBytes(FSUtils.getFileSize(fs, getStorageWriterPath()));
|
||||
stat.setTotalWriteErrors(status.getFailedRecords().size());
|
||||
RuntimeStats runtimeStats = new RuntimeStats();
|
||||
runtimeStats.setTotalCreateTime(timer.endTimer());
|
||||
stat.setRuntimeStats(runtimeStats);
|
||||
status.setStat(stat);
|
||||
|
||||
return status;
|
||||
|
||||
@@ -21,6 +21,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
import com.uber.hoodie.common.util.HoodieTimer;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.exception.HoodieIOException;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
@@ -41,6 +42,7 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
|
||||
protected final HoodieTable<T> hoodieTable;
|
||||
protected final Schema schema;
|
||||
protected HoodieTimeline hoodieTimeline;
|
||||
protected HoodieTimer timer;
|
||||
|
||||
public HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable<T> hoodieTable) {
|
||||
this.commitTime = commitTime;
|
||||
@@ -49,6 +51,7 @@ public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
|
||||
this.hoodieTable = hoodieTable;
|
||||
this.hoodieTimeline = hoodieTable.getCompletedCommitTimeline();
|
||||
this.schema = createHoodieWriteSchema(config);
|
||||
this.timer = new HoodieTimer().startTimer();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -22,6 +22,7 @@ import com.uber.hoodie.common.model.HoodieRecord;
|
||||
import com.uber.hoodie.common.model.HoodieRecordLocation;
|
||||
import com.uber.hoodie.common.model.HoodieRecordPayload;
|
||||
import com.uber.hoodie.common.model.HoodieWriteStat;
|
||||
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
|
||||
import com.uber.hoodie.common.table.TableFileSystemView;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.ReflectionUtils;
|
||||
@@ -261,6 +262,9 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
|
||||
writeStatus.getStat().setNumDeletes(recordsDeleted);
|
||||
writeStatus.getStat().setNumUpdateWrites(updatedRecordsWritten);
|
||||
writeStatus.getStat().setTotalWriteErrors(writeStatus.getFailedRecords().size());
|
||||
RuntimeStats runtimeStats = new RuntimeStats();
|
||||
runtimeStats.setTotalUpsertTime(timer.endTimer());
|
||||
writeStatus.getStat().setRuntimeStats(runtimeStats);
|
||||
} catch (IOException e) {
|
||||
throw new HoodieUpsertException("Failed to close UpdateHandle", e);
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import com.google.common.collect.Sets;
|
||||
import com.uber.hoodie.WriteStatus;
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.common.model.HoodieTableType;
|
||||
import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
|
||||
import com.uber.hoodie.common.table.HoodieTableMetaClient;
|
||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||
import com.uber.hoodie.common.table.TableFileSystemView;
|
||||
@@ -31,6 +32,7 @@ import com.uber.hoodie.common.table.log.HoodieCompactedLogRecordScanner;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.common.util.HoodieAvroUtils;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
|
||||
import com.uber.hoodie.table.HoodieCopyOnWriteTable;
|
||||
import com.uber.hoodie.table.HoodieTable;
|
||||
import java.io.IOException;
|
||||
@@ -46,6 +48,8 @@ import org.apache.log4j.Logger;
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.util.AccumulatorV2;
|
||||
import org.apache.spark.util.LongAccumulator;
|
||||
|
||||
/**
|
||||
* HoodieRealtimeTableCompactor compacts a hoodie table with merge on read storage. Computes all
|
||||
@@ -57,11 +61,20 @@ import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
public class HoodieRealtimeTableCompactor implements HoodieCompactor {
|
||||
|
||||
private static Logger log = LogManager.getLogger(HoodieRealtimeTableCompactor.class);
|
||||
// Accumulator to keep track of total log files for a dataset
|
||||
private AccumulatorV2<Long, Long> totalLogFiles;
|
||||
// Accumulator to keep track of total log file slices for a dataset
|
||||
private AccumulatorV2<Long, Long> totalFileSlices;
|
||||
|
||||
@Override
|
||||
public JavaRDD<WriteStatus> compact(JavaSparkContext jsc, HoodieWriteConfig config,
|
||||
HoodieTable hoodieTable, String compactionCommitTime) throws IOException {
|
||||
|
||||
totalLogFiles = new LongAccumulator();
|
||||
totalFileSlices = new LongAccumulator();
|
||||
jsc.sc().register(totalLogFiles);
|
||||
jsc.sc().register(totalFileSlices);
|
||||
|
||||
List<CompactionOperation> operations = getCompactionWorkload(jsc, hoodieTable, config,
|
||||
compactionCommitTime);
|
||||
if (operations == null) {
|
||||
@@ -117,10 +130,18 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
|
||||
Iterable<List<WriteStatus>> resultIterable = () -> result;
|
||||
return StreamSupport.stream(resultIterable.spliterator(), false).flatMap(Collection::stream)
|
||||
.map(s -> {
|
||||
s.getStat().setTotalRecordsToBeUpdate(scanner.getTotalRecordsToUpdate());
|
||||
s.getStat().setTotalLogFiles(scanner.getTotalLogFiles());
|
||||
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getTotalRecordsToUpdate());
|
||||
s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
|
||||
s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
|
||||
s.getStat().setPartitionPath(operation.getPartitionPath());
|
||||
s.getStat().setTotalLogSizeCompacted((long) operation.getMetrics().get(
|
||||
CompactionStrategy.TOTAL_LOG_FILE_SIZE));
|
||||
s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
|
||||
s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
|
||||
s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
|
||||
RuntimeStats runtimeStats = new RuntimeStats();
|
||||
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
|
||||
s.getStat().setRuntimeStats(runtimeStats);
|
||||
return s;
|
||||
}).collect(toList());
|
||||
}
|
||||
@@ -145,14 +166,23 @@ public class HoodieRealtimeTableCompactor implements HoodieCompactor {
|
||||
|
||||
TableFileSystemView.RealtimeView fileSystemView = hoodieTable.getRTFileSystemView();
|
||||
log.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
|
||||
List<CompactionOperation> operations = jsc.parallelize(partitionPaths, partitionPaths.size())
|
||||
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
|
||||
.getLatestFileSlices(partitionPath).map(
|
||||
s -> new CompactionOperation(s.getDataFile().get(), partitionPath,
|
||||
s.getLogFiles().sorted(HoodieLogFile.getLogVersionComparator().reversed())
|
||||
.collect(Collectors.toList()), config))
|
||||
.filter(c -> !c.getDeltaFilePaths().isEmpty()).collect(toList()).iterator()).collect();
|
||||
List<CompactionOperation> operations =
|
||||
jsc.parallelize(partitionPaths, partitionPaths.size())
|
||||
.flatMap((FlatMapFunction<String, CompactionOperation>) partitionPath -> fileSystemView
|
||||
.getLatestFileSlices(partitionPath).map(
|
||||
s -> {
|
||||
List<HoodieLogFile> logFiles = s.getLogFiles().sorted(HoodieLogFile
|
||||
.getLogVersionComparator().reversed()).collect(Collectors.toList());
|
||||
totalLogFiles.add((long) logFiles.size());
|
||||
totalFileSlices.add(1L);
|
||||
return new CompactionOperation(s.getDataFile().get(), partitionPath, logFiles, config);
|
||||
})
|
||||
.filter(c -> !c.getDeltaFilePaths().isEmpty())
|
||||
.collect(toList()).iterator()).collect();
|
||||
log.info("Total of " + operations.size() + " compactions are retrieved");
|
||||
log.info("Total number of latest files slices " + totalFileSlices.value());
|
||||
log.info("Total number of log files " + totalLogFiles.value());
|
||||
log.info("Total number of file slices " + totalFileSlices.value());
|
||||
|
||||
// Filter the compactions with the passed in filter. This lets us choose most effective
|
||||
// compactions only
|
||||
|
||||
@@ -17,15 +17,9 @@
|
||||
package com.uber.hoodie.io.compact.strategy;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.uber.hoodie.common.model.HoodieDataFile;
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.io.compact.CompactionOperation;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* CompactionStrategy which looks at total IO to be done for the compaction (read + write) and
|
||||
@@ -33,33 +27,7 @@ import java.util.Optional;
|
||||
*
|
||||
* @see CompactionStrategy
|
||||
*/
|
||||
public class BoundedIOCompactionStrategy implements CompactionStrategy {
|
||||
|
||||
public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB";
|
||||
public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB";
|
||||
public static final String TOTAL_IO_MB = "TOTAL_IO_MB";
|
||||
|
||||
@Override
|
||||
public Map<String, Object> captureMetrics(HoodieDataFile dataFile, String partitionPath,
|
||||
List<HoodieLogFile> logFiles) {
|
||||
Map<String, Object> metrics = Maps.newHashMap();
|
||||
// Total size of all the log files
|
||||
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize)
|
||||
.filter(Optional::isPresent).map(Optional::get).reduce((size1, size2) -> size1 + size2)
|
||||
.orElse(0L);
|
||||
// Total read will be the base file + all the log files
|
||||
Long totalIORead = FSUtils.getSizeInMB(dataFile.getFileSize() + totalLogFileSize);
|
||||
// Total write will be similar to the size of the base file
|
||||
Long totalIOWrite = FSUtils.getSizeInMB(dataFile.getFileSize());
|
||||
// Total IO will the the IO for read + write
|
||||
Long totalIO = totalIORead + totalIOWrite;
|
||||
// Save these metrics and we will use during the filter
|
||||
metrics.put(TOTAL_IO_READ_MB, totalIORead);
|
||||
metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite);
|
||||
metrics.put(TOTAL_IO_MB, totalIO);
|
||||
return metrics;
|
||||
|
||||
}
|
||||
public class BoundedIOCompactionStrategy extends CompactionStrategy {
|
||||
|
||||
@Override
|
||||
public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
||||
|
||||
@@ -16,47 +16,76 @@
|
||||
|
||||
package com.uber.hoodie.io.compact.strategy;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.uber.hoodie.common.model.HoodieDataFile;
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.common.util.FSUtils;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.io.compact.CompactionOperation;
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Strategy for compaction. Pluggable implementation of define how compaction should be done. The
|
||||
* implementations of this interface can capture the relevant metrics to order and filter the final
|
||||
* list of compaction operation to run in a single compaction.
|
||||
* <p>
|
||||
* Strategy for compaction. Pluggable implementation to define how compaction should be done. The
|
||||
* over-ridden implementations of this abstract class can capture the relevant metrics to order
|
||||
* and filter the final list of compaction operation to run in a single compaction.
|
||||
* Implementation of CompactionStrategy cannot hold any state. Difference instantiations can be
|
||||
* passed in every time
|
||||
*
|
||||
* @see com.uber.hoodie.io.compact.HoodieRealtimeTableCompactor
|
||||
* @see CompactionOperation
|
||||
*/
|
||||
public interface CompactionStrategy extends Serializable {
|
||||
public abstract class CompactionStrategy implements Serializable {
|
||||
|
||||
public static final String TOTAL_IO_READ_MB = "TOTAL_IO_READ_MB";
|
||||
public static final String TOTAL_IO_WRITE_MB = "TOTAL_IO_WRITE_MB";
|
||||
public static final String TOTAL_IO_MB = "TOTAL_IO_MB";
|
||||
public static final String TOTAL_LOG_FILE_SIZE = "TOTAL_LOG_FILES_SIZE";
|
||||
public static final String TOTAL_LOG_FILES = "TOTAL_LOG_FILES";
|
||||
|
||||
/**
|
||||
* Callback hook when a CompactionOperation is created. Individual strategies can capture the
|
||||
* metrics they need to decide on the priority.
|
||||
*
|
||||
* @param dataFile - Base file to compact
|
||||
* @param dataFile - Base file to compact
|
||||
* @param partitionPath - Partition path
|
||||
* @param logFiles - List of log files to compact with the base file
|
||||
* @param logFiles - List of log files to compact with the base file
|
||||
* @return Map[String, Object] - metrics captured
|
||||
*/
|
||||
Map<String, Object> captureMetrics(HoodieDataFile dataFile, String partitionPath,
|
||||
List<HoodieLogFile> logFiles);
|
||||
public Map<String, Object> captureMetrics(HoodieDataFile dataFile, String partitionPath,
|
||||
List<HoodieLogFile> logFiles) {
|
||||
Map<String, Object> metrics = Maps.newHashMap();
|
||||
// Total size of all the log files
|
||||
Long totalLogFileSize = logFiles.stream().map(HoodieLogFile::getFileSize).filter(Optional::isPresent)
|
||||
.map(Optional::get).reduce((size1, size2) -> size1 + size2).orElse(0L);
|
||||
// Total read will be the base file + all the log files
|
||||
Long totalIORead = FSUtils.getSizeInMB(dataFile.getFileSize() + totalLogFileSize);
|
||||
// Total write will be similar to the size of the base file
|
||||
Long totalIOWrite = FSUtils.getSizeInMB(dataFile.getFileSize());
|
||||
// Total IO will the the IO for read + write
|
||||
Long totalIO = totalIORead + totalIOWrite;
|
||||
// Save these metrics and we will use during the filter
|
||||
metrics.put(TOTAL_IO_READ_MB, totalIORead);
|
||||
metrics.put(TOTAL_IO_WRITE_MB, totalIOWrite);
|
||||
metrics.put(TOTAL_IO_MB, totalIO);
|
||||
metrics.put(TOTAL_LOG_FILE_SIZE, totalLogFileSize);
|
||||
metrics.put(TOTAL_LOG_FILES, logFiles.size());
|
||||
return metrics;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Order and Filter the list of compactions. Use the metrics captured with the captureMetrics to
|
||||
* order and filter out compactions
|
||||
*
|
||||
* @param writeConfig - HoodieWriteConfig - config for this compaction is passed in
|
||||
* @param operations - list of compactions collected
|
||||
* @param operations - list of compactions collected
|
||||
* @return list of compactions to perform in this run
|
||||
*/
|
||||
List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
||||
List<CompactionOperation> operations);
|
||||
public List<CompactionOperation> orderAndFilter(HoodieWriteConfig writeConfig,
|
||||
List<CompactionOperation> operations) {
|
||||
return operations;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,13 +16,9 @@
|
||||
|
||||
package com.uber.hoodie.io.compact.strategy;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.uber.hoodie.common.model.HoodieDataFile;
|
||||
import com.uber.hoodie.common.model.HoodieLogFile;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import com.uber.hoodie.io.compact.CompactionOperation;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* UnBoundedCompactionStrategy will not change ordering or filter any compaction. It is a
|
||||
@@ -31,13 +27,7 @@ import java.util.Map;
|
||||
*
|
||||
* @see CompactionStrategy
|
||||
*/
|
||||
public class UnBoundedCompactionStrategy implements CompactionStrategy {
|
||||
|
||||
@Override
|
||||
public Map<String, Object> captureMetrics(HoodieDataFile dataFile, String partitionPath,
|
||||
List<HoodieLogFile> logFiles) {
|
||||
return Maps.newHashMap();
|
||||
}
|
||||
public class UnBoundedCompactionStrategy extends CompactionStrategy {
|
||||
|
||||
@Override
|
||||
public List<CompactionOperation> orderAndFilter(HoodieWriteConfig config,
|
||||
|
||||
@@ -21,6 +21,7 @@ import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.uber.hoodie.common.model.HoodieCommitMetadata;
|
||||
import com.uber.hoodie.common.table.HoodieTimeline;
|
||||
import com.uber.hoodie.config.HoodieWriteConfig;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.Logger;
|
||||
@@ -35,12 +36,14 @@ public class HoodieMetrics {
|
||||
public String rollbackTimerName = null;
|
||||
public String cleanTimerName = null;
|
||||
public String commitTimerName = null;
|
||||
public String deltaCommitTimerName = null;
|
||||
public String finalizeTimerName = null;
|
||||
private HoodieWriteConfig config = null;
|
||||
private String tableName = null;
|
||||
private Timer rollbackTimer = null;
|
||||
private Timer cleanTimer = null;
|
||||
private Timer commitTimer = null;
|
||||
private Timer deltaCommitTimer = null;
|
||||
private Timer finalizeTimer = null;
|
||||
|
||||
public HoodieMetrics(HoodieWriteConfig config, String tableName) {
|
||||
@@ -48,9 +51,10 @@ public class HoodieMetrics {
|
||||
this.tableName = tableName;
|
||||
if (config.isMetricsOn()) {
|
||||
Metrics.init(config);
|
||||
this.rollbackTimerName = getMetricsName("timer", "rollback");
|
||||
this.cleanTimerName = getMetricsName("timer", "clean");
|
||||
this.commitTimerName = getMetricsName("timer", "commit");
|
||||
this.rollbackTimerName = getMetricsName("timer", HoodieTimeline.ROLLBACK_ACTION);
|
||||
this.cleanTimerName = getMetricsName("timer", HoodieTimeline.CLEAN_ACTION);
|
||||
this.commitTimerName = getMetricsName("timer", HoodieTimeline.COMMIT_ACTION);
|
||||
this.deltaCommitTimerName = getMetricsName("timer", HoodieTimeline.DELTA_COMMIT_ACTION);
|
||||
this.finalizeTimerName = getMetricsName("timer", "finalize");
|
||||
}
|
||||
}
|
||||
@@ -87,8 +91,15 @@ public class HoodieMetrics {
|
||||
return finalizeTimer == null ? null : finalizeTimer.time();
|
||||
}
|
||||
|
||||
public Timer.Context getDeltaCommitCtx() {
|
||||
if (config.isMetricsOn() && deltaCommitTimer == null) {
|
||||
deltaCommitTimer = createTimer(deltaCommitTimerName);
|
||||
}
|
||||
return deltaCommitTimer == null ? null : deltaCommitTimer.time();
|
||||
}
|
||||
|
||||
public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs,
|
||||
HoodieCommitMetadata metadata) {
|
||||
HoodieCommitMetadata metadata, String actionType) {
|
||||
if (config.isMetricsOn()) {
|
||||
long totalPartitionsWritten = metadata.fetchTotalPartitionsWritten();
|
||||
long totalFilesInsert = metadata.fetchTotalFilesInsert();
|
||||
@@ -97,17 +108,27 @@ public class HoodieMetrics {
|
||||
long totalUpdateRecordsWritten = metadata.fetchTotalUpdateRecordsWritten();
|
||||
long totalInsertRecordsWritten = metadata.fetchTotalInsertRecordsWritten();
|
||||
long totalBytesWritten = metadata.fetchTotalBytesWritten();
|
||||
registerGauge(getMetricsName("commit", "duration"), durationInMs);
|
||||
registerGauge(getMetricsName("commit", "totalPartitionsWritten"), totalPartitionsWritten);
|
||||
registerGauge(getMetricsName("commit", "totalFilesInsert"), totalFilesInsert);
|
||||
registerGauge(getMetricsName("commit", "totalFilesUpdate"), totalFilesUpdate);
|
||||
registerGauge(getMetricsName("commit", "totalRecordsWritten"), totalRecordsWritten);
|
||||
registerGauge(getMetricsName("commit", "totalUpdateRecordsWritten"),
|
||||
totalUpdateRecordsWritten);
|
||||
registerGauge(getMetricsName("commit", "totalInsertRecordsWritten"),
|
||||
totalInsertRecordsWritten);
|
||||
registerGauge(getMetricsName("commit", "totalBytesWritten"), totalBytesWritten);
|
||||
registerGauge(getMetricsName("commit", "commitTime"), commitEpochTimeInMs);
|
||||
long totalTimeTakenByScanner = metadata.getTotalScanTime();
|
||||
long totalTimeTakenForInsert = metadata.getTotalCreateTime();
|
||||
long totalTimeTakenForUpsert = metadata.getTotalUpsertTime();
|
||||
long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated();
|
||||
long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
|
||||
long totalLogFilesSize = metadata.getTotalLogFilesSize();
|
||||
registerGauge(getMetricsName(actionType, "duration"), durationInMs);
|
||||
registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten);
|
||||
registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert);
|
||||
registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate);
|
||||
registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten);
|
||||
registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
|
||||
registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten);
|
||||
registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten);
|
||||
registerGauge(getMetricsName(actionType, "commitTime"), commitEpochTimeInMs);
|
||||
registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner);
|
||||
registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert);
|
||||
registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert);
|
||||
registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
|
||||
registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted);
|
||||
registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user