1
0

[HUDI-760]Remove Rolling Stat management from Hudi Writer (#1739)

This commit is contained in:
baobaoyeye
2020-07-01 11:07:09 +08:00
committed by GitHub
parent 8919be6a5d
commit 2be924fd3a
4 changed files with 57 additions and 154 deletions

View File

@@ -24,8 +24,7 @@ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRollingStat;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -849,10 +848,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
/**
* Test to ensure commit metadata points to valid files.
* Test to ensure commit metadata points to valid files.10.
*/
@Test
public void testRollingStatsInMetadata() throws Exception {
public void testMetadataStatsOnCommit() throws Exception {
HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false).build();
HoodieWriteClient client = getHoodieWriteClient(cfg);
@@ -876,14 +875,10 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
String everything = FileIOUtils.readAsUTFString(inputStream);
HoodieCommitMetadata metadata =
HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class);
HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromJsonString(
metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY),
HoodieRollingStatMetadata.class);
int inserts = 0;
for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
.entrySet()) {
for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
inserts += stat.getValue().getInserts();
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
}
}
assertEquals(200, inserts);
@@ -905,19 +900,15 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
inputStream = new FileInputStream(filename);
everything = FileIOUtils.readAsUTFString(inputStream);
metadata = HoodieCommitMetadata.fromJsonString(everything.toString(), HoodieCommitMetadata.class);
rollingStatMetadata = HoodieCommitMetadata.fromJsonString(
metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY),
HoodieRollingStatMetadata.class);
inserts = 0;
int upserts = 0;
for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
.entrySet()) {
for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
inserts += stat.getValue().getInserts();
upserts += stat.getValue().getUpserts();
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
upserts += stat.getNumUpdateWrites();
}
}
assertEquals(200, inserts);
assertEquals(0, inserts);
assertEquals(200, upserts);
}

View File

