diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 147ba56a0..7389cf6e0 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -498,19 +498,21 @@ public class HoodieWriteClient implements Seriali HoodieActiveTimeline activeTimeline = table.getActiveTimeline(); HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - List writeStatusList = writeStatuses.collect(); - updateMetadataAndRollingStats(actionType, metadata, writeStatusList); + + List stats = writeStatuses.map(status -> status.getStat()).collect(); + + updateMetadataAndRollingStats(actionType, metadata, stats); // Finalize write final Timer.Context finalizeCtx = metrics.getFinalizeCtx(); try { - table.finalizeWrite(jsc, writeStatusList); + table.finalizeWrite(jsc, stats); if (finalizeCtx != null) { Optional durationInMs = Optional.of(metrics.getDurationInMs(finalizeCtx.stop())); durationInMs.ifPresent(duration -> { logger.info("Finalize write elapsed time (milliseconds): " + duration); - metrics.updateFinalizeWriteMetrics(duration, writeStatusList.size()); + metrics.updateFinalizeWriteMetrics(duration, stats.size()); }); } } catch (HoodieIOException ioe) { @@ -1260,7 +1262,7 @@ public class HoodieWriteClient implements Seriali } private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata, - List writeStatusList) { + List writeStats) { // TODO : make sure we cannot rollback / archive last commit file try { // Create a Hoodie table which encapsulated the commits and files visible @@ -1273,14 +1275,15 @@ public class HoodieWriteClient implements Seriali // 2. Now, first read the existing rolling stats and merge with the result of current metadata. // Need to do this on every commit (delta or commit) to support COW and MOR. - for (WriteStatus status : writeStatusList) { - HoodieWriteStat stat = status.getStat(); + + for (HoodieWriteStat stat : writeStats) { + String partitionPath = stat.getPartitionPath(); //TODO: why is stat.getPartitionPath() null at times here. - metadata.addWriteStat(status.getPartitionPath(), stat); + metadata.addWriteStat(partitionPath, stat); HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat.getFileId(), stat.getNumWrites() - (stat.getNumUpdateWrites() - stat.getNumDeletes()), stat.getNumUpdateWrites(), stat.getNumDeletes(), stat.getTotalWriteBytes()); - rollingStatMetadata.addRollingStat(status.getPartitionPath(), hoodieRollingStat); + rollingStatMetadata.addRollingStat(partitionPath, hoodieRollingStat); } // The last rolling stat should be present in the completed timeline Optional lastInstant = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants() diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java index c60384475..5434c5766 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java @@ -127,6 +127,7 @@ public class HoodieAppendHandle extends HoodieIOH writeStatus.getStat().setPrevCommit(baseInstantTime); writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); + writeStatus.getStat().setPartitionPath(partitionPath); writeStatus.getStat().setFileId(fileId); averageRecordSize = SizeEstimator.estimate(record); try { diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java index 0781faf36..c2688b8f0 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java @@ -149,6 +149,7 @@ public class HoodieCreateHandle extends HoodieIOH storageWriter.close(); HoodieWriteStat stat = new HoodieWriteStat(); + stat.setPartitionPath(status.getPartitionPath()); stat.setNumWrites(recordsWritten); stat.setNumDeletes(recordsDeleted); stat.setNumInserts(insertRecordsWritten); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java index f11fce57c..621b37a76 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java @@ -127,6 +127,7 @@ public class HoodieMergeHandle extends HoodieIOHa // file name is same for all records, in this bunch writeStatus.setFileId(fileId); writeStatus.setPartitionPath(partitionPath); + writeStatus.getStat().setPartitionPath(partitionPath); writeStatus.getStat().setFileId(fileId); writeStatus.getStat().setPaths(new Path(config.getBasePath()), newFilePath, tempPath); // Create the writer for writing the new version file diff --git a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java index ead65d70d..aa9afa6fc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/metrics/HoodieMetrics.java @@ -162,7 +162,7 @@ public class HoodieMetrics { } } - public void updateFinalizeWriteMetrics(long durationInMs, int numFilesFinalized) { + public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) { if (config.isMetricsOn()) { logger.info(String .format("Sending finalize write metrics (duration=%d, numFilesFinalized=%d)", diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java index 67411627b..fc6dcf904 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java @@ -29,6 +29,7 @@ import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordLocation; import com.uber.hoodie.common.model.HoodieRecordPayload; import com.uber.hoodie.common.model.HoodieRollingStatMetadata; +import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline; @@ -376,20 +377,19 @@ public class HoodieCopyOnWriteTable extends Hoodi /** * Finalize the written data files * - * @param writeStatuses List of WriteStatus + * @param stats List of HoodieWriteStats * @return number of files finalized */ @Override @SuppressWarnings("unchecked") - public void finalizeWrite(JavaSparkContext jsc, List writeStatuses) + public void finalizeWrite(JavaSparkContext jsc, List stats) throws HoodieIOException { - super.finalizeWrite(jsc, writeStatuses); + super.finalizeWrite(jsc, stats); if (config.shouldUseTempFolderForCopyOnWrite()) { // This is to rename each data file from temporary path to its final location - jsc.parallelize(writeStatuses, config.getFinalizeWriteParallelism()) - .map(status -> status.getStat()) + jsc.parallelize(stats, config.getFinalizeWriteParallelism()) .foreach(writeStat -> { final FileSystem fs = getMetaClient().getFs(); final Path finalPath = new Path(config.getBasePath(), writeStat.getPath()); diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java index 9968d72d4..ad065fbfc 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java @@ -295,10 +295,10 @@ public class HoodieMergeOnReadTable extends } @Override - public void finalizeWrite(JavaSparkContext jsc, List writeStatuses) + public void finalizeWrite(JavaSparkContext jsc, List stats) throws HoodieIOException { // delegate to base class for MOR tables - super.finalizeWrite(jsc, writeStatuses); + super.finalizeWrite(jsc, stats); } @Override diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java index 28de32b90..10ef94ed2 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java @@ -23,6 +23,7 @@ import com.uber.hoodie.common.HoodieCleanStat; import com.uber.hoodie.common.HoodieRollbackStat; import com.uber.hoodie.common.model.HoodieRecord; import com.uber.hoodie.common.model.HoodieRecordPayload; +import com.uber.hoodie.common.model.HoodieWriteStat; import com.uber.hoodie.common.table.HoodieTableMetaClient; import com.uber.hoodie.common.table.HoodieTimeline; import com.uber.hoodie.common.table.TableFileSystemView; @@ -252,15 +253,15 @@ public abstract class HoodieTable implements Seri * Finalize the written data onto storage. Perform any final cleanups * * @param jsc Spark Context - * @param writeStatuses List of WriteStatus + * @param stats List of HoodieWriteStats * @throws HoodieIOException if some paths can't be finalized on storage */ - public void finalizeWrite(JavaSparkContext jsc, List writeStatuses) + public void finalizeWrite(JavaSparkContext jsc, List stats) throws HoodieIOException { if (config.isConsistencyCheckEnabled()) { - List pathsToCheck = writeStatuses.stream() - .map(ws -> ws.getStat().getTempPath() != null - ? ws.getStat().getTempPath() : ws.getStat().getPath()) + List pathsToCheck = stats.stream() + .map(stat -> stat.getTempPath() != null + ? stat.getTempPath() : stat.getPath()) .collect(Collectors.toList()); List failingPaths = new ConsistencyCheck(config.getBasePath(), pathsToCheck, jsc,