diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index af13171c1..096bc2e33 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -22,8 +22,6 @@ import com.codahale.metrics.Timer;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieRollingStat;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -104,8 +102,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
     List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
-
-    updateMetadataAndRollingStats(actionType, metadata, stats);
+    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
 
     // Finalize write
     finalizeWrite(table, instantTime, stats);
@@ -175,48 +172,6 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieClient
     }
   }
 
-  private void updateMetadataAndRollingStats(String actionType, HoodieCommitMetadata metadata,
-      List<HoodieWriteStat> writeStats) {
-    // TODO : make sure we cannot rollback / archive last commit file
-    try {
-      // Create a Hoodie table which encapsulated the commits and files visible
-      HoodieTable table = HoodieTable.create(config, hadoopConf);
-      // 0. All of the rolling stat management is only done by the DELTA commit for MOR and COMMIT for COW other wise
-      // there may be race conditions
-      HoodieRollingStatMetadata rollingStatMetadata = new HoodieRollingStatMetadata(actionType);
-      // 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there.
-      // 2. Now, first read the existing rolling stats and merge with the result of current metadata.
-
-      // Need to do this on every commit (delta or commit) to support COW and MOR.
-
-      for (HoodieWriteStat stat : writeStats) {
-        String partitionPath = stat.getPartitionPath();
-        // TODO: why is stat.getPartitionPath() null at times here.
-        metadata.addWriteStat(partitionPath, stat);
-        HoodieRollingStat hoodieRollingStat = new HoodieRollingStat(stat.getFileId(),
-            stat.getNumWrites() - (stat.getNumUpdateWrites() - stat.getNumDeletes()), stat.getNumUpdateWrites(),
-            stat.getNumDeletes(), stat.getTotalWriteBytes());
-        rollingStatMetadata.addRollingStat(partitionPath, hoodieRollingStat);
-      }
-      // The last rolling stat should be present in the completed timeline
-      Option<HoodieInstant> lastInstant =
-          table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();
-      if (lastInstant.isPresent()) {
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-            table.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
-        Option<String> lastRollingStat = Option
-            .ofNullable(commitMetadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY));
-        if (lastRollingStat.isPresent()) {
-          rollingStatMetadata = rollingStatMetadata
-              .merge(HoodieCommitMetadata.fromBytes(lastRollingStat.get().getBytes(), HoodieRollingStatMetadata.class));
-        }
-      }
-      metadata.addMetadata(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, rollingStatMetadata.toJsonString());
-    } catch (IOException io) {
-      throw new HoodieCommitException("Unable to save rolling stats");
-    }
-  }
-
   public HoodieMetrics getMetrics() {
     return metrics;
   }
diff --git a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 0717fd2f4..40185c69d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -190,10 +190,9 @@ public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload<T>, R>
     result.setCommitted(true);
     List<HoodieWriteStat> stats = result.getWriteStatuses().map(WriteStatus::getStat).collect();
+    stats.forEach(stat -> metadata.addWriteStat(stat.getPartitionPath(), stat));
     result.setWriteStats(stats);
 
-    updateMetadataAndRollingStats(metadata, stats);
-
     // Finalize write
     finalizeWrite(instantTime, stats, result);
@@ -230,18 +229,6 @@
     }
   }
 
-  private void updateMetadataAndRollingStats(HoodieCommitMetadata metadata, List<HoodieWriteStat> writeStats) {
-    // 1. Look up the previous compaction/commit and get the HoodieCommitMetadata from there.
-    // 2. Now, first read the existing rolling stats and merge with the result of current metadata.
-
-    // Need to do this on every commit (delta or commit) to support COW and MOR.
-    for (HoodieWriteStat stat : writeStats) {
-      String partitionPath = stat.getPartitionPath();
-      // TODO: why is stat.getPartitionPath() null at times here.
-      metadata.addWriteStat(partitionPath, stat);
-    }
-  }
-
   protected boolean isWorkloadProfileNeeded() {
     return true;
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
index 8107cdf62..fc1d6baef 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
@@ -24,8 +24,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRollingStat;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -849,10 +848,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
   }
 
   /**
    * Test to ensure commit metadata points to valid files.
    */
   @Test