@@ -28,9 +28,8 @@ import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRollingStat;
import org.apache.hudi.common.model.HoodieRollingStatMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -1088,11 +1087,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
/**
* Test to ensure rolling stats are correctly written to metadata file.
* Test to ensure metadata stats are correctly written to metadata file.
*/
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollingStatsInMetadata(HoodieFileFormat baseFileFormat) throws Exception {
public void testMetadataStatsOnCommit(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
@@ -1100,7 +1099,7 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
metaClient = getHoodieMetaClient(hadoopConf, basePath);
HoodieTable table = HoodieTable.create(metaClient, cfg, hadoopConf);
// Create a commit without rolling stats in metadata to test backwards compatibility
// Create a commit without metadata stats in metadata to test backwards compatibility
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getMetaClient().getCommitActionType();
HoodieInstant instant = new HoodieInstant(State.REQUESTED, commitActionType, "000");
@@ -1123,14 +1122,10 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
HoodieCommitMetadata metadata = HoodieCommitMetadata.fromBytes(
table.getActiveTimeline().getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
HoodieCommitMetadata.class);
HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(
metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
HoodieRollingStatMetadata.class);
int inserts = 0;
for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
.entrySet()) {
for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
inserts += stat.getValue().getInserts();
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
}
}
assertEquals(200, inserts);
@@ -1148,20 +1143,17 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
table.getActiveTimeline()
.getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
HoodieCommitMetadata.class);
rollingStatMetadata = HoodieCommitMetadata.fromBytes(
metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
HoodieRollingStatMetadata.class);
inserts = 0;
int upserts = 0;
for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
.entrySet()) {
for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
inserts += stat.getValue().getInserts();
upserts += stat.getValue().getUpserts();
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
upserts += stat.getNumUpdateWrites();
}
}
assertEquals(200, inserts);
assertEquals(0, inserts);
assertEquals(200, upserts);
client.rollback(instantTime);
@@ -1172,16 +1164,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
table.getActiveTimeline()
.getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
HoodieCommitMetadata.class);
rollingStatMetadata = HoodieCommitMetadata.fromBytes(
metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
HoodieRollingStatMetadata.class);
inserts = 0;
upserts = 0;
for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
.entrySet()) {
for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
inserts += stat.getValue().getInserts();
upserts += stat.getValue().getUpserts();
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
upserts += stat.getNumUpdateWrites();
}
}
assertEquals(200, inserts);
@@ -1190,11 +1178,11 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
}
/**
* Test to ensure rolling stats are correctly written to the metadata file, identifies small files and corrects them.
* Test to ensure metadata stats are correctly written to the metadata file, identifies small files and corrects them.
*/
@ParameterizedTest
@MethodSource("argumentsProvider")
public void testRollingStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
public void testMetadataStatsWithSmallFileHandling(HoodieFileFormat baseFileFormat) throws Exception {
init(baseFileFormat);
HoodieWriteConfig cfg = getConfigBuilder(false, IndexType.INMEMORY).withAutoCommit(false).build();
@@ -1217,16 +1205,12 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
table.getActiveTimeline()
.getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
HoodieCommitMetadata.class);
HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata.fromBytes(
metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
HoodieRollingStatMetadata.class);
int inserts = 0;
for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
.entrySet()) {
for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
inserts += stat.getValue().getInserts();
fileIdToInsertsMap.put(stat.getKey(), stat.getValue().getInserts());
fileIdToUpsertsMap.put(stat.getKey(), stat.getValue().getUpserts());
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
inserts += stat.getNumInserts();
fileIdToInsertsMap.put(stat.getFileId(), stat.getNumInserts());
fileIdToUpsertsMap.put(stat.getFileId(), stat.getNumUpdateWrites());
}
}
assertEquals(200, inserts);
@@ -1246,23 +1230,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
table.getActiveTimeline()
.getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
HoodieCommitMetadata.class);
rollingStatMetadata = HoodieCommitMetadata.fromBytes(
metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
HoodieRollingStatMetadata.class);
inserts = 0;
int upserts = 0;
for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
.entrySet()) {
for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
// No new file id should be created, all the data should be written to small files already there
assertTrue(fileIdToInsertsMap.containsKey(stat.getKey()));
assertTrue(fileIdToUpsertsMap.containsKey(stat.getKey()));
inserts += stat.getValue().getInserts();
upserts += stat.getValue().getUpserts();
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
assertTrue(fileIdToInsertsMap.containsKey(stat.getFileId()));
assertTrue(fileIdToUpsertsMap.containsKey(stat.getFileId()));
inserts += stat.getNumInserts();
upserts += stat.getNumUpdateWrites();
}
}
assertEquals(400, inserts);
assertEquals(200, inserts);
assertEquals(200, upserts);
// Test small file handling after compaction
@@ -1273,20 +1252,16 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
// Read from commit file
table = HoodieTable.create(cfg, hadoopConf);
metadata = HoodieCommitMetadata.fromBytes(
HoodieCommitMetadata metadata1 = HoodieCommitMetadata.fromBytes(
table.getActiveTimeline()
.getInstantDetails(table.getActiveTimeline().getCommitsTimeline().lastInstant().get()).get(),
HoodieCommitMetadata.class);
HoodieRollingStatMetadata rollingStatMetadata1 = HoodieCommitMetadata.fromBytes(
metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
HoodieRollingStatMetadata.class);
// Ensure that the rolling stats from the extra metadata of delta commits is copied over to the compaction commit
for (Map.Entry<String, Map<String, HoodieRollingStat>> entry : rollingStatMetadata.getPartitionToRollingStats()
.entrySet()) {
assertTrue(rollingStatMetadata1.getPartitionToRollingStats().containsKey(entry.getKey()));
assertEquals(rollingStatMetadata1.getPartitionToRollingStats().get(entry.getKey()).size(),
entry.getValue().size());
// Ensure that the metadata stats from the extra metadata of delta commits is copied over to the compaction commit
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
assertTrue(metadata1.getPartitionToWriteStats().containsKey(pstat.getKey()));
assertEquals(metadata1.getPartitionToWriteStats().get(pstat.getKey()).size(),
pstat.getValue().size());
}
// Write inserts + updates
@@ -1305,23 +1280,18 @@ public class TestHoodieMergeOnReadTable extends HoodieClientTestHarness {
table.getActiveTimeline()
.getInstantDetails(table.getActiveTimeline().getDeltaCommitTimeline().lastInstant().get()).get(),
HoodieCommitMetadata.class);
rollingStatMetadata = HoodieCommitMetadata.fromBytes(
metadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(),
HoodieRollingStatMetadata.class);
inserts = 0;
upserts = 0;
for (Map.Entry<String, Map<String, HoodieRollingStat>> pstat : rollingStatMetadata.getPartitionToRollingStats()
.entrySet()) {
for (Map.Entry<String, HoodieRollingStat> stat : pstat.getValue().entrySet()) {
// No new file id should be created, all the data should be written to small files already there
assertTrue(fileIdToInsertsMap.containsKey(stat.getKey()));
inserts += stat.getValue().getInserts();
upserts += stat.getValue().getUpserts();
for (Map.Entry<String, List<HoodieWriteStat>> pstat : metadata.getPartitionToWriteStats().entrySet()) {
for (HoodieWriteStat stat : pstat.getValue()) {
assertTrue(fileIdToInsertsMap.containsKey(stat.getFileId()));
inserts += stat.getNumInserts();
upserts += stat.getNumUpdateWrites();
}
}
assertEquals(600, inserts);
assertEquals(600, upserts);
assertEquals(200, inserts);
assertEquals(400, upserts);
}
}