Avoid WriteStatus collect() call when committing batch

Balaji Varadarajan
2018-11-27 23:21:34 -08:00
committed by vinoth chandar
parent fa65db9c4c
commit f999e4960c
8 changed files with 29 additions and 22 deletions
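
The change in a nutshell: the commit path previously called collect() on the JavaRDD<WriteStatus>, pulling every WriteStatus (which can carry per-record bookkeeping) back to the driver. After this commit, the RDD is mapped down to the small per-file HoodieWriteStat objects on the executors, and only those are collected. A minimal sketch of the pattern, using the names from the diff below:

    // Before: ships the full WriteStatus objects to the driver.
    List<WriteStatus> writeStatusList = writeStatuses.collect();

    // After: extracts the lightweight per-file stats on the executors and
    // collects only those; everything downstream (commit metadata, rolling
    // stats, finalizeWrite) now operates on HoodieWriteStat.
    List<HoodieWriteStat> stats = writeStatuses.map(status -> status.getStat()).collect();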


@@ -498,19 +498,21 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
-    List<WriteStatus> writeStatusList = writeStatuses.collect();
-    updateMetadataAndRollingStats(actionType, metadata, writeStatusList);
+    List<HoodieWriteStat> stats = writeStatuses.map(status -> status.getStat()).collect();
+    updateMetadataAndRollingStats(actionType, metadata, stats);

     // Finalize write
     final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
     try {
-      table.finalizeWrite(jsc, writeStatusList);
+      table.finalizeWrite(jsc, stats);
       if (finalizeCtx != null) {
         Optional<Long> durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop()));
         durationInMs.ifPresent(duration -> {
           logger.info("Finalize write elapsed time (milliseconds): " + duration);
-          metrics.updateFinalizeWriteMetrics(duration, writeStatusList.size());
+          metrics.updateFinalizeWriteMetrics(duration, stats.size());
         });
       }
     } catch (HoodieIOException ioe) {
@@ -1260,7 +1262,7 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
   }

   private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata,
-      List<WriteStatus> writeStatusList) {
+      List<HoodieWriteStat> writeStats) {
     // TODO : make sure we cannot rollback / archive last commit file
     try {
       // Create a Hoodie table which encapsulated the commits and files visible
@@ -1273,14 +1275,15 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
       // 2. Now, first read the existing rolling stats and merge with the result of current metadata.
       // Need to do this on every commit (delta or commit) to support COW and MOR.
-      for (WriteStatus status : writeStatusList) {
-        HoodieWriteStat stat = status.getStat();
+      for (HoodieWriteStat stat : writeStats) {
+        String partitionPath = stat.getPartitionPath();
         //TODO: why is stat.getPartitionPath() null at times here.
-        metadata.addWriteStat(status.getPartitionPath(), stat);
+        metadata.addWriteStat(partitionPath, stat);
         HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat.getFileId(),
             stat.getNumWrites() - (stat.getNumUpdateWrites() - stat.getNumDeletes()),
             stat.getNumUpdateWrites(), stat.getNumDeletes(), stat.getTotalWriteBytes());
-        rollingStatMetadata.addRollingStat(status.getPartitionPath(), hoodieRollingStat);
+        rollingStatMetadata.addRollingStat(partitionPath, hoodieRollingStat);
       }
       // The last rolling stat should be present in the completed timeline
       Optional<HoodieInstant> lastInstant = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()


@@ -127,6 +127,7 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload> extends HoodieIOH
       writeStatus.getStat().setPrevCommit(baseInstantTime);
       writeStatus.setFileId(fileId);
       writeStatus.setPartitionPath(partitionPath);
+      writeStatus.getStat().setPartitionPath(partitionPath);
       writeStatus.getStat().setFileId(fileId);
       averageRecordSize = SizeEstimator.estimate(record);
       try {


@@ -149,6 +149,7 @@ public class HoodieCreateHandle<T extends HoodieRecordPayload> extends HoodieIOH
       storageWriter.close();
       HoodieWriteStat stat = new HoodieWriteStat();
+      stat.setPartitionPath(status.getPartitionPath());
       stat.setNumWrites(recordsWritten);
       stat.setNumDeletes(recordsDeleted);
       stat.setNumInserts(insertRecordsWritten);


@@ -127,6 +127,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieIOHa
       // file name is same for all records, in this bunch
       writeStatus.setFileId(fileId);
       writeStatus.setPartitionPath(partitionPath);
+      writeStatus.getStat().setPartitionPath(partitionPath);
       writeStatus.getStat().setFileId(fileId);
       writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath);
       // Create the writer for writing the new version file
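
A note on the three handle hunks (HoodieAppendHandle, HoodieCreateHandle, HoodieMergeHandle): since the commit path above now reads the partition path from HoodieWriteStat rather than from WriteStatus, each handle has to copy that field onto the stat it builds. A minimal sketch of the invariant, using the field names from the diff:

    // The stat must now mirror the partition path that downstream code
    // previously read off the enclosing WriteStatus (status.getPartitionPath()).
    writeStatus.setPartitionPath(partitionPath);
    writeStatus.getStat().setPartitionPath(partitionPath);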


@@ -162,7 +162,7 @@ public class HoodieMetrics {
     }
   }

-  public void updateFinalizeWriteMetrics(long durationInMs, int numFilesFinalized) {
+  public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
     if (config.isMetricsOn()) {
       logger.info(String
           .format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)",


@@ -29,6 +29,7 @@ import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.model.HoodieRollingStatMetadata;
+import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
@@ -376,20 +377,19 @@ public class HoodieCopyOnWriteTable<T extends HoodieRecordPayload> extends Hoodi
   /**
    * Finalize the written data files
    *
-   * @param writeStatuses List of WriteStatus
+   * @param stats List of HoodieWriteStats
    * @return number of files finalized
    */
   @Override
   @SuppressWarnings("unchecked")
-  public void finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses)
+  public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
       throws HoodieIOException {
-    super.finalizeWrite(jsc, writeStatuses);
+    super.finalizeWrite(jsc, stats);
     if (config.shouldUseTempFolderForCopyOnWrite()) {
       // This is to rename each data file from temporary path to its final location
-      jsc.parallelize(writeStatuses, config.getFinalizeWriteParallelism())
-          .map(status -> status.getStat())
+      jsc.parallelize(stats, config.getFinalizeWriteParallelism())
           .foreach(writeStat -> {
             final FileSystem fs = getMetaClient().getFs();
             final Path finalPath = new Path(config.getBasePath(), writeStat.getPath());


@@ -295,10 +295,10 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
   }

   @Override
-  public void finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses)
+  public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
       throws HoodieIOException {
     // delegate to base class for MOR tables
-    super.finalizeWrite(jsc, writeStatuses);
+    super.finalizeWrite(jsc, stats);
   }

   @Override


@@ -23,6 +23,7 @@ import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.HoodieRollbackStat;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.TableFileSystemView;
@@ -252,15 +253,15 @@ public abstract class HoodieTable<T extends HoodieRecordPayload> implements Seri
   /**
    * Finalize the written data onto storage. Perform any final cleanups
    *
    * @param jsc Spark Context
-   * @param writeStatuses List of WriteStatus
+   * @param stats List of HoodieWriteStats
    * @throws HoodieIOException if some paths can't be finalized on storage
    */
-  public void finalizeWrite(JavaSparkContext jsc, List<WriteStatus> writeStatuses)
+  public void finalizeWrite(JavaSparkContext jsc, List<HoodieWriteStat> stats)
       throws HoodieIOException {
     if (config.isConsistencyCheckEnabled()) {
-      List<String> pathsToCheck = writeStatuses.stream()
-          .map(ws -> ws.getStat().getTempPath() != null
-              ? ws.getStat().getTempPath() : ws.getStat().getPath())
+      List<String> pathsToCheck = stats.stream()
+          .map(stat -> stat.getTempPath() != null
+              ? stat.getTempPath() : stat.getPath())
           .collect(Collectors.toList());
       List<String> failingPaths = new ConsistencyCheck(config.getBasePath(), pathsToCheck, jsc,