-  public void testRollingStatsInMetadata() throws Exception {
+  public void testMetadataStatsOnCommit() throws Exception {
 
     HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
     HoodieWriteClient client = getHoodieWriteClient(cfg);
@@ -876,14 +875,10 @@
     String everything = FileIOUtils.readAsUTFString(inputStream);
     HoodieCommitMetadata metadata =
         HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class);
-    HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromJsonString(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY),
-        HoodieRollingStatMetadata.class);
     int inserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        inserts += stat.getValue().getInserts();
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        inserts += stat.getNumInserts();
       }
     }
     assertEquals(200, inserts);
@@ -905,19 +900,15 @@
     inputStream = new FileInputStream(filename);
     everything = FileIOUtils.readAsUTFString(inputStream);
     metadata = HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class);
-    rollingStatMetadata = HoodieCommitMetadata.fromJsonString(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY),
-        HoodieRollingStatMetadata.class);
     inserts = 0;
     int upserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        inserts += stat.getValue().getInserts();
-        upserts += stat.getValue().getUpserts();
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        inserts += stat.getNumInserts();
+        upserts += stat.getNumUpdateWrites();
       }
     }
-    assertEquals(200, inserts);
+    assertEquals(0, inserts);
     assertEquals(200, upserts);
   }
diff --git a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 0e9402574..8e0afbcd4 100644
--- a/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -28,9 +28,8 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.HoodieRollingStat;
-import org.apache.hudi.common.model.HoodieRollingStatMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -1088,11 +1087,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   }
 
   /**
-   * Test to ensure rolling stats are correctly written to metadata file.
+   * Test to ensure metadata stats are correctly written to metadata file.
    */
   @ParameterizedTest
   @MethodSource("argumentsProvider")
-  public void testRollingStatsInMetadata(HoodieFileFormat baseFileFormat) throws Exception {
+  public void testMetadataStatsOnCommit(HoodieFileFormat baseFileFormat) throws Exception {
     init(baseFileFormat);
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
@@ -1100,7 +1099,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
     metaClient = getHoodieMetaClient(hadoopConf, basePath);
     HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
 
-    // Create a commit without rolling stats in metadata to test backwards compatibility
+    // Create a commit without metadata stats in metadata to test backwards compatibility
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     String commitActionType = table.getMetaClient().getCommitActionType();
     HoodieInstant instant = new HoodieInstant(State.REQUESTED, commitActionType, "000");
@@ -1123,14 +1122,10 @@
     HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
         table.getActiveTimeline().getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
         HoodieCommitMetadata.class);
-    HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-        HoodieRollingStatMetadata.class);
     int inserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        inserts += stat.getValue().getInserts();
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        inserts += stat.getNumInserts();
       }
     }
     assertEquals(200, inserts);
@@ -1148,20 +1143,17 @@
         table.getActiveTimeline()
             .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
         HoodieCommitMetadata.class);
-    rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-        HoodieRollingStatMetadata.class);
+
     inserts = 0;
     int upserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        inserts += stat.getValue().getInserts();
-        upserts += stat.getValue().getUpserts();
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        inserts += stat.getNumInserts();
+        upserts += stat.getNumUpdateWrites();
       }
     }
-    assertEquals(200, inserts);
+    assertEquals(0, inserts);
     assertEquals(200, upserts);
 
     client.rollback(instantTime);
@@ -1172,16 +1164,12 @@
         table.getActiveTimeline()
             .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
         HoodieCommitMetadata.class);
-    rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-        HoodieRollingStatMetadata.class);
     inserts = 0;
     upserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        inserts += stat.getValue().getInserts();
-        upserts += stat.getValue().getUpserts();
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        inserts += stat.getNumInserts();
+        upserts += stat.getNumUpdateWrites();
       }
     }
     assertEquals(200, inserts);
@@ -1190,11 +1178,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
   }
 
   /**
-   * Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
+   * Test to ensure metadata stats are correctly written to the metadata file, identifies small files and corrects them.
    */
   @ParameterizedTest
   @MethodSource("argumentsProvider")
