1
0

Adding check for rolling stats not present to handle backwards compatibility of existing timeline

This commit is contained in:
Nishith Agarwal
2018-09-08 12:26:54 -07:00
committed by vinoth chandar
parent ea7823a9dd
commit 2b1af18941
3 changed files with 22 additions and 9 deletions

View File

@@ -1286,9 +1286,12 @@ public class HoodieWriteClient<T extends HoodieRecordPayload> implements Seriali
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(table.getActiveTimeline().getInstantDetails(lastInstant .fromBytes(table.getActiveTimeline().getInstantDetails(lastInstant
.get()).get(), HoodieCommitMetadata.class); .get()).get(), HoodieCommitMetadata.class);
rollingStatMetadata = rollingStatMetadata Optional<String> lastRollingStat = Optional.ofNullable(commitMetadata.getExtraMetadata()
.merge(HoodieCommitMetadata.fromBytes(commitMetadata.getExtraMetadata() .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY));
.get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY).getBytes(), HoodieRollingStatMetadata.class)); if (lastRollingStat.isPresent()) {
rollingStatMetadata = rollingStatMetadata
.merge(HoodieCommitMetadata.fromBytes(lastRollingStat.get().getBytes(), HoodieRollingStatMetadata.class));
}
} }
metadata.addMetadata(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, rollingStatMetadata.toJsonString()); metadata.addMetadata(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY, rollingStatMetadata.toJsonString());
} catch (IOException io) { } catch (IOException io) {

View File

@@ -307,10 +307,13 @@ public class HoodieMergeOnReadTable<T extends HoodieRecordPayload> extends
if (lastInstant.isPresent()) { if (lastInstant.isPresent()) {
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes( HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
this.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class); this.getActiveTimeline().getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata Optional<String> lastRollingStat = Optional.ofNullable(commitMetadata.getExtraMetadata()
.fromBytes(commitMetadata.getExtraMetadata().get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY) .get(HoodieRollingStatMetadata.ROLLING_STAT_METADATA_KEY));
.getBytes(), HoodieRollingStatMetadata.class); if (lastRollingStat.isPresent()) {
return rollingStatMetadata; HoodieRollingStatMetadata rollingStatMetadata = HoodieCommitMetadata
.fromBytes(lastRollingStat.get().getBytes(), HoodieRollingStatMetadata.class);
return rollingStatMetadata;
}
} }
return null; return null;
} catch (IOException e) { } catch (IOException e) {

View File

@@ -911,7 +911,14 @@ public class TestMergeOnReadTable {
HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath); HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc); HoodieTable table = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
String commitTime = "000"; // Create a commit without rolling stats in metadata to test backwards compatibility
HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
String commitActionType = table.getMetaClient().getCommitActionType();
HoodieInstant instant = new HoodieInstant(true, commitActionType, "000");
activeTimeline.createInflight(instant);
activeTimeline.saveAsComplete(instant, Optional.empty());
String commitTime = "001";
client.startCommitWithTime(commitTime); client.startCommitWithTime(commitTime);
HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
@@ -936,7 +943,7 @@ public class TestMergeOnReadTable {
} }
Assert.assertEquals(inserts, 200); Assert.assertEquals(inserts, 200);
commitTime = "001"; commitTime = "002";
client.startCommitWithTime(commitTime); client.startCommitWithTime(commitTime);
records = dataGen.generateUpdates(commitTime, records); records = dataGen.generateUpdates(commitTime, records);
writeRecords = jsc.parallelize(records, 1); writeRecords = jsc.parallelize(records, 1);