-  public void testRollingStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
+  public void testMetadataStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
     init(baseFileFormat);
 
     HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
@@ -1217,16 +1205,12 @@
         table.getActiveTimeline()
             .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
         HoodieCommitMetadata.class);
-    HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-        HoodieRollingStatMetadata.class);
     int inserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        inserts += stat.getValue().getInserts();
-        fileIdToInsertsMap.put(stat.getKey(), stat.getValue().getInserts());
-        fileIdToUpsertsMap.put(stat.getKey(), stat.getValue().getUpserts());
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        inserts += stat.getNumInserts();
+        fileIdToInsertsMap.put(stat.getFileId(), stat.getNumInserts());
+        fileIdToUpsertsMap.put(stat.getFileId(), stat.getNumUpdateWrites());
       }
     }
     assertEquals(200, inserts);
@@ -1246,23 +1230,18 @@
         table.getActiveTimeline()
             .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
         HoodieCommitMetadata.class);
-    rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-        HoodieRollingStatMetadata.class);
     inserts = 0;
     int upserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        // No new file id should be created, all the data should be written to small files already there
-        assertTrue(fileIdToInsertsMap.containsKey(stat.getKey()));
-        assertTrue(fileIdToUpsertsMap.containsKey(stat.getKey()));
-        inserts += stat.getValue().getInserts();
-        upserts += stat.getValue().getUpserts();
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        assertTrue(fileIdToInsertsMap.containsKey(stat.getFileId()));
+        assertTrue(fileIdToUpsertsMap.containsKey(stat.getFileId()));
+        inserts += stat.getNumInserts();
+        upserts += stat.getNumUpdateWrites();
       }
     }
-    assertEquals(400, inserts);
+    assertEquals(200, inserts);
     assertEquals(200, upserts);
 
     // Test small file handling after compaction
@@ -1273,20 +1252,16 @@
     // Read from commit file
     table = HoodieTable.create(cfg, hadoopConf);
-    metadata = HoodieCommitMetadata.fromBytes(
+    HoodieCommitMetadata metadata1 = HoodieCommitMetadata.fromBytes(
         table.getActiveTimeline()
             .getInstantDetails(table.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(),
         HoodieCommitMetadata.class);
-    HoodieRollingStatMetadata rollingStatMetadata1 = HoodieCommitMetadata.fromBytes(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-        HoodieRollingStatMetadata.class);
-    // Ensure that the rolling stats from the extra metadata of delta commits is copied over to the compaction commit
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> entry : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      assertTrue(rollingStatMetadata1.getPartitionToRollingStats().containsKey(entry.getKey()));
-      assertEquals(rollingStatMetadata1.getPartitionToRollingStats().get(entry.getKey()).size(),
-          entry.getValue().size());
+    // Ensure that the metadata stats from the delta commits are copied over to the compaction commit
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      assertTrue(metadata1.getPartitionToWriteStats().containsKey(pstat.getKey()));
+      assertEquals(metadata1.getPartitionToWriteStats().get(pstat.getKey()).size(),
+          pstat.getValue().size());
     }
 
     // Write inserts + updates
@@ -1305,23 +1280,18 @@
         table.getActiveTimeline()
             .getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
         HoodieCommitMetadata.class);
-    rollingStatMetadata = HoodieCommitMetadata.fromBytes(
-        metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
-        HoodieRollingStatMetadata.class);
     inserts = 0;
     upserts = 0;
-    for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
-        .entrySet()) {
-      for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
-        // No new file id should be created, all the data should be written to small files already there
-        assertTrue(fileIdToInsertsMap.containsKey(stat.getKey()));
-        inserts += stat.getValue().getInserts();
-        upserts += stat.getValue().getUpserts();
+    for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
+      for (HoodieWriteStat stat : pstat.getValue()) {
+        assertTrue(fileIdToInsertsMap.containsKey(stat.getFileId()));
+        inserts += stat.getNumInserts();
+        upserts += stat.getNumUpdateWrites();
      }
    }
-    assertEquals(600, inserts);
-    assertEquals(600, upserts);
+    assertEquals(200, inserts);
+    assertEquals(400, upserts);
   }
 }